package main

import (
	"fmt"
	mgo "mongodb"
	qu "qfw/util"
	"strconv"
	"sync"
	"time"

	"github.com/donnie4w/go-logger/logger"
)

// Task describes one abnormal spider and the evidence collected for it.
type Task struct {
	Code        string                            // spider code
	Site        string                            // site
	Channel     string                            // channel (column)
	ErrType     string                            // error type: 8 crawl frequency; 7 list page; 6 site check/404; 5 download failure; 4 regather failure; 3 publish time; 2 detail/title; 1 download volume
	ErrInfo     map[string]map[string]interface{} // error details, keyed by error type
	Description string                            // human-readable description
	State       int                               // task state
}

var (
	StartTime       int64                     // start of the previous working day (unix seconds)
	EndTime         int64                     // end of the previous working day (unix seconds)
	TaskMap         map[string]*Task          // tasks collected in the current run, keyed by spider code
	UpdateStateCron string                    // cron spec for the daily data-state update
	CreateTaskCron  string                    // cron spec for creating tasks each day
	CloseTaskCron   string                    // cron spec for closing tasks each day
	CodeSummaryCron string                    // cron spec for the daily spider summary
	CloseNum        int                       // close tasks that have been idle for this many working days
	DayNum          int                       // number of days of failed data to reset for re-download
	UserTaskNum     map[string]map[string]int // per-user count of tasks created each day
)

// CreateTaskProcess runs the full daily pipeline: collect anomalies, save them, create tasks.
func CreateTaskProcess() {
	InitInfo()                  // initialize
	GetSpiderDownloadRateData() // 1. list-page collection anomalies from spider_downloadrate (previous day)
	GetStatusCodeErrorData()    // 2. site-check anomalies (404) from spider_sitecheck
	GetDownloadFailedData()     // 3. download failures from spider_highlistdata (previous day); their state is reset afterwards
	GetRegatherFailedData()     // 4. regather failures from regatherdata (previous day)
	GetDTPErrData()             // 5. detail/title/publishtime anomalies from spider_warn
	GetDownloadNumErrData()     // 6. download-volume anomalies (volume stats run at 01:00 and take about an hour)
	SaveResult()                // persist the collected statistics
	CreateLuaTask()             // create tasks
	SaveUserCreateTaskNum()     // record how many tasks each user created
}

// InitInfo resets the in-memory state for a new run.
func InitInfo() {
	defer qu.Catch()
	TaskMap = map[string]*Task{}
	UserTaskNum = map[string]map[string]int{}
	InitTime() // initialize the time window
}

// CloseTask closes tasks that have not been updated for CloseNum working days.
func CloseTask() {
	defer qu.Catch()
	logger.Debug("---清理未更新任务---")
	decreaseDay, day := 0, 0
	var cleanDay string
	for {
		decreaseDay--
		weekDay := time.Now().AddDate(0, 0, decreaseDay).Weekday().String()
		if weekDay != "Saturday" && weekDay != "Sunday" {
			day++
		}
		if day == CloseNum {
			cleanDay = time.Now().AddDate(0, 0, decreaseDay).Format("2006-01-02")
			break
		}
	}
	the_time, _ := time.ParseInLocation(qu.Date_Short_Layout, cleanDay, time.Local)
	unix_time := the_time.Unix() // midnight timestamp of the cut-off day
	query := map[string]interface{}{
		"i_state": 0,
		"l_complete": map[string]interface{}{
			"$lt": unix_time + 86400,
		},
		"s_type": "1",
		// "s_type": map[string]interface{}{
		// 	"$ne": "7",
		// },
	}
	logger.Debug("query:", query)
	set := map[string]interface{}{
		"$set": map[string]interface{}{
			"i_state": 6,
		},
	}
	MgoE.Update("task", query, set, false, true)
	logger.Debug("---清理未更新任务完毕---")
}

// 1. Collect list-page collection anomalies from spider_downloadrate for the previous day.
func GetSpiderDownloadRateData() {
	defer qu.Catch()
	logger.Debug("---开始统计spider_downloadrate异常信息---")
	sess := MgoS.GetMgoConn()
	defer MgoS.DestoryMongoConn(sess)
	ch := make(chan bool, 5)
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	date := qu.FormatDateByInt64(&StartTime, qu.Date_Short_Layout)
	query := map[string]interface{}{
		"date": date,
	}
	it := sess.DB("spider").C("spider_downloadrate").Find(&query).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			stype := -1
			// 1. crawl-frequency anomaly: rounds in which the whole list page was collected
			oh_percent := qu.IntAll(tmp["oh_percent"])
			event := qu.IntAll(tmp["event"])
			if oh_percent > 0 && event != 7410 {
				stype = 8
			}
			// 2. list-page anomaly: zero-result rounds exceed 80% of all rounds
			alltimes := qu.IntAll(tmp["alltimes"])
			zero := qu.IntAll(tmp["zero"])
			percent := 0 // percentage of zero-result rounds
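			// Worked example (illustrative numbers only): zero=45, alltimes=50 ->
			// 45/50 = 0.90, rounded to two decimals and scaled to 90, which meets
			// the 80% threshold below and flags the spider with stype 7.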
			if zero > 0 {
				tmpPercent := float64(zero) / float64(alltimes)
				tmpPercent, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", tmpPercent), 64)
				percent = int(tmpPercent * float64(100))
				if percent >= 80 { // more than 80% of the rounds returned nothing
					stype = 7
				}
			}
			if stype != -1 { // an anomaly was detected
				code := qu.ObjToString(tmp["spidercode"])
				site := qu.ObjToString(tmp["site"])
				channel := qu.ObjToString(tmp["channel"])
				t := &Task{
					Code:    code,
					Site:    site,
					Channel: channel,
					ErrInfo: map[string]map[string]interface{}{},
					State:   1,
				}
				if stype == 8 {
					t.ErrType = "8"
					t.ErrInfo = map[string]map[string]interface{}{
						"8": map[string]interface{}{
							"num": oh_percent,
						},
					}
					t.Description = "采集频率异常:\n    列表页共采集" + fmt.Sprint(alltimes) + "轮,其中有" + fmt.Sprint(oh_percent) + "轮数据全采\n"
				} else if stype == 7 {
					t.ErrType = "7"
					t.ErrInfo = map[string]map[string]interface{}{
						"7": map[string]interface{}{
							"num": percent,
						},
					}
					t.Description = "列表页异常:\n    列表页采集无信息次数占比" + fmt.Sprint(percent) + "%\n"
				}
				lock.Lock()
				TaskMap[code] = t
				lock.Unlock()
			}
		}(tmp)
		if n%100 == 0 {
			qu.Debug("current:", n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Debug("---统计spider_downloadrate异常信息完成---")
}

// 2. Collect channels whose URL returned status code 404.
func GetStatusCodeErrorData() {
	defer qu.Catch()
	logger.Debug("---开始统计栏目地址404数据---")
	sess := MgoS.GetMgoConn()
	defer MgoS.DestoryMongoConn(sess)
	ch := make(chan bool, 5)
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	field := map[string]interface{}{
		"url":     1,
		"code":    1,
		"site":    1,
		"channel": 1,
	}
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lte": EndTime,
		},
		"statuscode": 404,
	}
	it := sess.DB("spider").C("spider_sitecheck").Find(&query).Select(&field).Iter()
	count, _ := sess.DB("spider").C("spider_sitecheck").Find(&query).Count()
	logger.Debug("共有404地址", count, "条")
	n := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["code"])
			one, _ := MgoE.FindOneByField("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"state": 1})
			state := qu.IntAll((*one)["state"])
			if state == 4 || state > 6 {
				return
			}
			// if the spider collected data within the last 3 days, do not create a 404 task
			stime, etime := GetTime(-3), GetTime(0)
			q := map[string]interface{}{
				"spidercode": code,
				"l_np_publishtime": map[string]interface{}{
					"$gte": stime,
					"$lte": etime,
				},
			}
			if MgoS.Count("data_bak", q) > 0 { // data was collected, so this is not treated as a 404
				return
			}
			href := qu.ObjToString(tmp["url"])
			site := qu.ObjToString(tmp["site"])
			channel := qu.ObjToString(tmp["channel"])
			lock.Lock()
			if t := TaskMap[code]; t != nil {
				t.ErrInfo["6"] = map[string]interface{}{ // record the 404 details in ErrInfo
					"num":   404,
					"hrefs": []string{href},
				}
				t.Description += "网站监测:404\n" + href + "\n"
				t.State = 1
			} else {
				t := &Task{
					Code:        code,
					Site:        site,
					Channel:     channel,
					ErrType:     "6",
					ErrInfo:     map[string]map[string]interface{}{},
					Description: "网站监测:404\n" + href + "\n",
					State:       1,
				}
				t.ErrInfo = map[string]map[string]interface{}{
					"6": map[string]interface{}{
						"num":   404,
						"hrefs": []string{href},
					},
				}
				TaskMap[code] = t
			}
			lock.Unlock()
		}(tmp)
		if n%100 == 0 {
			qu.Debug("current:", n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Debug("---统计栏目地址404数据完成---")
}
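// ErrInfo is keyed by error type; each entry carries a counter plus up to three
// example links, e.g. {"5": {"num": 12, "hrefs": ["href1", "href2", "href3"]}}.
// The collectors below append at most three hrefs to the description and promote
// the task State to 1 once a single error type has accumulated 10 occurrences.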
// 3. Collect detail-page download failures.
/*
   The failures are counted first and their state is reset afterwards (ResetDataState)
   so they can be re-downloaded without affecting these statistics. The task is already
   created by then, so if a failed record later downloads successfully the task becomes
   slightly inaccurate. Resetting state before counting would leave records out of the
   statistics and break monitoring.
*/
func GetDownloadFailedData() {
	defer qu.Catch()
	logger.Debug("---开始统计下载失败信息---")
	sess := MgoS.GetMgoConn()
	defer MgoS.DestoryMongoConn(sess)
	ch := make(chan bool, 5)
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	field := map[string]interface{}{
		"spidercode": 1,
		"href":       1,
		"site":       1,
		"channel":    1,
	}
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lte": EndTime,
		},
		"state": -1,
	}
	it := sess.DB("spider").C("spider_highlistdata").Find(&query).Select(&field).Iter()
	count, _ := sess.DB("spider").C("spider_highlistdata").Find(&query).Count()
	logger.Debug("共有下载失败数据", count, "条")
	n := 0
	//arr := [][]map[string]interface{}{}
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["spidercode"])
			href := qu.ObjToString(tmp["href"])
			site := qu.ObjToString(tmp["site"])
			channel := qu.ObjToString(tmp["channel"])
			lock.Lock()
			if t := TaskMap[code]; t != nil {
				if info := t.ErrInfo["5"]; info != nil {
					num := qu.IntAll(info["num"])
					num++
					info["num"] = num
					hrefs := info["hrefs"].([]string)
					if len(hrefs) < 3 {
						hrefs = append(hrefs, href)
						info["hrefs"] = hrefs
						t.Description += href + "\n"
					}
					if num >= 10 {
						t.State = 1
					}
				} else {
					t.ErrInfo["5"] = map[string]interface{}{ // record the download failure in ErrInfo
						"num":   1,
						"hrefs": []string{href},
					}
					t.Description += "下载异常:\n" + href + "\n"
				}
			} else {
				t := &Task{
					Code:        code,
					Site:        site,
					Channel:     channel,
					ErrType:     "5",
					ErrInfo:     map[string]map[string]interface{}{},
					Description: "下载异常:\n" + href + "\n",
					State:       0,
				}
				t.ErrInfo = map[string]map[string]interface{}{
					"5": map[string]interface{}{
						"num":   1,
						"hrefs": []string{href},
					},
				}
				TaskMap[code] = t
			}
			// reset state so the record can be re-downloaded (disabled)
			// update := []map[string]interface{}{}
			// update = append(update, map[string]interface{}{"_id": tmp["_id"]})
			// update = append(update, map[string]interface{}{"$set": map[string]interface{}{"state": 0, "times": 0}})
			// arr = append(arr, update)
			// if len(arr) > 500 {
			// 	tmps := arr
			// 	MgoS.UpdateBulk("spider_highlistdata", tmps...)
			// 	arr = [][]map[string]interface{}{}
			// }
			lock.Unlock()
		}(tmp)
		if n%100 == 0 {
			qu.Debug("current:", n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	// lock.Lock()
	// if len(arr) > 0 {
	// 	MgoS.UpdateBulk("spider_highlistdata", arr...)
	// 	arr = [][]map[string]interface{}{}
	// }
	// lock.Unlock()
	logger.Debug("---统计下载失败信息完成---")
}

// 4. Collect regather failures.
func GetRegatherFailedData() {
	defer qu.Catch()
	logger.Debug("---开始统计重采失败信息---")
	sess := MgoS.GetMgoConn()
	defer MgoS.DestoryMongoConn(sess)
	ch := make(chan bool, 5)
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	field := map[string]interface{}{
		"spidercode": 1,
		"href":       1,
		"site":       1,
		"channel":    1,
	}
	query := map[string]interface{}{
		"state": map[string]interface{}{
			"$lte": 1,
		},
		"from": "lua",
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lte": EndTime,
		},
	}
	it := sess.DB("spider").C("regatherdata").Find(&query).Select(&field).Iter()
	count, _ := sess.DB("spider").C("regatherdata").Find(&query).Count()
	logger.Debug("共有重采失败数据", count, "条")
	n := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["spidercode"])
			href := qu.ObjToString(tmp["href"])
			site := qu.ObjToString(tmp["site"])
			channel := qu.ObjToString(tmp["channel"])
			lock.Lock()
			if t := TaskMap[code]; t != nil {
				if info := t.ErrInfo["4"]; info != nil {
					num := qu.IntAll(info["num"])
					num++
					info["num"] = num
					hrefs := info["hrefs"].([]string)
					if len(hrefs) < 3 {
						hrefs = append(hrefs, href)
						info["hrefs"] = hrefs
						t.Description += href + "\n"
					}
					if num >= 10 {
						t.State = 1
					}
				} else {
					t.ErrInfo["4"] = map[string]interface{}{ // record the regather failure in ErrInfo
						"num":   1,
						"hrefs": []string{href},
					}
					t.Description += "运行报错:\n" + href + "\n"
				}
			} else {
				t := &Task{
					Code:        code,
					Site:        site,
					Channel:     channel,
					ErrType:     "4",
					ErrInfo:     map[string]map[string]interface{}{},
					Description: "运行报错:\n" + href + "\n",
					State:       0,
				}
				t.ErrInfo = map[string]map[string]interface{}{
					"4": map[string]interface{}{
						"num":   1,
						"hrefs": []string{href},
					},
				}
				TaskMap[code] = t
			}
			lock.Unlock()
		}(tmp)
		if n%100 == 0 {
			qu.Debug("current:", n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	// for _, task := range TaskMap {
	// 	qu.Debug("code:", task.Code)
	// 	qu.Debug("site:", task.Site)
	// 	qu.Debug("channel:", task.Channel)
	// 	qu.Debug("errtype:", task.ErrType)
	// 	qu.Debug("description:", task.Description)
	// 	qu.Debug("info:", task.ErrInfo)
	// 	qu.Debug("-------------------------------------------")
	// 	tmap := map[string]interface{}{}
	// 	ab, _ := json.Marshal(&task)
	// 	json.Unmarshal(ab, &tmap)
	// 	MgoE.Save("save_aa", tmap)
	// }
	logger.Debug("---统计重采失败信息完成---")
}

// 5. Collect detail, title and publishtime anomalies.
func GetDTPErrData() {
	defer qu.Catch()
	logger.Debug("---开始统计信息异常数据---")
	sess := MgoS.GetMgoConn()
	defer MgoS.DestoryMongoConn(sess)
	ch := make(chan bool, 5)
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	field := map[string]interface{}{
		"code":    1,
		"href":    1,
		"site":    1,
		"channel": 1,
		"field":   1,
		"info":    1,
	}
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lte": EndTime,
		},
		"level": 2, // 2: error records, 1: warn records
	}
	it := sess.DB("spider").C("spider_warn").Find(&query).Select(&field).Iter()
	count, _ := sess.DB("spider").C("spider_warn").Find(&query).Count()
	logger.Debug("共有信息异常数据", count, "条")
	n := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			errnum := "2" // detail/title anomaly
			destmp := "正文标题异常:\n"
			field := qu.ObjToString(tmp["field"])
			info := qu.ObjToString(tmp["info"])
			if field == "publishtime" { // publish-time anomaly
				if info == "Publishtime Is Too Late" { // publish times in the future do not create a task
					return
				}
				errnum = "3"
				destmp = "发布时间异常:\n"
			}
			code := qu.ObjToString(tmp["code"])
			href := qu.ObjToString(tmp["href"])
			site := qu.ObjToString(tmp["site"])
			channel := qu.ObjToString(tmp["channel"])
			lock.Lock()
			if t := TaskMap[code]; t != nil {
				if info := t.ErrInfo[errnum]; info != nil {
					num := qu.IntAll(info["num"])
					num++
					info["num"] = num
					hrefs := info["hrefs"].([]string)
					if len(hrefs) < 3 {
						hrefs = append(hrefs, href)
						info["hrefs"] = hrefs
						t.Description += href + "\n"
					}
					if num >= 10 {
						t.State = 1
					}
				} else {
					t.ErrInfo[errnum] = map[string]interface{}{
						"num":   1,
						"hrefs": []string{href},
					}
					t.Description += destmp + href + "\n"
				}
			} else {
				t := &Task{
					Code:        code,
					Site:        site,
					Channel:     channel,
					ErrType:     errnum,
					ErrInfo:     map[string]map[string]interface{}{},
					Description: destmp + href + "\n",
					State:       0,
				}
				t.ErrInfo = map[string]map[string]interface{}{
					errnum: map[string]interface{}{
						"num":   1,
						"hrefs": []string{href},
					},
				}
				TaskMap[code] = t
			}
			lock.Unlock()
		}(tmp)
		if n%100 == 0 {
			qu.Debug("current:", n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Debug("---统计信息异常数据完成---")
}

// 6. Collect download-volume anomalies.
func GetDownloadNumErrData() {
	defer qu.Catch()
	logger.Debug("---开始统计下载量异常数据---")
	sess := MgoS.GetMgoConn()
	defer MgoS.DestoryMongoConn(sess)
	ch := make(chan bool, 5)
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	field := map[string]interface{}{
		"downloadNum":     1,
		"code":            1,
		"averageDownload": 1,
		"site":            1,
		"channel":         1,
	}
	query := map[string]interface{}{
		"isok": false,
	}
	it := sess.DB("spider").C("spider_download").Find(&query).Select(&field).Iter()
	count, _ := sess.DB("spider").C("spider_download").Find(&query).Count()
	logger.Debug("共有下载量异常数据", count, "条")
	n := 0
	arr := [][]map[string]interface{}{}
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["code"])
			site := qu.ObjToString(tmp["site"])
			channel := qu.ObjToString(tmp["channel"])
			average := qu.IntAll(tmp["averageDownload"])
			date := "" // date of the abnormal download count
			dnum := 0  // download count
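			// Assumption based on the loop below: "downloadNum" holds the abnormal
			// day as a date -> count map, e.g. {"2006-01-02": 3}; if several entries
			// are present, the last one iterated wins.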
			for d, n := range tmp["downloadNum"].(map[string]interface{}) {
				date = d
				dnum = qu.IntAll(n)
			}
			lock.Lock()
			if t := TaskMap[code]; t != nil {
				t.ErrInfo["1"] = map[string]interface{}{ // record the volume anomaly in ErrInfo
					"num":     dnum,
					"date":    date,
					"average": average,
				}
				t.Description += "下载量异常:\n" + date + ":" + fmt.Sprint(dnum) + "\n"
			} else {
				t := &Task{
					Code:        code,
					Site:        site,
					Channel:     channel,
					ErrType:     "1",
					ErrInfo:     map[string]map[string]interface{}{},
					Description: "下载量异常:\n" + date + ":" + fmt.Sprint(dnum) + "\n",
					State:       0,
				}
				t.ErrInfo = map[string]map[string]interface{}{
					"1": map[string]interface{}{
						"num":     dnum,
						"date":    date,
						"average": average,
					},
				}
				TaskMap[code] = t
			}
			// mark the record as processed (isok=true), flushed in batches of 500
			update := []map[string]interface{}{}
			update = append(update, map[string]interface{}{"_id": tmp["_id"]})
			update = append(update, map[string]interface{}{"$set": map[string]interface{}{"isok": true}})
			arr = append(arr, update)
			if len(arr) > 500 {
				tmps := arr
				MgoS.UpdateBulk("spider_download", tmps...)
				arr = [][]map[string]interface{}{}
			}
			lock.Unlock()
		}(tmp)
		if n%100 == 0 {
			qu.Debug("current:", n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	lock.Lock()
	if len(arr) > 0 {
		MgoS.UpdateBulk("spider_download", arr...)
		arr = [][]map[string]interface{}{}
	}
	lock.Unlock()
	logger.Debug("---统计下载量异常数据完成---")
}

// SaveResult persists the collected statistics so CreateLuaTask can turn them into tasks,
// enriching each record with the spider's creator and event taken from luaconfig.
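// Codes listed in YearMinCodeMap (the yearly-minimum spot-check list) get special
// handling: if a task carries any error other than a pure volume anomaly ("1"),
// the code is dropped from the spot-check list and luayearmincode is marked
// send=true; a code that stays on that list is skipped here entirely.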
func SaveResult() {
	defer qu.Catch()
	logger.Debug("---开始保存信息---")
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	ch := make(chan bool, 10)
	savearr := []map[string]interface{}{}
	for _, task := range TaskMap {
		wg.Add(1)
		ch <- true
		go func(t *Task) {
			defer func() {
				<-ch
				wg.Done()
			}()
			delYearMinCode := false
			if errInfo := t.ErrInfo; errInfo != nil {
				// if the task covers download, runtime, 404, publish-time or data errors,
				// stop creating spot-check tasks for this spider
				if len(errInfo) >= 2 || (len(errInfo) == 1 && errInfo["1"] == nil) { // not a pure volume-anomaly task
					delYearMinCode = true
				}
			}
			lock.Lock()
			has := YearMinCodeMap[t.Code]
			lock.Unlock()
			if delYearMinCode {
				lock.Lock()
				delete(YearMinCodeMap, t.Code)
				lock.Unlock()
				go MgoE.Update("luayearmincode", map[string]interface{}{"code": t.Code}, map[string]interface{}{"$set": map[string]interface{}{"send": true}}, false, false)
			} else if has { // the spider is still on the luayearmincode spot-check list, skip it
				return
			}
			result := map[string]interface{}{}
			result["code"] = t.Code
			result["site"] = t.Site
			result["channel"] = t.Channel
			result["errtype"] = t.ErrType
			result["errinfo"] = t.ErrInfo
			result["description"] = t.Description
			result["comeintime"] = time.Now().Unix()
			result["state"] = t.State
			//result["updatetime"] = time.Now().Unix()
			lua, _ := MgoE.FindOne("luaconfig", map[string]interface{}{"code": t.Code})
			if lua != nil && len(*lua) > 0 {
				result["modifyid"] = (*lua)["createuserid"]
				result["modify"] = (*lua)["createuser"]
				result["event"] = (*lua)["event"]
			}
			lock.Lock()
			if len(result) > 0 {
				savearr = append(savearr, result)
			}
			if len(savearr) > 500 {
				tmps := savearr
				MgoE.SaveBulk("luataskinfo_test", tmps...)
				savearr = []map[string]interface{}{}
			}
			lock.Unlock()
		}(task)
	}
	wg.Wait()
	lock.Lock()
	if len(savearr) > 0 {
		MgoE.SaveBulk("luataskinfo", savearr...)
		savearr = []map[string]interface{}{}
	}
	lock.Unlock()
	TaskMap = map[string]*Task{} // reset
	logger.Debug("---保存信息完成---")
}

// CreateLuaTask turns today's luataskinfo records into tasks, merging with any open task.
func CreateLuaTask() {
	defer qu.Catch()
	logger.Debug("---开始创建任务---")
	sess := MgoE.GetMgoConn()
	defer MgoE.DestoryMongoConn(sess)
	ch := make(chan bool, 1)
	wg := &sync.WaitGroup{}
	field := map[string]interface{}{
		"comeintime": 0,
		//"updatetime": 0,
	}
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": GetTime(0),
		},
	}
	it := sess.DB("editor").C("luataskinfo").Find(&query).Select(&field).Iter()
	count, _ := sess.DB("editor").C("luataskinfo").Find(&query).Count()
	logger.Debug("共有异常爬虫数据量", count, "条")
	n := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		func(tmp map[string]interface{}) { // currently executed serially, no extra goroutines
			defer func() {
				<-ch
				wg.Done()
			}()
			id := mgo.BsonIdToSId(tmp["_id"])
			code := qu.ObjToString(tmp["code"])
			site := qu.ObjToString(tmp["site"])
			channel := qu.ObjToString(tmp["channel"])
			description := qu.ObjToString(tmp["description"])
			errtype := qu.ObjToString(tmp["errtype"])
			errinfo := tmp["errinfo"].(map[string]interface{})
			modifyid := qu.ObjToString(tmp["modifyid"])
			modify := qu.ObjToString(tmp["modify"])
			event := qu.IntAll(tmp["event"])
			state := qu.IntAll(tmp["state"])
			// initialize the variables of the new task
			n_imin := 0   // minimum download volume
			n_itimes := 0 // number of times the task reached "especially urgent"
			if state == 1 {
				n_itimes = 1
			}
			n_idn := 0         // download volume
			n_sdt := ""        // date of that download volume
			n_surgency := "1"  // urgency level
			dnerr := errinfo["1"]
			if errtype == "1" && dnerr != nil { // volume details are only recorded for volume-anomaly tasks
				info := errinfo["1"].(map[string]interface{})
				n_imin = qu.IntAll(info["average"])
				n_idn = qu.IntAll(info["num"])
				n_sdt = qu.ObjToString(info["date"])
			}
			if errtype == "8" || errtype == "7" || errtype == "6" {
				n_surgency = "4"
			}
			query := map[string]interface{}{
				"s_code": code,
				"i_state": map[string]interface{}{
					"$in": []int{0, 1, 2, 3, 5},
				},
			}
			list, _ := MgoE.Find("task", query, nil, nil, false, -1, -1)
			if list != nil && len(*list) > 0 { // an open task already exists
				if len(*list) > 1 {
					logger.Error("Code:", code, "任务异常")
					MgoE.Save("luacreatetaskerr", map[string]interface{}{
						"code":       code,
						"comeintime": time.Now().Unix(),
						"tasknum":    len(*list),
					})
					return
				}
				task := (*list)[0]
				o_istate := qu.IntAll(task["i_state"])             // state of the existing task
				o_stype := qu.ObjToString(task["s_type"])          // type of the existing task
				o_sdescript := qu.ObjToString(task["s_descript"])  // description of the existing task
				o_addinfoid, _ := task["addinfoid"].([]interface{}) // luataskinfo references
				o_lcomplete := qu.Int64All(task["l_complete"])     // latest completion time of the existing task
				o_surgency := qu.ObjToString(task["s_urgency"])    // urgency of the existing task
				o_iurgency, _ := strconv.Atoi(o_surgency)          // urgency as an int
				o_itimes := qu.IntAll(task["i_times"])             // number of occurrences of the existing task
				// o_addinfoid = append(o_addinfoid, id) // append the new luataskinfo id
				o_sdescript += time.Now().Format(qu.Date_Short_Layout) + "追加描述:------------------------------\n" + description // append the description
				set := map[string]interface{}{}
				//MgoE.Update("task", q, s, false, false)
				if state == 1 { // the new record is a pending task
					if o_istate <= 2 {
						if errtype > o_stype { // existing task is to-confirm/pending/in-progress with a lower error level: replace its type
							o_stype = errtype
						}
						o_surgency = n_surgency // refresh the urgency
						o_itimes++
						set = map[string]interface{}{
							"addinfoid":  o_addinfoid,
							"s_descript": o_sdescript,
							// "i_min":          n_imin,
							// "i_num":          n_idn,
							// "s_downloadtime": n_sdt,
							"i_state":      state,
							"l_complete":   CompleteTime(o_surgency),
							"s_urgency":    o_surgency,
							"s_type":       o_stype,
							"i_times":      o_itimes,
							"l_updatetime": time.Now().Unix(),
						}
					} else { // existing task was rejected or awaits review: only refresh the basics
						set = map[string]interface{}{
							"addinfoid":    o_addinfoid,
							"s_descript":   o_sdescript,
							"l_updatetime": time.Now().Unix(),
						}
					}
				} else { // the new record is a "to be confirmed" task
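					// Merge rules for a "to be confirmed" record against the existing task:
					//  - existing volume-anomaly task + new volume anomaly: urgency climbs one
					//    level per occurrence (capped at 4); each occurrence at level 4 bumps
					//    i_times, and after 5 of them the task is promoted to pending (i_state 1).
					//  - existing volume-anomaly task + new non-volume type: type and urgency are
					//    refreshed without counting occurrences.
					//  - other existing types: the type is raised to the higher error level,
					//    urgency climbs one level, and reaching level 4 promotes the task to
					//    pending immediately.
					//  - tasks already past "to be confirmed" only get description and volume
					//    fields appended.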
					if o_istate == 0 { // existing task is still awaiting confirmation
						if o_stype == "1" { // existing task is a volume anomaly awaiting confirmation
							if errtype == "1" { // new record is also a volume anomaly: escalate urgency and count
								o_iurgency++ // raise urgency one level
								if o_iurgency >= 4 { // reached "especially urgent": count it in i_times
									o_itimes++
									o_iurgency = 4
								}
								o_surgency = fmt.Sprint(o_iurgency)
								o_lcomplete = CompleteTime(o_surgency)
								if o_itimes >= 5 { // 5 occurrences at the top level: auto-create a pending task (no review task can exist here)
									state = 1
								}
								set = map[string]interface{}{
									"addinfoid":      o_addinfoid,
									"s_descript":     o_sdescript,
									"i_min":          n_imin,
									"i_num":          n_idn,
									"s_downloadtime": n_sdt,
									"i_state":        state,
									"l_complete":     o_lcomplete,
									"s_urgency":      o_surgency,
									"s_type":         errtype,
									"i_times":        o_itimes,
									"l_updatetime":   time.Now().Unix(),
								}
							} else { // new record is a different anomaly type awaiting confirmation: set urgency accordingly
								if o_iurgency < 4 { // volume anomaly below "especially urgent"
									o_surgency = "1"
								} else {
									o_surgency = "2"
								}
								set = map[string]interface{}{
									"addinfoid":      o_addinfoid,
									"s_descript":     o_sdescript,
									"i_min":          n_imin,
									"i_num":          n_idn,
									"s_downloadtime": n_sdt,
									"i_state":        state,
									"l_complete":     CompleteTime(o_surgency),
									"s_urgency":      o_surgency,
									"s_type":         errtype,
									"l_updatetime":   time.Now().Unix(),
								}
							}
						} else { // other type awaiting confirmation: urgency +1, count +1, keep the higher error level, pending once level 4 is reached
							if errtype > o_stype {
								o_stype = errtype
							}
							o_iurgency++ // raise urgency one level
							if o_iurgency >= 4 { // reached "especially urgent": count it and switch to pending
								o_itimes++
								o_iurgency = 4
								state = 1
							}
							o_surgency = fmt.Sprint(o_iurgency)
							set = map[string]interface{}{
								"addinfoid":      o_addinfoid,
								"s_descript":     o_sdescript,
								"i_min":          n_imin,
								"i_num":          n_idn,
								"s_downloadtime": n_sdt,
								"i_state":        state,
								"l_complete":     CompleteTime(o_surgency),
								"s_urgency":      o_surgency,
								"s_type":         o_stype,
								"i_times":        o_itimes,
								"l_updatetime":   time.Now().Unix(),
							}
						}
					} else { // existing task is already pending or beyond: only append the description
						set = map[string]interface{}{
							"addinfoid":      o_addinfoid,
							"s_descript":     o_sdescript,
							"i_min":          n_imin,
							"i_num":          n_idn,
							"s_downloadtime": n_sdt,
							"l_updatetime":   time.Now().Unix(),
						}
					}
				}
				MgoE.Update("task", map[string]interface{}{"_id": task["_id"]}, map[string]interface{}{"$set": set}, false, false)
			} else {
				SaveTask(code, site, channel, modifyid, modify, description, n_surgency, n_sdt, errtype, state, n_imin, n_idn, event, n_itimes, []string{id})
			}
		}(tmp)
		if n%100 == 0 {
			qu.Debug("current:", n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Debug("---任务创建完成---")
}

// SaveTask creates a brand-new task record for a spider that has no open task.
func SaveTask(code, site, channel, modifyid, modify, description, urgency, downloadtime, errtype string, state, min, downloadnum, event, times int, addinfoid []string) {
	defer qu.Catch()
	result := map[string]interface{}{}
	// if stateNum := UserTaskNum[modify]; stateNum == nil {
	// 	tmp := map[string]int{fmt.Sprint(state): 1}
	// 	UserTaskNum[modify] = tmp
	// } else {
	// 	stateNum[fmt.Sprint(state)]++
	// }
	// if state == 1 { //待处理任务,紧急程度定为特别紧急
	// 	urgency = "4"
	// }
	result["s_code"] = code
	result["s_site"] = site
	result["s_channel"] = channel
	result["s_modifyid"] = modifyid
	result["s_modify"] = modify
	result["s_descript"] = description
	result["i_min"] = min
	result["i_num"] = downloadnum // download volume
	result["s_urgency"] = urgency
	result["i_state"] = state
	result["i_event"] = event
	result["s_downloadtime"] = downloadtime // date of that download volume
	result["l_comeintime"] = time.Now().Unix()
	result["l_updatetime"] = time.Now().Unix()
	result["l_complete"] = CompleteTime(urgency)
	//result["s_date"] = time.Now().Format(qu.Date_Short_Layout) // task creation date as a string
	result["i_times"] = times     // current occurrence count, kept so the editor can sort by it
	result["s_type"] = errtype    // task type
	result["addinfoid"] = addinfoid // luataskinfo ids
	result["s_source"] = "程序"
	MgoE.Save("task", result)
}

// SaveUserCreateTaskNum writes the per-user counters collected in UserTaskNum
// (population is currently commented out in SaveTask) and then resets the map.
func SaveUserCreateTaskNum() {
	defer qu.Catch()
	for user, sn := range UserTaskNum {
		save := map[string]interface{}{}
		save["user"] = user
		save["comeintime"] = time.Now().Unix()
		for s, n := range sn {
			save[s] = n
		}
		MgoE.Save("luausertask", save)
	}
	UserTaskNum = map[string]map[string]int{}
}
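// ResetDataState is the counterpart of GetDownloadFailedData above: failures are
// counted first for task creation, then re-queued here (state back to 0, times
// reset) so that records which failed three times in one day keep being retried
// across the DayNum-day window.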
// Reset data that failed to download within the last DayNum days
// (records that failed three times in one day can keep downloading for another week).
func ResetDataState() {
	defer qu.Catch()
	logger.Info("-----更新数据状态-----")
	sess := MgoS.GetMgoConn()
	defer MgoS.DestoryMongoConn(sess)
	ch := make(chan bool, 3)
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": GetTime(-DayNum),
			"$lt":  GetTime(0),
		},
		"state": -1,
	}
	field := map[string]interface{}{
		"_id": 1,
	}
	it := sess.DB("spider").C("spider_highlistdata").Find(&query).Select(&field).Iter()
	count, _ := sess.DB("spider").C("spider_highlistdata").Find(&query).Count()
	logger.Info("更新数据状态数量:", count)
	n := 0
	arr := [][]map[string]interface{}{}
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			update := []map[string]interface{}{}
			update = append(update, map[string]interface{}{"_id": tmp["_id"]})
			update = append(update, map[string]interface{}{"$set": map[string]interface{}{"times": 0, "state": 0}})
			lock.Lock()
			arr = append(arr, update)
			if len(arr) > 500 {
				tmps := arr
				MgoS.UpdateBulk("spider_highlistdata", tmps...)
				arr = [][]map[string]interface{}{}
			}
			lock.Unlock()
		}(tmp)
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	lock.Lock()
	if len(arr) > 0 {
		MgoS.UpdateBulk("spider_highlistdata", arr...)
		arr = [][]map[string]interface{}{}
	}
	lock.Unlock()
	logger.Info("-----更新数据状态完毕-----")
}
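
// A minimal scheduling sketch (not part of this file's behavior), assuming the
// *Cron variables hold standard cron specs and that a scheduler such as
// github.com/robfig/cron/v3 is wired up elsewhere in the project; adjust to the
// actual entry point before use.
//
//	func startSchedulers() {
//		c := cron.New()
//		c.AddFunc(CreateTaskCron, CreateTaskProcess) // collect anomalies and create tasks daily
//		c.AddFunc(CloseTaskCron, CloseTask)          // close tasks idle for CloseNum working days
//		c.AddFunc(UpdateStateCron, ResetDataState)   // re-queue failed downloads from the last DayNum days
//		c.Start()
//	}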