12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082 |
- package main
- import (
- "fmt"
- mgo "mongodb"
- qu "qfw/util"
- "strconv"
- "sync"
- "time"
- "github.com/donnie4w/go-logger/logger"
- )
// Task collects every anomaly found for one spider during a statistics run.
// Instances live in TaskMap keyed by Code and are flattened into a record by
// SaveResult.
type Task struct {
	// Code is the spider code.
	Code string
	// Site is the site the spider belongs to.
	Site string
	// Channel is the site column (channel) the spider crawls.
	Channel string
	// ErrType is the anomaly type set when the Task is first created.
	// Values assigned in this file:
	//   "8" collection-frequency anomaly, "7" list-page anomaly,
	//   "6" 404 from site monitoring,     "5" detail download failure,
	//   "4" re-gather failure,            "3" publish-time anomaly,
	//   "2" body/title anomaly,           "1" download-volume anomaly.
	ErrType string
	// ErrInfo holds the details per anomaly type, keyed by the same type codes
	// as ErrType (for example "num", "hrefs", "date", "average").
	ErrInfo map[string]map[string]interface{}
	// Description is the human-readable text accumulated across anomalies.
	Description string
	// State is 1 when the task should be handled right away and 0 when it
	// still awaits confirmation.
	State int
}
- var (
- StartTime int64 //上一个工作日的起始时间
- EndTime int64 //上一个工作日的结束时间
- TaskMap map[string]*Task //任务集合
- UpdateStateCron string //每天关闭任务的时间
- CreateTaskCron string //每天创建任务的时间
- CloseTaskCron string //每天关闭任务的时间
- CodeSummaryCron string //每天统计爬虫信息
- CloseNum int //关闭几天的任务
- DayNum int //更新数据天数
- UserTaskNum map[string]map[string]int //记录每人每天新建任务量
- )
- //创建任务
- func CreateTaskProcess() {
- InitInfo() //初始化
- GetSpiderDownloadRateData() //1、统计spider_downloadrate前一天列表页采集异常爬虫
- GetStatusCodeErrorData() //2、统计spider_sitecheck 站点异常爬虫(404)
- GetDownloadFailedData() //3、统计spider_highlistdata前一天下载失败的爬虫数据(统计完成后修改状态state:0)
- GetRegatherFailedData() //4、统计regatherdata前一天重采失败的爬虫数据
- GetDTPErrData() //5、统计spider_warn异常数据(发布时间异常、乱码)
- GetDownloadNumErrData() //6、统计download前一天下载量异常的爬虫数据(每天1点统计下载量,目前统计完成需要1个小时)
- SaveResult() //保存统计信息
- CreateLuaTask() //创建任务
- SaveUserCreateTaskNum() //保存每人创建的任务量
- }
- //初始化
- func InitInfo() {
- defer qu.Catch()
- TaskMap = map[string]*Task{}
- UserTaskNum = map[string]map[string]int{}
- InitTime() //初始化时间
- }
- //关闭任务
- func CloseTask() {
- qu.Catch()
- logger.Debug("---清理未更新任务---")
- decreaseDay, day := 0, 0
- var cleanDay string
- for {
- decreaseDay--
- weekDay := time.Now().AddDate(0, 0, decreaseDay).Weekday().String()
- if weekDay != "Saturday" && weekDay != "Sunday" {
- day++
- }
- if day == CloseNum {
- cleanDay = time.Now().AddDate(0, 0, decreaseDay).Format("2006-01-02")
- break
- }
- }
- the_time, _ := time.ParseInLocation(qu.Date_Short_Layout, cleanDay, time.Local)
- unix_time := the_time.Unix() //凌晨时间戳
- query := map[string]interface{}{
- "i_state": 0,
- "l_complete": map[string]interface{}{
- "$lt": unix_time + 86400,
- },
- "s_type": "1",
- // "s_type": map[string]interface{}{
- // "$ne": "7",
- // },
- }
- logger.Debug("query:", query)
- set := map[string]interface{}{
- "$set": map[string]interface{}{
- "i_state": 6,
- },
- }
- MgoE.Update("task", query, set, false, true)
- logger.Debug("---清理未更新任务完毕---")
- }
- //1、统计spider_downloadrate前一天列表页采集异常爬虫
- func GetSpiderDownloadRateData() {
- defer qu.Catch()
- logger.Debug("---开始统计spider_downloadrate异常信息---")
- sess := MgoS.GetMgoConn()
- defer MgoS.DestoryMongoConn(sess)
- ch := make(chan bool, 5)
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- date := qu.FormatDateByInt64(&StartTime, qu.Date_Short_Layout)
- query := map[string]interface{}{
- "date": date,
- }
- it := sess.DB("spider").C("spider_downloadrate").Find(&query).Iter()
- n := 0
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- stype := -1
- //1、统计采集频率异常信息
- oh_percent := qu.IntAll(tmp["oh_percent"])
- event := qu.IntAll(tmp["event"])
- if oh_percent > 0 && event != 7410 {
- stype = 8
- }
- //2、统计列表页异常(统计zero占总下载次数的百分比超过80%的)
- alltimes := qu.IntAll(tmp["alltimes"])
- zero := qu.IntAll(tmp["zero"])
- percent := 0 //记录百分比
- if zero > 0 {
- tmpPercent := float64(zero) / float64(alltimes)
- tmpPercent, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", tmpPercent), 64)
- percent = int(tmpPercent * float64(100))
- if percent >= 80 { //占比超过80%
- stype = 7
- }
- }
- if stype != -1 { //出现异常
- code := qu.ObjToString(tmp["spidercode"])
- site := qu.ObjToString(tmp["site"])
- channel := qu.ObjToString(tmp["channel"])
- t := &Task{
- Code: code,
- Site: site,
- Channel: channel,
- ErrInfo: map[string]map[string]interface{}{},
- State: 1,
- }
- if stype == 8 {
- t.ErrType = "8"
- t.ErrInfo = map[string]map[string]interface{}{
- "8": map[string]interface{}{
- "num": oh_percent,
- },
- }
- t.Description = "采集频率异常:\n 列表页共采集" + fmt.Sprint(alltimes) + "轮,其中有" + fmt.Sprint(oh_percent) + "轮数据全采\n"
- } else if stype == 7 {
- t.ErrType = "7"
- t.ErrInfo = map[string]map[string]interface{}{
- "7": map[string]interface{}{
- "num": percent,
- },
- }
- t.Description = "列表页异常:\n 列表页采集无信息次数占比" + fmt.Sprint(percent) + "%\n"
- }
- lock.Lock()
- TaskMap[code] = t
- lock.Unlock()
- }
- }(tmp)
- if n%100 == 0 {
- qu.Debug("current:", n)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- logger.Debug("---统计spider_downloadrate异常信息完成---")
- }
- //2、状态码404
- func GetStatusCodeErrorData() {
- defer qu.Catch()
- logger.Debug("---开始统计栏目地址404数据---")
- sess := MgoS.GetMgoConn()
- defer MgoS.DestoryMongoConn(sess)
- ch := make(chan bool, 5)
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- field := map[string]interface{}{
- "url": 1,
- "code": 1,
- "site": 1,
- "channel": 1,
- }
- query := map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$gte": StartTime,
- "$lte": EndTime,
- },
- "statuscode": 404,
- }
- it := sess.DB("spider").C("spider_sitecheck").Find(&query).Select(&field).Iter()
- count, _ := sess.DB("spider").C("spider_sitecheck").Find(&query).Count()
- logger.Debug("共有404地址", count, "条")
- n := 0
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- code := qu.ObjToString(tmp["code"])
- one, _ := MgoE.FindOneByField("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"state": 1})
- state := qu.IntAll((*one)["state"])
- if state == 4 || state > 6 {
- return
- }
- //判断3天内是否有采集数据,有则不建404任务
- stime, etime := GetTime(-3), GetTime(0)
- q := map[string]interface{}{
- "spidercode": code,
- "l_np_publishtime": map[string]interface{}{
- "$gte": stime,
- "$lte": etime,
- },
- }
- if MgoS.Count("data_bak", q) > 0 { //有采集数据,不认为是404
- return
- }
- href := qu.ObjToString(tmp["url"])
- site := qu.ObjToString(tmp["site"])
- channel := qu.ObjToString(tmp["channel"])
- lock.Lock()
- if t := TaskMap[code]; t != nil {
- t.ErrInfo["6"] = map[string]interface{}{ //ErrInfo新增下载异常信息
- "num": 404,
- "hrefs": []string{href},
- }
- t.Description += "网站监测:404\n" + href + "\n"
- t.State = 1
- } else {
- t := &Task{
- Code: code,
- Site: site,
- Channel: channel,
- ErrType: "6",
- ErrInfo: map[string]map[string]interface{}{},
- Description: "网站监测:404\n" + href + "\n",
- State: 1,
- }
- t.ErrInfo = map[string]map[string]interface{}{
- "6": map[string]interface{}{
- "num": 404,
- "hrefs": []string{href},
- },
- }
- TaskMap[code] = t
- }
- lock.Unlock()
- }(tmp)
- if n%100 == 0 {
- qu.Debug("current:", n)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- logger.Debug("---统计栏目地址404数据完成---")
- }
- //3、统计三级页下载失败数据
- /*
- 先统计下载失败信息再更新下载失败信息状态(ResetDataState)使其可重新下载,这样不影响统计
- 但是任务已经就绪,若下载失败信息重新下载成功,则使任务不太准备
- 若先重置状态再统计,会使任务统计时缺少,无法正常监控
- */
- func GetDownloadFailedData() {
- defer qu.Catch()
- logger.Debug("---开始统计下载失败信息---")
- sess := MgoS.GetMgoConn()
- defer MgoS.DestoryMongoConn(sess)
- ch := make(chan bool, 5)
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- field := map[string]interface{}{
- "spidercode": 1,
- "href": 1,
- "site": 1,
- "channel": 1,
- }
- query := map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$gte": StartTime,
- "$lte": EndTime,
- },
- "state": -1,
- }
- it := sess.DB("spider").C("spider_highlistdata").Find(&query).Select(&field).Iter()
- count, _ := sess.DB("spider").C("spider_highlistdata").Find(&query).Count()
- logger.Debug("共有下载失败数据", count, "条")
- n := 0
- //arr := [][]map[string]interface{}{}
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- code := qu.ObjToString(tmp["spidercode"])
- href := qu.ObjToString(tmp["href"])
- site := qu.ObjToString(tmp["site"])
- channel := qu.ObjToString(tmp["channel"])
- lock.Lock()
- if t := TaskMap[code]; t != nil {
- if info := t.ErrInfo["5"]; info != nil {
- num := qu.IntAll(info["num"])
- num++
- info["num"] = num
- hrefs := info["hrefs"].([]string)
- if len(hrefs) < 3 {
- hrefs = append(hrefs, href)
- info["hrefs"] = hrefs
- t.Description += href + "\n"
- }
- if num >= 10 {
- t.State = 1
- }
- } else {
- t.ErrInfo["5"] = map[string]interface{}{ //ErrInfo新增下载异常信息
- "num": 1,
- "hrefs": []string{href},
- }
- t.Description += "下载异常:\n" + href + "\n"
- }
- } else {
- t := &Task{
- Code: code,
- Site: site,
- Channel: channel,
- ErrType: "5",
- ErrInfo: map[string]map[string]interface{}{},
- Description: "下载异常:\n" + href + "\n",
- State: 0,
- }
- t.ErrInfo = map[string]map[string]interface{}{
- "5": map[string]interface{}{
- "num": 1,
- "hrefs": []string{href},
- },
- }
- TaskMap[code] = t
- }
- //更新state状态重新下载
- // update := []map[string]interface{}{}
- // update = append(update, map[string]interface{}{"_id": tmp["_id"]})
- // update = append(update, map[string]interface{}{"$set": map[string]interface{}{"state": 0, "times": 0}})
- // arr = append(arr, update)
- // if len(arr) > 500 {
- // tmps := arr
- // MgoS.UpdateBulk("spider_highlistdata", tmps...)
- // arr = [][]map[string]interface{}{}
- // }
- lock.Unlock()
- }(tmp)
- if n%100 == 0 {
- qu.Debug("current:", n)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- // lock.Lock()
- // if len(arr) > 0 {
- // MgoS.UpdateBulk("spider_highlistdata", arr...)
- // arr = [][]map[string]interface{}{}
- // }
- // lock.Unlock()
- logger.Debug("---统计下载失败信息完成---")
- }
- //4、统计重采失败数据
- func GetRegatherFailedData() {
- defer qu.Catch()
- logger.Debug("---开始统计重采失败信息---")
- sess := MgoS.GetMgoConn()
- defer MgoS.DestoryMongoConn(sess)
- ch := make(chan bool, 5)
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- field := map[string]interface{}{
- "spidercode": 1,
- "href": 1,
- "site": 1,
- "channel": 1,
- }
- query := map[string]interface{}{
- "state": map[string]interface{}{
- "$lte": 1,
- },
- "from": "lua",
- "comeintime": map[string]interface{}{
- "$gte": StartTime,
- "$lte": EndTime,
- },
- }
- it := sess.DB("spider").C("regatherdata").Find(&query).Select(&field).Iter()
- count, _ := sess.DB("spider").C("regatherdata").Find(&query).Count()
- logger.Debug("共有重采失败数据", count, "条")
- n := 0
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- code := qu.ObjToString(tmp["spidercode"])
- href := qu.ObjToString(tmp["href"])
- site := qu.ObjToString(tmp["site"])
- channel := qu.ObjToString(tmp["channel"])
- lock.Lock()
- if t := TaskMap[code]; t != nil {
- if info := t.ErrInfo["4"]; info != nil {
- num := qu.IntAll(info["num"])
- num++
- info["num"] = num
- hrefs := info["hrefs"].([]string)
- if len(hrefs) < 3 {
- hrefs = append(hrefs, href)
- info["hrefs"] = hrefs
- t.Description += href + "\n"
- }
- if num >= 10 {
- t.State = 1
- }
- } else {
- t.ErrInfo["4"] = map[string]interface{}{ //ErrInfo新增下载异常信息
- "num": 1,
- "hrefs": []string{href},
- }
- t.Description += "运行报错:\n" + href + "\n"
- }
- } else {
- t := &Task{
- Code: code,
- Site: site,
- Channel: channel,
- ErrType: "4",
- ErrInfo: map[string]map[string]interface{}{},
- Description: "运行报错:\n" + href + "\n",
- State: 0,
- }
- t.ErrInfo = map[string]map[string]interface{}{
- "4": map[string]interface{}{
- "num": 1,
- "hrefs": []string{href},
- },
- }
- TaskMap[code] = t
- }
- lock.Unlock()
- }(tmp)
- if n%100 == 0 {
- qu.Debug("current:", n)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- // for _, task := range TaskMap {
- // qu.Debug("code:", task.Code)
- // qu.Debug("site:", task.Site)
- // qu.Debug("channel:", task.Channel)
- // qu.Debug("errtype:", task.ErrType)
- // qu.Debug("description:", task.Description)
- // qu.Debug("info:", task.ErrInfo)
- // qu.Debug("-------------------------------------------")
- // tmap := map[string]interface{}{}
- // ab, _ := json.Marshal(&task)
- // json.Unmarshal(ab, &tmap)
- // MgoE.Save("save_aa", tmap)
- // }
- logger.Debug("---统计重采失败信息完成---")
- }
- //5、统计detail、title、publishtime异常数据
- func GetDTPErrData() {
- defer qu.Catch()
- logger.Debug("---开始统计信息异常数据---")
- sess := MgoS.GetMgoConn()
- defer MgoS.DestoryMongoConn(sess)
- ch := make(chan bool, 5)
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- field := map[string]interface{}{
- "code": 1,
- "href": 1,
- "site": 1,
- "channel": 1,
- "field": 1,
- "info": 1,
- }
- query := map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$gte": StartTime,
- "$lte": EndTime,
- },
- "level": 2, //2:error数据 1:warn数据
- }
- it := sess.DB("spider").C("spider_warn").Find(&query).Select(&field).Iter()
- count, _ := sess.DB("spider").C("spider_warn").Find(&query).Count()
- logger.Debug("共有信息异常数据", count, "条")
- n := 0
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- errnum := "2" //detail、 title异常
- destmp := "正文标题异常:\n"
- field := qu.ObjToString(tmp["field"])
- info := qu.ObjToString(tmp["info"])
- if field == "publishtime" { //发布时间异常
- if info == "Publishtime Is Too Late" { //发布时间超前的不建任务
- return
- }
- errnum = "3"
- destmp = "发布时间异常:\n"
- }
- code := qu.ObjToString(tmp["code"])
- href := qu.ObjToString(tmp["href"])
- site := qu.ObjToString(tmp["site"])
- channel := qu.ObjToString(tmp["channel"])
- lock.Lock()
- if t := TaskMap[code]; t != nil {
- if info := t.ErrInfo[errnum]; info != nil {
- num := qu.IntAll(info["num"])
- num++
- info["num"] = num
- hrefs := info["hrefs"].([]string)
- if len(hrefs) < 3 {
- hrefs = append(hrefs, href)
- info["hrefs"] = hrefs
- t.Description += href + "\n"
- }
- if num >= 10 {
- t.State = 1
- }
- } else {
- t.ErrInfo[errnum] = map[string]interface{}{
- "num": 1,
- "hrefs": []string{href},
- }
- t.Description += destmp + href + "\n"
- }
- } else {
- t := &Task{
- Code: code,
- Site: site,
- Channel: channel,
- ErrType: errnum,
- ErrInfo: map[string]map[string]interface{}{},
- Description: destmp + href + "\n",
- State: 0,
- }
- t.ErrInfo = map[string]map[string]interface{}{
- errnum: map[string]interface{}{
- "num": 1,
- "hrefs": []string{href},
- },
- }
- TaskMap[code] = t
- }
- lock.Unlock()
- }(tmp)
- if n%100 == 0 {
- qu.Debug("current:", n)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- logger.Debug("---统计信息异常数据完成---")
- }
- //6、统计下载量异常数据
- func GetDownloadNumErrData() {
- defer qu.Catch()
- logger.Debug("---开始统计下载量异常数据---")
- sess := MgoS.GetMgoConn()
- defer MgoS.DestoryMongoConn(sess)
- ch := make(chan bool, 5)
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- field := map[string]interface{}{
- "downloadNum": 1,
- "code": 1,
- "averageDownload": 1,
- "site": 1,
- "channel": 1,
- }
- query := map[string]interface{}{
- "isok": false,
- }
- it := sess.DB("spider").C("spider_download").Find(&query).Select(&field).Iter()
- count, _ := sess.DB("spider").C("spider_download").Find(&query).Count()
- logger.Debug("共有下载量异常数据", count, "条")
- n := 0
- arr := [][]map[string]interface{}{}
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- code := qu.ObjToString(tmp["code"])
- site := qu.ObjToString(tmp["site"])
- channel := qu.ObjToString(tmp["channel"])
- average := qu.IntAll(tmp["averageDownload"])
- date := "" //日期
- dnum := 0 //下载量
- for d, n := range tmp["downloadNum"].(map[string]interface{}) {
- date = d
- dnum = qu.IntAll(n)
- }
- lock.Lock()
- if t := TaskMap[code]; t != nil {
- t.ErrInfo["1"] = map[string]interface{}{ //ErrInfo新增下载异常信息
- "num": dnum,
- "date": date,
- "average": average,
- }
- t.Description += "下载量异常:\n" + date + ":" + fmt.Sprint(dnum) + "\n"
- } else {
- t := &Task{
- Code: code,
- Site: site,
- Channel: channel,
- ErrType: "1",
- ErrInfo: map[string]map[string]interface{}{},
- Description: "下载量异常:\n" + date + ":" + fmt.Sprint(dnum) + "\n",
- State: 0,
- }
- t.ErrInfo = map[string]map[string]interface{}{
- "1": map[string]interface{}{
- "num": dnum,
- "date": date,
- "average": average,
- },
- }
- TaskMap[code] = t
- }
- //更新isok
- update := []map[string]interface{}{}
- update = append(update, map[string]interface{}{"_id": tmp["_id"]})
- update = append(update, map[string]interface{}{"$set": map[string]interface{}{"isok": true}})
- arr = append(arr, update)
- if len(arr) > 500 {
- tmps := arr
- MgoS.UpdateBulk("spider_download", tmps...)
- arr = [][]map[string]interface{}{}
- }
- lock.Unlock()
- }(tmp)
- if n%100 == 0 {
- qu.Debug("current:", n)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- lock.Lock()
- if len(arr) > 0 {
- MgoS.UpdateBulk("spider_download", arr...)
- arr = [][]map[string]interface{}{}
- }
- lock.Unlock()
- logger.Debug("---统计下载量异常数据完成---")
- }
- //保存统计信息
- func SaveResult() {
- defer qu.Catch()
- logger.Debug("---开始保存信息---")
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- ch := make(chan bool, 10)
- savearr := []map[string]interface{}{}
- for _, task := range TaskMap {
- wg.Add(1)
- ch <- true
- go func(t *Task) {
- defer func() {
- <-ch
- wg.Done()
- }()
- delYearMinCode := false
- if errInfo := t.ErrInfo; errInfo != nil {
- //爬虫任务为下载异常、运行异常、404、时间异常、数据异常任务时,不再建该爬虫的抽查任务
- if len(errInfo) >= 2 || (len(errInfo) == 1 && errInfo["1"] == nil) { //不是数量异常任务
- delYearMinCode = true
- }
- }
- lock.Lock()
- has := YearMinCodeMap[t.Code]
- lock.Unlock()
- if delYearMinCode {
- lock.Lock()
- delete(YearMinCodeMap, t.Code)
- lock.Unlock()
- go MgoE.Update("luayearmincode", map[string]interface{}{"code": t.Code}, map[string]interface{}{"$set": map[string]interface{}{"send": true}}, false, false)
- } else if has { //luayearmincode中爬虫任务删除
- return
- }
- result := map[string]interface{}{}
- result["code"] = t.Code
- result["site"] = t.Site
- result["channel"] = t.Channel
- result["errtype"] = t.ErrType
- result["errinfo"] = t.ErrInfo
- result["description"] = t.Description
- result["comeintime"] = time.Now().Unix()
- result["state"] = t.State
- //result["updatetime"] = time.Now().Unix()
- lua, _ := MgoE.FindOne("luaconfig", map[string]interface{}{"code": t.Code})
- if lua != nil && len(*lua) > 0 {
- result["modifyid"] = (*lua)["createuserid"]
- result["modify"] = (*lua)["createuser"]
- result["event"] = (*lua)["event"]
- }
- lock.Lock()
- if len(result) > 0 {
- savearr = append(savearr, result)
- }
- if len(savearr) > 500 {
- tmps := savearr
- MgoE.SaveBulk("luataskinfo_test", tmps...)
- savearr = []map[string]interface{}{}
- }
- lock.Unlock()
- }(task)
- }
- wg.Wait()
- lock.Lock()
- if len(savearr) > 0 {
- MgoE.SaveBulk("luataskinfo", savearr...)
- savearr = []map[string]interface{}{}
- }
- lock.Unlock()
- TaskMap = map[string]*Task{} //重置
- logger.Debug("---保存信息完成---")
- }
- //创建任务
- func CreateLuaTask() {
- defer qu.Catch()
- logger.Debug("---开始创建任务---")
- sess := MgoE.GetMgoConn()
- defer MgoE.DestoryMongoConn(sess)
- ch := make(chan bool, 1)
- wg := &sync.WaitGroup{}
- field := map[string]interface{}{
- "comeintime": 0,
- //"updatetime": 0,
- }
- query := map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$gte": GetTime(0),
- },
- }
- it := sess.DB("editor").C("luataskinfo").Find(&query).Select(&field).Iter()
- count, _ := sess.DB("editor").C("luataskinfo").Find(&query).Count()
- logger.Debug("共有异常爬虫数据量", count, "条")
- n := 0
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- ch <- true
- wg.Add(1)
- func(tmp map[string]interface{}) { //目前不用多线程
- defer func() {
- <-ch
- wg.Done()
- }()
- id := mgo.BsonIdToSId(tmp["_id"])
- code := qu.ObjToString(tmp["code"])
- site := qu.ObjToString(tmp["site"])
- channel := qu.ObjToString(tmp["channel"])
- description := qu.ObjToString(tmp["description"])
- errtype := qu.ObjToString(tmp["errtype"])
- errinfo := tmp["errinfo"].(map[string]interface{})
- modifyid := qu.ObjToString(tmp["modifyid"])
- modify := qu.ObjToString(tmp["modify"])
- event := qu.IntAll(tmp["event"])
- state := qu.IntAll(tmp["state"])
- //初始化一些任务的变量
- n_imin := 0 //最小下载量
- n_itimes := 0 //任务出现特别紧急的次数
- if state == 1 {
- n_itimes = 1
- }
- n_idn := 0 //下载量
- n_sdt := "" //下载量对应的日期
- n_surgency := "1" //紧急程度
- //
- dnerr := errinfo["1"]
- if errtype == "1" && dnerr != nil { //只有任务类型是数据量异常时,才记录数据量信息
- info := errinfo["1"].(map[string]interface{})
- n_imin = qu.IntAll(info["average"])
- n_idn = qu.IntAll(info["num"])
- n_sdt = qu.ObjToString(info["date"])
- }
- if errtype == "8" || errtype == "7" || errtype == "6" {
- n_surgency = "4"
- }
- query := map[string]interface{}{
- "s_code": code,
- "i_state": map[string]interface{}{
- "$in": []int{0, 1, 2, 3, 5},
- },
- }
- list, _ := MgoE.Find("task", query, nil, nil, false, -1, -1)
- if list != nil && len(*list) > 0 { //已有任务
- if len(*list) > 1 {
- logger.Error("Code:", code, "任务异常")
- MgoE.Save("luacreatetaskerr", map[string]interface{}{
- "code": code,
- "comeintime": time.Now().Unix(),
- "tasknum": len(*list),
- })
- return
- }
- task := (*list)[0]
- o_istate := qu.IntAll(task["i_state"]) //已有任务的状态
- o_stype := qu.ObjToString(task["s_type"]) //已有任务的类型
- o_sdescript := qu.ObjToString(task["s_descript"]) //已有任务的描述
- o_addinfoid, _ := task["addinfoid"].([]interface{}) //luataskinfo信息
- o_lcomplete := qu.Int64All(task["l_complete"]) //已有任务的最迟完成时间
- o_surgency := qu.ObjToString(task["s_urgency"]) //已有任务的紧急度
- o_iurgency, _ := strconv.Atoi(o_surgency) //已有任务的紧急度int类型
- o_itimes := qu.IntAll(task["i_times"]) //已有任务出现的次数
- //
- o_addinfoid = append(o_addinfoid, id) //追加addinfoid信息
- o_sdescript += time.Now().Format(qu.Date_Short_Layout) + "追加描述:------------------------------\n" + description //追加描述
- set := map[string]interface{}{}
- //MgoE.Update("task", q, s, false, false)
- if state == 1 { //新任务为待处理
- if o_istate <= 2 {
- if errtype > o_stype { //历史任务是待确认、待处理、处理中状态且任务类型等级低于新建任务,任务类型替换为新任务类型
- o_stype = errtype
- }
- o_surgency = n_surgency //更新紧急度
- o_itimes++
- set = map[string]interface{}{
- "addinfoid": o_addinfoid,
- "s_descript": o_sdescript,
- /// "i_min": n_imin,
- // "i_num": n_idn,
- // "s_downloadtime": n_sdt,
- "i_state": state,
- "l_complete": CompleteTime(o_surgency),
- "s_urgency": o_surgency,
- "s_type": o_stype,
- "i_times": o_itimes,
- "l_updatetime": time.Now().Unix(),
- }
- } else { //历史任务类型为未通过或待审核,更新信息
- set = map[string]interface{}{
- "addinfoid": o_addinfoid,
- "s_descript": o_sdescript,
- "l_updatetime": time.Now().Unix(),
- }
- }
- } else { //新任务为待确认
- if o_istate == 0 { //历史任务为待确认
- if o_stype == "1" { //历史任务为数量异常待确认
- if errtype == "1" { //新任务为数量异常待确认,按紧急程度递增,次数递增
- o_iurgency++ //紧急度加一级
- if o_iurgency >= 4 { //出现特别紧急的状态,记录次数itimes
- o_itimes++
- o_iurgency = 4
- }
- o_surgency = fmt.Sprint(o_iurgency)
- o_lcomplete = CompleteTime(o_surgency)
- if o_itimes >= 5 { //特别紧急的次数出现5次,自动创建待处理的任务(排除有待审核任务的可能)
- state = 1
- }
- set = map[string]interface{}{
- "addinfoid": o_addinfoid,
- "s_descript": o_sdescript,
- "i_min": n_imin,
- "i_num": n_idn,
- "s_downloadtime": n_sdt,
- "i_state": state,
- "l_complete": o_lcomplete,
- "s_urgency": o_surgency,
- "s_type": errtype,
- "i_times": o_itimes,
- "l_updatetime": time.Now().Unix(),
- }
- } else { //新任务为其他异常类型待确认,紧急程度:紧急;
- if o_iurgency < 4 { //数量异常,特别紧急以下
- o_surgency = "1"
- } else {
- o_surgency = "2"
- }
- set = map[string]interface{}{
- "addinfoid": o_addinfoid,
- "s_descript": o_sdescript,
- "i_min": n_imin,
- "i_num": n_idn,
- "s_downloadtime": n_sdt,
- "i_state": state,
- "l_complete": CompleteTime(o_surgency),
- "s_urgency": o_surgency,
- "s_type": errtype,
- "l_updatetime": time.Now().Unix(),
- }
- }
- } else { //其他任务类型待确认,历史任务紧急程度+1,次数+1,任务类型更新为异常等级高者且连续4天变为待处理
- if errtype > o_stype {
- o_stype = errtype
- }
- o_iurgency++ //紧急度加一级
- if o_iurgency >= 4 { //出现特别紧急的状态,记录次数itimes
- o_itimes++
- o_iurgency = 4
- state = 1 //特别紧急,任务变为待处理
- }
- o_surgency = fmt.Sprint(o_iurgency)
- set = map[string]interface{}{
- "addinfoid": o_addinfoid,
- "s_descript": o_sdescript,
- "i_min": n_imin,
- "i_num": n_idn,
- "s_downloadtime": n_sdt,
- "i_state": state,
- "l_complete": CompleteTime(o_surgency),
- "s_urgency": o_surgency,
- "s_type": o_stype,
- "i_times": o_itimes,
- "l_updatetime": time.Now().Unix(),
- }
- }
- } else { //历史任务为待处理以上,只追加描述
- set = map[string]interface{}{
- "addinfoid": o_addinfoid,
- "s_descript": o_sdescript,
- "i_min": n_imin,
- "i_num": n_idn,
- "s_downloadtime": n_sdt,
- "l_updatetime": time.Now().Unix(),
- }
- }
- }
- MgoE.Update("task", map[string]interface{}{"_id": task["_id"]}, map[string]interface{}{"$set": set}, false, false)
- } else {
- SaveTask(code, site, channel, modifyid, modify, description, n_surgency, n_sdt, errtype, state, n_imin, n_idn, event, n_itimes, []string{id})
- }
- }(tmp)
- if n%100 == 0 {
- qu.Debug("current:", n)
- }
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- logger.Debug("---任务创建完成---")
- }
- func SaveTask(code, site, channel, modifyid, modify, description, urgency, downloadtime, errtype string, state, min, downloadnum, event, times int, addinfoid []string) {
- defer qu.Catch()
- result := map[string]interface{}{}
- // if stateNum := UserTaskNum[modify]; stateNum == nil {
- // tmp := map[string]int{fmt.Sprint(state): 1}
- // UserTaskNum[modify] = tmp
- // } else {
- // stateNum[fmt.Sprint(state)]++
- // }
- // if state == 1 { //待处理任务,紧急程度定为特别紧急
- // urgency = "4"
- // }
- result["s_code"] = code
- result["s_site"] = site
- result["s_channel"] = channel
- result["s_modifyid"] = modifyid
- result["s_modify"] = modify
- result["s_descript"] = description
- result["i_min"] = min
- result["i_num"] = downloadnum //下载量
- result["s_urgency"] = urgency
- result["i_state"] = state
- result["i_event"] = event
- result["s_downloadtime"] = downloadtime //下载量对应的日期
- result["l_comeintime"] = time.Now().Unix()
- result["l_updatetime"] = time.Now().Unix()
- result["l_complete"] = CompleteTime(urgency)
- //result["s_date"] = time.Now().Format(qu.Date_Short_Layout) //任务创建字符串日期
- result["i_times"] = times //为了方便编辑器对次数的排序,记录当前的次数
- result["s_type"] = errtype //任务类型
- result["addinfoid"] = addinfoid //信息id
- result["s_source"] = "程序"
- MgoE.Save("task", result)
- }
- func SaveUserCreateTaskNum() {
- defer qu.Catch()
- for user, sn := range UserTaskNum {
- save := map[string]interface{}{}
- save["user"] = user
- save["comeintime"] = time.Now().Unix()
- for s, n := range sn {
- save[s] = n
- }
- MgoE.Save("luausertask", save)
- }
- UserTaskNum = map[string]map[string]int{}
- }
- //重置前一周内未下载成功的数据(一天3次未下成功的数据可以连续下一周)
- func ResetDataState() {
- defer qu.Catch()
- logger.Info("-----更新数据状态-----")
- sess := MgoS.GetMgoConn()
- defer MgoS.DestoryMongoConn(sess)
- ch := make(chan bool, 3)
- wg := &sync.WaitGroup{}
- lock := &sync.Mutex{}
- query := map[string]interface{}{
- "comeintime": map[string]interface{}{
- "$gte": GetTime(-DayNum),
- "$lt": GetTime(0),
- },
- "state": -1,
- }
- field := map[string]interface{}{
- "_id": 1,
- }
- it := sess.DB("spider").C("spider_highlistdata").Find(&query).Select(&field).Iter()
- count, _ := sess.DB("spider").C("spider_highlistdata").Find(&query).Count()
- logger.Info("更新数据状态数量:", count)
- n := 0
- arr := [][]map[string]interface{}{}
- for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
- ch <- true
- wg.Add(1)
- go func(tmp map[string]interface{}) {
- defer func() {
- <-ch
- wg.Done()
- }()
- update := []map[string]interface{}{}
- update = append(update, map[string]interface{}{"_id": tmp["_id"]})
- update = append(update, map[string]interface{}{"$set": map[string]interface{}{"times": 0, "state": 0}})
- lock.Lock()
- arr = append(arr, update)
- if len(arr) > 500 {
- tmps := arr
- MgoS.UpdateBulk("spider_highlistdata", tmps...)
- arr = [][]map[string]interface{}{}
- }
- lock.Unlock()
- }(tmp)
- tmp = map[string]interface{}{}
- }
- wg.Wait()
- lock.Lock()
- if len(arr) > 0 {
- MgoS.UpdateBulk("spider_highlistdata", arr...)
- arr = [][]map[string]interface{}{}
- }
- lock.Unlock()
- logger.Info("-----更新数据状态完毕-----")
- }
|