package luatask

import (
	"fmt"
	"github.com/donnie4w/go-logger/logger"
	"go.mongodb.org/mongo-driver/bson"
	qu "qfw/util"
	"sync"
	"time"
	"util"
)

// Error types ordered by priority: a smaller value means a higher priority.
const (
	NEWTASK_LISTERR      = "1" // list-page error
	NEWTASK_DATAINFOERR  = "2" // data error
	NEWTASK_PAGEFLIPERR  = "3" // page-flip error
	NEWTASK_RATEERR      = "4" // collection-frequency error
	NEWTASK_DOWNLOADERR  = "5" // download error
	NEWTASK_DATAINFOWARN = "6" // data warning
)

var NewCodeInfoMap = map[string]*NewSpider{}

var LuaErrTypeInfo = map[string]string{
	NEWTASK_LISTERR:      "列表页异常",
	NEWTASK_DATAINFOERR:  "数据异常错误",
	NEWTASK_PAGEFLIPERR:  "爬虫翻页异常",
	NEWTASK_RATEERR:      "采集频率异常",
	NEWTASK_DOWNLOADERR:  "下载异常",
	NEWTASK_DATAINFOWARN: "数据异常警告",
}

var DataInfoErrMap = map[int]string{
	1:  "Save Coll Error",
	2:  "File Size Or Url Error",
	4:  "Field Value Is Null",
	9:  "Html Contains Temp Language",
	10: "Publishtime Is Error",
	11: "Publishtime Is Zero",
	12: "Field Type Error",
}

var DataInfoWarnMap = map[int]string{
	5: "Field Value Contains Random Code",
	6: "Field Value Not Contains Chinese",
	8: "Detail File Err",
}

var UpdateLuaconfig [][]map[string]interface{}

type NewSpider struct {
	// basic spider info
	Code         string                 `bson:"code"`
	Site         string                 `bson:"site"`
	Channel      string                 `bson:"channel"`
	Platform     string                 `bson:"platform"`
	Event        int                    `bson:"event"`
	InfoFormat   int                    `bson:"infoformat"`
	PendState    int                    `bson:"pendstate"`
	ModifyUser   string                 `bson:"modifyuser"`
	ModifyId     string                 `bson:"modifyuserid"`
	ModifyTime   int64                  `bson:"modifytime"`
	Model        int                    `bson:"model"`
	Working      int                    `bson:"working"`
	AuditTime    int64                  `bson:"l_uploadtime"`
	ListIsFilter bool                   `bson:"listisfilter"`
	UpLimit      int                    `bson:"uplimit"`
	MaxPage      int                    `bson:"maxpage"`
	Page_FlipOk  bool                   `bson:"page_flipok"`
	Page_OneOk   bool                   `bson:"page_oneok"`
	Page_TwoOk   bool                   `bson:"page_twook"`
	CodeTags     map[string]interface{} `bson:"codetags"`
	// statistics
	Detail_DownloadNum        int               `bson:"detail_downloadnum"`
	Detail_DownloadSuccessNum int               `bson:"detail_downloadsuccessnum"`
	Detail_DownloadFailNum    int               `bson:"detail_downloadfailnum"`
	List_IsGetData            bool              `bson:"list_isgetdata"`
	HeartTime                 int64             `bson:"heart_time"`
	List_RunTimes             int               `bson:"list_runtimes"`
	List_NoDataTimes          int               `bson:"list_nodatatimes"`
	List_AllInTimes           int               `bson:"list_allintimes"`
	WarnInfoMap               map[int]*WarnInfo `bson:"warninfo"`
	// python
	Py_TaskId   string `bson:"py_taskid"`
	Py_NodeName string `bson:"py_nodename"`
	Py_IsValid  bool   `bson:"py_isvalid"`
	// site info
	Channel_Status int `bson:"channel_status"` // channel response status
	// supplementary info
	Comeintime int64 `bson:"comeintime"`
	// error summary
	//Error map[string]*ErrorInfo `json:"error"`
	ErrType        string       `bson:"errtype"`        // highest-priority error type
	ErrTypeMap     map[int]bool `bson:"errtypemap"`     // all recorded error types
	ErrDescription string       `bson:"errdescription"` // error description
}

type WarnInfo struct {
	Info   string            `bson:"info"`
	Num    int               `bson:"num"`
	Fields map[string]int    `bson:"fields"`
	Hrefs  map[string]string `bson:"hrefs"`
}

func NewStartTask() {
	InitInfo() // initialize the statistics time window
	logger.Info(StartTime, EndTime, Publishtime)
	getCodeBaseInfo()      // load basic spider info
	getCodeStatus()        // load channel response status
	getPythonSummaryInfo() // load the python summary
	getLuaSummaryInfo()    // load the lua summary
	getSpiderWarnInfo()    // load abnormal-data records
	saveCodeInfo()         // aggregate errors and produce tasks
	updateLuaconfig()      // write back spider config changes
	closeTask()
}
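// getCodeBaseInfo loads the basic configuration of every online lua/python
// spider (plus lua spiders still being maintained) from luaconfig into
// NewCodeInfoMap, keyed by spider code.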
func getCodeBaseInfo() {
	defer qu.Catch()
	sess := util.MgoEB.GetMgoConn()
	defer util.MgoEB.DestoryMongoConn(sess)
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5)
	query := map[string]interface{}{
		"$or": []interface{}{
			// online lua/python spiders
			map[string]interface{}{
				"state": map[string]interface{}{
					"$in": []int{5, 11}, // published / online
				},
			},
			// lua spiders under maintenance or published
			map[string]interface{}{
				"platform": map[string]interface{}{
					"$in": []string{"golua平台", "chrome"},
				},
				"state": map[string]interface{}{
					"$in": []int{0, 1, 2}, // to be finished / to be reviewed / rejected
				},
				"event": map[string]interface{}{
					"$ne": 7000,
				},
			},
		},
	}
	fields := map[string]interface{}{
		"code":         1,
		"site":         1,
		"channel":      1,
		"platform":     1,
		"event":        1,
		"pendstate":    1,
		"modifyuser":   1,
		"modifyuserid": 1,
		"modifytime":   1,
		"l_uploadtime": 1,
		"listisfilter": 1,
		"codetags":     1,
		"infoformat":   1,
		"param_common": 1,
	}
	it := sess.DB(util.MgoEB.DbName).C("luaconfig").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			sp := &NewSpider{
				WarnInfoMap: map[int]*WarnInfo{},
				//Error: map[string]*ErrorInfo{},
				ErrType:     "-1",
				ErrTypeMap:  map[int]bool{},
				Page_FlipOk: true,
				Page_OneOk:  true,
				Page_TwoOk:  true,
			}
			param_common := tmp["param_common"].([]interface{})
			maxPage := qu.IntAll(param_common[5])
			delete(tmp, "param_common")
			luaByte, _ := bson.Marshal(tmp)
			if bson.Unmarshal(luaByte, sp) != nil {
				qu.Info("初始化爬虫失败:", tmp["_id"])
				return
			}
			sp.Working = util.CodeEventWorking[sp.Event] // working mode looked up by event ("working" itself is not queried)
			sp.Model = util.CodeEventModel[sp.Event]
			sp.MaxPage = maxPage
			if sp.Platform == "python" {
				sp.Model = 1
			}
			lock.Lock()
			NewCodeInfoMap[sp.Code] = sp
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			logger.Info(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Info("爬虫基本信息准备完成...", len(NewCodeInfoMap))
}

func getCodeStatus() {
	defer qu.Catch()
	sess := util.MgoEB.GetMgoConn()
	defer util.MgoEB.DestoryMongoConn(sess)
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5)
	fields := map[string]interface{}{
		"spidercode":  1,
		"status_code": 1,
	}
	it := sess.DB(util.MgoPy.DbName).C("site_monitor").Find(nil).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["spidercode"])
			status := qu.IntAll(tmp["status_code"])
			lock.Lock()
			if sp := NewCodeInfoMap[code]; sp != nil {
				sp.Channel_Status = status
			}
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			logger.Info(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Info("栏目响应状态信息准备完成...", len(NewCodeInfoMap))
}
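// getPythonSummaryInfo merges the current day's python summary
// (spider_monitor) into NewCodeInfoMap: task binding, validity flag,
// list-page statistics and detail download counts.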
func getPythonSummaryInfo() {
	defer qu.Catch()
	sess := util.MgoPy.GetMgoConn()
	defer util.MgoPy.DestoryMongoConn(sess)
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5)
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": util.GetTime(0),
		},
	}
	it := sess.DB(util.MgoPy.DbName).C("spider_monitor").Find(&query).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["code"])
			is_valid, _ := tmp["is_valid"].(bool) // invalid spiders are monitored but skipped by the checks
			py_taskid := qu.ObjToString(tmp["py_taskid"])
			py_nodename := qu.ObjToString(tmp["py_nodename"])
			list_isgetdata, _ := tmp["list_isgetdata"].(bool)
			list_allintimes := qu.IntAll(tmp["list_allintimes"])
			list_nodatatimes := qu.IntAll(tmp["list_nodatatimes"])
			list_runtimes := qu.IntAll(tmp["list_runtimes"])
			detail_downloadnum := qu.IntAll(tmp["detail_downloadnum"])
			detail_downloadsuccessnum := qu.IntAll(tmp["detail_downloadsuccessnum"])
			detail_downloadfailnum := qu.IntAll(tmp["detail_downloadfailnum"])
			lock.Lock()
			if sp := NewCodeInfoMap[code]; sp != nil {
				sp.Py_TaskId = py_taskid
				sp.Py_NodeName = py_nodename
				sp.Py_IsValid = is_valid
				sp.List_IsGetData = list_isgetdata
				sp.List_AllInTimes = list_allintimes
				sp.List_NoDataTimes = list_nodatatimes
				sp.List_RunTimes = list_runtimes
				sp.Detail_DownloadNum = detail_downloadnum
				sp.Detail_DownloadSuccessNum = detail_downloadsuccessnum
				sp.Detail_DownloadFailNum = detail_downloadfailnum
			}
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			logger.Info(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Info("python汇总信息完成...")
}

func getLuaSummaryInfo() {
	getSpiderHeart()               // heartbeat info
	getSpiderHighListDownloadNum() // download counts for split-collection spiders
	getSpiderListDownloadNum()     // download counts for sequential-collection spiders
	getSpiderDownloadRateData()    // download-rate details
}

func getSpiderWarnInfo() {
	defer qu.Catch()
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lt":  EndTime,
		},
	}
	fields := map[string]interface{}{
		"field":                 1,
		"level":                 1,
		"info":                  1,
		"code":                  1,
		"infotype":              1,
		"href":                  1,
		"data.publishtime":      1,
		"data.l_np_publishtime": 1,
	}
	it := sess.DB(util.MgoS.DbName).C("spider_warn").Find(&query).Select(&fields).Iter()
	n := 0
	ch := make(chan bool, 5)
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			infotype := qu.IntAll(tmp["infotype"])
			level := qu.IntAll(tmp["level"])
			field := qu.ObjToString(tmp["field"])
			if infotype == 3 || infotype == 7 {
				return
			}
			if (infotype == 5 || infotype == 6) && level == 1 {
				return
			} else if infotype == 8 && field == "projectinfo" {
				return
			}
			if infotype == 2 || infotype == 6 || infotype == 8 {
				if data, ok := tmp["data"].(map[string]interface{}); ok {
					var ptime int64
					if l_np_publishtime := data["l_np_publishtime"]; l_np_publishtime != nil {
						ptime = qu.Int64All(l_np_publishtime)
					} else if publishtime := data["publishtime"]; publishtime != nil {
						ptime = qu.Int64All(publishtime)
					}
					if ptime < time.Now().AddDate(0, -6, 0).Unix() { // only warnings published within the last half year count
						return
					}
				}
			}
			code := qu.ObjToString(tmp["code"])
			info := qu.ObjToString(tmp["info"])
			href := qu.ObjToString(tmp["href"])
			lock.Lock()
			if sp := NewCodeInfoMap[code]; sp != nil {
				if wf := sp.WarnInfoMap[infotype]; wf != nil {
					if wf.Fields[field] == 0 { // keep one sample href per field
						wf.Hrefs[field] = href
					}
					wf.Fields[field]++
				} else {
					sp.WarnInfoMap[infotype] = &WarnInfo{
						Info:   info,
						Num:    1,
						Fields: map[string]int{field: 1},
						Hrefs:  map[string]string{field: href},
					}
				}
			}
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			logger.Info(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Info("错误信息数据统计完成...")
}

func getSpiderHeart() {
	defer qu.Catch()
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	query := map[string]interface{}{
		"del": false,
	}
	fields := map[string]interface{}{
		"code":     1,
		"findlist": 1,
	}
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5)
	it := sess.DB(util.MgoS.DbName).C("spider_heart").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["code"])
			findListHeart := qu.Int64All(tmp["findlist"])
			lock.Lock()
			if sp := NewCodeInfoMap[code]; sp != nil {
				//limitDayNum := 0
				//if sp.Event == 7520 { // 7520-node spiders loop slowly; the heartbeat may still be the previous day's
				//	limitDayNum = -1
				//}
				sp.List_IsGetData = findListHeart > util.GetTime(0)-int64(12*3600) // newer than noon of the previous day
				sp.HeartTime = findListHeart
			}
			lock.Unlock()
		}(tmp)
		if n%100 == 0 {
			logger.Info(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Info("lua统计心跳信息完成...")
}
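// getSpiderHighListDownloadNum tallies detail download totals per spider from
// spider_highlistdata (split-collection mode): state 1 counts as a success,
// state -1 as a failure, and state 0 with a "times" field as a failed retry.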
func getSpiderHighListDownloadNum() { // competitor data is not counted yet (delayed collection)
	defer qu.Catch()
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lt":  EndTime,
		},
	}
	fields := map[string]interface{}{
		"spidercode": 1,
		"state":      1,
		"times":      1,
	}
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5)
	// 1. aggregate spider_highlistdata
	it := sess.DB(util.MgoS.DbName).C("spider_highlistdata").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["spidercode"])
			state := qu.IntAll(tmp["state"])
			times := tmp["times"]
			lock.Lock()
			if sp := NewCodeInfoMap[code]; sp != nil {
				if state == 1 {
					sp.Detail_DownloadSuccessNum++
				} else if state == -1 {
					sp.Detail_DownloadFailNum++
				} else if state == 0 && times != nil {
					sp.Detail_DownloadFailNum++
				} // state==0 && times==nil: never attempted, not counted as a failure
				sp.Detail_DownloadNum++
			}
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			logger.Info(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Info("lua统计采集量spider_highlistdata完成...")
}

func getSpiderListDownloadNum() {
	defer qu.Catch()
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lt":  EndTime,
		},
	}
	fields := map[string]interface{}{
		"spidercode": 1,
		"state":      1,
		"href":       1,
	}
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5)
	repeatHrefMap := map[string]int{} // final download state per href: 1 success, -1 failure
	it := sess.DB(util.MgoS.DbName).C("spider_listdata").Find(&query).Select(&fields).Sort("_id").Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			state := qu.IntAll(tmp["state"])
			code := qu.ObjToString(tmp["spidercode"])
			href := qu.ObjToString(tmp["href"])
			lock.Lock()
			defer lock.Unlock()
			if sp := NewCodeInfoMap[code]; sp != nil {
				tmpState := repeatHrefMap[href]
				if tmpState == 1 { // href already recorded as a success; ignore later records
					return
				} else if tmpState == 0 { // href not seen yet
					if state == 1 {
						sp.Detail_DownloadSuccessNum++
					} else {
						state = -1
						sp.Detail_DownloadFailNum++
					}
					sp.Detail_DownloadNum++
					repeatHrefMap[href] = state
				} else if tmpState == -1 && state == 1 { // previously a failure, now a success: final state is success
					sp.Detail_DownloadSuccessNum++
					sp.Detail_DownloadFailNum--
					repeatHrefMap[href] = state
				}
			}
		}(tmp)
		if n%1000 == 0 {
			logger.Info(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	repeatHrefMap = map[string]int{}
	logger.Info("lua统计spider_listdata采集量完成...")
}
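// getSpiderDownloadRateData merges the per-day spider_downloadrate summary:
// total rounds, zero-data rounds, 100%-new rounds, page-limit overruns and
// first/second-page failures, which feed the page-flip checks.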
func getSpiderDownloadRateData() {
	defer qu.Catch()
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	ch := make(chan bool, 5)
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	date := qu.FormatDateByInt64(&StartTime, qu.Date_Short_Layout)
	query := map[string]interface{}{
		"date": date,
		//"event": map[string]interface{}{
		//	"$ne": 7000,
		//},
	}
	fields := map[string]interface{}{
		"spidercode":   1,
		"alltimes":     1,
		"zero":         1,
		"oh_percent":   1,
		"uplimit":      1,
		"page_fail":    1,
		"page_onefail": 1,
	}
	it := sess.DB(util.MgoS.DbName).C("spider_downloadrate").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["spidercode"])
			alltimes := qu.IntAll(tmp["alltimes"])
			zero := qu.IntAll(tmp["zero"])
			oh_percent := qu.IntAll(tmp["oh_percent"])
			uplimit := qu.IntAll(tmp["uplimit"])
			page_fail := qu.IntAll(tmp["page_fail"])
			page_onefail := qu.IntAll(tmp["page_onefail"])
			lock.Lock()
			if sp := NewCodeInfoMap[code]; sp != nil {
				sp.List_NoDataTimes = zero
				sp.List_RunTimes = alltimes
				sp.List_AllInTimes = oh_percent
				sp.Page_FlipOk = uplimit == 0 // any round over the page limit marks flipping as abnormal
				sp.UpLimit = uplimit
				sp.Page_OneOk = !(page_onefail == alltimes && page_onefail > 0)
				sp.Page_TwoOk = !(page_fail == alltimes && page_fail > 0)
			}
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			logger.Info(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Info("lua爬虫采集详情统计完成...")
}

func saveCodeInfo() {
	defer qu.Catch()
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5)
	comeintime := time.Now().Unix()
	codeInfoArr := []map[string]interface{}{} // spider download details
	taskArr := [][]map[string]interface{}{}   // task upsert batch
	for _, spider := range NewCodeInfoMap {
		ch <- true
		wg.Add(1)
		go func(sp *NewSpider) {
			defer func() {
				<-ch
				wg.Done()
			}()
			getAllErr(sp)                  // aggregate errors
			createTask(sp, &taskArr, lock) // create or update the task
			sp.Comeintime = comeintime
			spByte, err := bson.Marshal(sp)
			if err != nil {
				logger.Info("Bson Marshal Error", sp.Code)
				return
			}
			lock.Lock()
			defer lock.Unlock()
			tmp := map[string]interface{}{}
			if bson.Unmarshal(spByte, &tmp) == nil {
				codeInfoArr = append(codeInfoArr, tmp)
			} else {
				logger.Error("Bson Unmarshal Error", sp.Code)
				return
			}
			if len(codeInfoArr) > 500 {
				util.MgoS.SaveBulk("spider_info", codeInfoArr...)
				codeInfoArr = []map[string]interface{}{}
			}
			if len(taskArr) > 500 {
				util.MgoEB.UpSertBulk("task", taskArr...)
				taskArr = [][]map[string]interface{}{}
			}
		}(spider)
	}
	wg.Wait()
	if len(codeInfoArr) > 0 {
		util.MgoS.SaveBulk("spider_info", codeInfoArr...)
		codeInfoArr = []map[string]interface{}{}
	}
	if len(taskArr) > 0 {
		util.MgoEB.UpSertBulk("task", taskArr...)
		taskArr = [][]map[string]interface{}{}
	}
	NewCodeInfoMap = map[string]*NewSpider{}
	logger.Info("爬虫统计完成...")
}
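// createTask converts an aggregated spider error into a maintenance task.
// Each element appended to taskArr is a selector/modifier pair consumed by
// util.MgoEB.UpSertBulk, roughly of this shape (field values hypothetical):
//
//	[]map[string]interface{}{
//		{"_id": taskId},                                // selector
//		{"$set": map[string]interface{}{"i_state": 1}}, // modifier
//	}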
func createTask(sp *NewSpider, taskArr *[][]map[string]interface{}, lock *sync.Mutex) {
	defer qu.Catch()
	if sp.Event == 7000 {
		return
	}
	if sp.ErrType == "-1" { // no error
		return
	}
	if !util.CreateTaskInfoFormat[sp.InfoFormat] { // this info format does not produce tasks
		return
	}
	// look up open historical tasks
	query := map[string]interface{}{
		"s_code": sp.Code,
		"i_state": map[string]interface{}{
			"$in": []int{0, 1, 2, 3, 5}, // tasks currently under maintenance
		},
	}
	fields := map[string]interface{}{
		"i_state":      1,
		"s_type":       1,
		"s_descript":   1,
		"i_times":      1,
		"l_comeintime": 1,
	}
	list, _ := util.MgoEB.Find("task", query, nil, fields, false, -1, -1)
	update := []map[string]interface{}{}
	if list != nil && len(*list) > 0 { // an open task already exists
		if len(*list) > 1 {
			logger.Error("Code:", sp.Code, "任务异常")
			util.MgoEB.Save("luacreatetaskerr", map[string]interface{}{
				"code":       sp.Code,
				"comeintime": time.Now().Unix(),
				"tasknum":    len(*list),
			})
			return
		}
		task := (*list)[0]                                  // the single open task
		state_old := qu.IntAll(task["i_state"])             // previous state
		times_old := qu.IntAll(task["i_times"])             // previous occurrence count
		type_old := qu.ObjToString(task["s_type"])          // previous error type
		descript_old := qu.ObjToString(task["s_descript"])  // previous description
		comeintime_old := qu.Int64All(task["l_comeintime"]) // previous creation time
		result := map[string]interface{}{
			"i_event":      sp.Event,
			"l_updatetime": time.Now().Unix(),
			"i_times":      times_old + 1,
			"s_descript":   descript_old + time.Now().Format(qu.Date_Short_Layout) + "追加描述:------------------------------\n" + sp.ErrDescription,
		}
		// task state
		if state_old == 0 {
			if sp.ErrType == NEWTASK_LISTERR || sp.ErrType == NEWTASK_DATAINFOERR {
				result["i_state"] = 1
			} else if comeintime_old >= util.GetTime(-30) { // a task already existed within the last month
				result["i_state"] = 1
			} else {
				result["l_complete"] = util.CompleteTime("1")
				result["l_comeintime"] = time.Now().Unix()
				result["l_updatetime"] = time.Now().Unix()
			}
		}
		// task type: keep the higher-priority (smaller) one
		if sp.ErrType < type_old {
			result["s_type"] = sp.ErrType
		}
		update = append(update, map[string]interface{}{"_id": task["_id"]})
		update = append(update, map[string]interface{}{"$set": result})
		lock.Lock()
		*taskArr = append(*taskArr, update)
		lock.Unlock()
	} else { // no open task: create one
		state_new := 0
		//if sp.ErrType == 1 && sp.Channel_Status != 200 { // list-page errors with an abnormal channel status used to go straight to "pending"
		//	state_new = 1
		//}
		if sp.ErrType == NEWTASK_LISTERR || sp.ErrType == NEWTASK_DATAINFOERR {
			state_new = 1
		}
		saveMap := map[string]interface{}{
			"s_modify":     sp.ModifyUser,
			"s_modifyid":   sp.ModifyId,
			"s_code":       sp.Code,
			"s_site":       sp.Site,
			"s_channel":    sp.Channel,
			"i_event":      sp.Event,
			"i_state":      state_new,
			"s_source":     "程序",
			"s_type":       sp.ErrType,
			"s_descript":   sp.ErrDescription,
			"i_times":      1,
			"l_comeintime": time.Now().Unix(),
			"l_complete":   util.CompleteTime("1"),
			//"s_urgency": "1",
			"s_platform": sp.Platform,
		}
		update = append(update, query)
		update = append(update, saveMap)
		lock.Lock()
		*taskArr = append(*taskArr, update)
		lock.Unlock()
	}
}

func getAllErr(sp *NewSpider) {
	listErr(sp)           // list-page error
	dataInfoErr(sp)       // data error
	pageFlipErr(sp)       // page-flip error
	downloadRateErr(sp)   // collection-frequency error
	downloadFailedErr(sp) // download error
	dataInfoWarn(sp)      // data warning
}
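// listErr flags spiders whose list pages produced no data. A codetags entry
// keyed by NEWTASK_LISTERR suppresses the error for seven days; an absent or
// expired tag lets it through. The same tag convention recurs in the
// page-flip, rate and download checks below.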
func listErr(sp *NewSpider) {
	defer qu.Catch()
	if sp.Platform == "python" && !sp.Py_IsValid {
		return
	}
	//if !sp.List_IsGetData || sp.List_RunTimes == 0 {
	if !sp.List_IsGetData {
		errFlag := false
		if sp.CodeTags != nil {
			tagTime, _ := sp.CodeTags[NEWTASK_LISTERR].(int64) // note: decoded into a struct, bson turns this into float64
			if tagTime == 0 { // no list-error tag
				errFlag = true
			} else if tagTime > 0 && tagTime <= util.GetTime(-7) { // tag expired
				errFlag = true
			}
		} else { // no tags at all
			errFlag = true
		}
		if errFlag {
			//sp.Error[NEWTASK_LISTERR] = &ErrorInfo{
			//	ErrInfo: map[string]bool{LuaErrTypeInfo[NEWTASK_LISTERR]: true},
			//}
			sp.ErrType = NEWTASK_LISTERR
			sp.ErrTypeMap[qu.IntAll(NEWTASK_LISTERR)] = true
			heartTime := ""
			if sp.HeartTime != 0 {
				heartTime = qu.FormatDateByInt64(&sp.HeartTime, qu.Date_Full_Layout)
			}
			sp.ErrDescription += "列表页异常:\n 无最新心跳:" + heartTime + "\n"
			//if !sp.List_IsGetData {
			//	heartTime := ""
			//	if sp.HeartTime != 0 {
			//		heartTime = qu.FormatDateByInt64(&sp.HeartTime, qu.Date_Full_Layout)
			//	}
			//	sp.ErrDescription += "列表页异常:\n 无最新心跳:" + heartTime + "\n"
			//} else if sp.List_RunTimes == 0 {
			//	sp.ErrDescription += "列表页异常:\n 列表页共采集" + fmt.Sprint(sp.List_RunTimes) + "轮,其中有" + fmt.Sprint(sp.List_NoDataTimes) + "轮无数据\n"
			//}
		}
	}
}

func dataInfoErr(sp *NewSpider) {
	defer qu.Catch()
	if len(sp.WarnInfoMap) > 0 {
		errFlag := false
		resultDescription := ""
		for err := range DataInfoErrMap {
			if wf := sp.WarnInfoMap[err]; wf != nil {
				tmpDescription := ""
				for field, href := range wf.Hrefs {
					tmpDescription += " 字段" + field + ":" + href + "\n"
				}
				if tmpDescription != "" {
					resultDescription += " " + wf.Info + "\n" + tmpDescription
				}
				errFlag = true
			}
		}
		if errFlag {
			//sp.Error[NEWTASK_DATAINFOERR] = &ErrorInfo{
			//	ErrInfo: map[string]bool{LuaErrTypeInfo[NEWTASK_DATAINFOERR]: true},
			//}
			sp.ErrDescription += "数据异常错误:\n" + resultDescription
			sp.ErrTypeMap[qu.IntAll(NEWTASK_DATAINFOERR)] = true
			if sp.ErrType < "0" { // only set if no higher-priority error was recorded
				sp.ErrType = NEWTASK_DATAINFOERR
			}
		}
	}
}

func pageFlipErr(sp *NewSpider) {
	defer qu.Catch()
	if sp.Platform == "python" {
		return
	}
	errFlag := false
	if sp.CodeTags != nil {
		tagTime, _ := sp.CodeTags[NEWTASK_PAGEFLIPERR].(int64)
		if tagTime == 0 { // no page-flip tag
			errFlag = true
		} else if tagTime > 0 && tagTime <= util.GetTime(-7) { // tag expired
			errFlag = true
		}
	} else { // no tags: record the page-flip error
		errFlag = true
	}
	if errFlag {
		// 1. infinite-page spiders exceeded the page limit (100 pages high-performance, 50 pages queued)
		if !sp.Page_FlipOk && sp.Model == 1 {
			sp.ErrTypeMap[qu.IntAll(NEWTASK_PAGEFLIPERR)] = true
			sp.ErrDescription += "爬虫翻页异常:\n 列表页共采集" + fmt.Sprint(sp.List_RunTimes) + "轮,其中有" + fmt.Sprint(sp.UpLimit) + "轮列表页采集翻页超过最大限制\n"
			if sp.ErrType < "0" {
				sp.ErrType = NEWTASK_PAGEFLIPERR
			}
		}
		// 2. the first list page yielded no data while the second did
		if !sp.Page_OneOk {
			sp.ErrTypeMap[qu.IntAll(NEWTASK_PAGEFLIPERR)] = true
			sp.ErrDescription += "爬虫翻页异常:\n 列表页共采集" + fmt.Sprint(sp.List_RunTimes) + "轮,其中有" + fmt.Sprint(sp.List_RunTimes) + "轮爬虫未采集到第一页数据\n"
			if sp.ErrType < "0" {
				sp.ErrType = NEWTASK_PAGEFLIPERR
			}
		}
		// 3. the first page had data but the second was empty or identical to the first
		if !sp.Page_TwoOk {
			sp.ErrTypeMap[qu.IntAll(NEWTASK_PAGEFLIPERR)] = true
			sp.ErrDescription += "爬虫翻页异常:\n 列表页共采集" + fmt.Sprint(sp.List_RunTimes) + "轮,其中有" + fmt.Sprint(sp.List_RunTimes) + "轮爬虫采集的第一、二页数据相同或未采集到第二页数据\n"
			if sp.ErrType < "0" {
				sp.ErrType = NEWTASK_PAGEFLIPERR
			}
		}
	}
}
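// downloadRateErr flags spiders whose rounds keep collecting 100% new data
// (List_AllInTimes > 0), which suggests the crawl frequency is too low and
// data may be missed between rounds. Node 7410 is exempt in sequential mode.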
func downloadRateErr(sp *NewSpider) {
	defer qu.Catch()
	if sp.Platform == "python" {
		if !sp.Py_IsValid { // invalid spider
			return
		}
		errFlag := false
		if sp.CodeTags != nil {
			tagTime, _ := sp.CodeTags[NEWTASK_RATEERR].(int64)
			if tagTime == 0 { // no rate-error tag
				errFlag = true
			} else if tagTime > 0 && tagTime <= util.GetTime(-7) { // tag expired
				errFlag = true
			}
		} else { // no tags: record the rate error
			errFlag = true
		}
		if errFlag && sp.List_AllInTimes > 0 && sp.AuditTime > 24 {
			sp.ErrTypeMap[qu.IntAll(NEWTASK_RATEERR)] = true
			sp.ErrDescription += "采集频率异常:\n 列表页共采集" + fmt.Sprint(sp.List_RunTimes) + "轮,其中有" + fmt.Sprint(sp.List_AllInTimes) + "轮数据全采\n"
			if sp.ErrType < "0" {
				sp.ErrType = NEWTASK_RATEERR
			}
		}
	} else { // lua
		if sp.List_AllInTimes > 0 {
			errFlag := false
			if sp.Model == 1 && sp.AuditTime > 24 && (sp.MaxPage == 1 || sp.MaxPage > 100) { // split collection, audited more than 24h ago
				errFlag = true
			} else if sp.Event != 7410 { // sequential collection (node 7410 does not get rate-error tasks)
				if sp.CodeTags != nil {
					tagTime, _ := sp.CodeTags[NEWTASK_RATEERR].(int64)
					if tagTime == 0 { // no rate-error tag
						errFlag = true
					} else if tagTime > 0 && tagTime <= util.GetTime(-7) { // tag expired
						errFlag = true
					}
				} else { // no tags: record the rate error
					errFlag = true
				}
			}
			if errFlag {
				//sp.Error[NEWTASK_RATEERR] = &ErrorInfo{
				//	ErrInfo: map[string]bool{LuaErrTypeInfo[NEWTASK_RATEERR]: true},
				//}
				sp.ErrTypeMap[qu.IntAll(NEWTASK_RATEERR)] = true
				sp.ErrDescription += "采集频率异常:\n 列表页共采集" + fmt.Sprint(sp.List_RunTimes) + "轮,其中有" + fmt.Sprint(sp.List_AllInTimes) + "轮数据全采\n"
				if sp.ErrType < "0" {
					sp.ErrType = NEWTASK_RATEERR
				}
			}
		}
	}
}
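// downloadFailedErr flags failed detail downloads. Split-collection spiders
// use thresholds (under 100 downloads: 3 failures or a 20% failure rate;
// otherwise a 3% rate or 30 failures) with a seven-day grace period tracked
// through a codetags timestamp; sequential spiders are flagged on any
// first-seen failed row.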
func downloadFailedErr(sp *NewSpider) {
	defer qu.Catch()
	if sp.Platform == "python" && !sp.Py_IsValid {
		return
	}
	flagTime := util.GetTime(-7)
	if sp.Detail_DownloadFailNum > 0 {
		tagTime := int64(0)
		if sp.CodeTags != nil {
			tagTime, _ = sp.CodeTags[NEWTASK_DOWNLOADERR].(int64) // historical tag
			if tagTime > flagTime { // tag still valid
				if sp.Detail_DownloadFailNum == sp.Detail_DownloadNum { // every download failed: drop the tag
					delete(sp.CodeTags, NEWTASK_DOWNLOADERR)
					UpdateLuaconfig = append(UpdateLuaconfig, []map[string]interface{}{
						{"code": sp.Code},
						{"$set": map[string]interface{}{
							"codetags": sp.CodeTags,
						}},
					})
				} else {
					return
				}
			}
		}
		if sp.Model == 1 { // split collection
			errFlag := false
			if sp.Detail_DownloadNum < 100 { // fewer than 100 downloads in total
				if sp.Detail_DownloadFailNum >= 3 { // at least 3 failures
					errFlag = true
				} else if float64(sp.Detail_DownloadFailNum)/float64(sp.Detail_DownloadNum) >= 0.2 { // failure rate at least 20%
					errFlag = true
				}
			} else if sp.Detail_DownloadFailNum >= 3 { // 100+ downloads with at least 3 failures
				if float64(sp.Detail_DownloadFailNum)/float64(sp.Detail_DownloadNum) >= 0.03 { // failure rate at least 3%
					errFlag = true
				} else {
					if sp.Detail_DownloadFailNum >= 30 { // at least 30 failures
						errFlag = true
					} else {
						if qu.FormatDateByInt64(&tagTime, qu.Date_Short_Layout) == qu.FormatDateByInt64(&flagTime, qu.Date_Short_Layout) {
							// tagged exactly seven days ago: the grace period is over
							errFlag = true
						} else { // tag it now and grant the grace period
							UpdateLuaconfig = append(UpdateLuaconfig, []map[string]interface{}{
								{"code": sp.Code},
								{"$set": map[string]interface{}{
									"codetags." + NEWTASK_DOWNLOADERR: time.Now().Unix(),
								}},
							})
						}
					}
				}
			}
			if errFlag {
				q := map[string]interface{}{
					"$or": []interface{}{
						map[string]interface{}{ // state=-1: download failed
							"spidercode": sp.Code,
							"state":      -1,
							"comeintime": map[string]interface{}{
								"$gte": StartTime,
								"$lt":  EndTime,
							},
						},
						map[string]interface{}{ // state=0 with times set: not downloaded successfully the previous day
							"spidercode": sp.Code,
							"state":      0,
							"times": map[string]interface{}{
								"$exists": true,
							},
							"comeintime": map[string]interface{}{
								"$gte": StartTime,
								"$lt":  EndTime,
							},
						},
					},
				}
				sp.getErrHrefs("spider_highlistdata", NEWTASK_DOWNLOADERR, q)
				sp.ErrTypeMap[qu.IntAll(NEWTASK_DOWNLOADERR)] = true
				if sp.ErrType < "0" {
					sp.ErrType = NEWTASK_DOWNLOADERR
				}
			}
		} else { // sequential collection
			// only first-seen rows (count=0) that failed; count>0 rows were already counted on the day they were collected
			q := map[string]interface{}{
				"spidercode": sp.Code,
				"count":      0,
				"state":      -1,
				"comeintime": map[string]interface{}{
					"$gte": StartTime,
					"$lt":  EndTime,
				},
			}
			count := sp.getErrHrefs("spider_listdata", NEWTASK_DOWNLOADERR, q)
			if count > 0 {
				sp.ErrTypeMap[qu.IntAll(NEWTASK_DOWNLOADERR)] = true
				if sp.ErrType < "0" {
					sp.ErrType = NEWTASK_DOWNLOADERR
				}
			}
		}
	}
}

func dataInfoWarn(sp *NewSpider) {
	defer qu.Catch()
	if len(sp.WarnInfoMap) > 0 {
		tagTime := int64(-1)
		if sp.CodeTags != nil {
			tagTime, _ = sp.CodeTags[NEWTASK_DATAINFOWARN].(int64)
		} else { // no tags: record the data warning
			tagTime = 0
		}
		if tagTime > -1 { // tag expired or absent (a failed int64 assertion also yields 0)
			errFlag := false
			resultDescription := ""
			for err := range DataInfoWarnMap {
				if wf := sp.WarnInfoMap[err]; wf != nil {
					tmpDescription := ""
					for field, href := range wf.Hrefs {
						tmpDescription += " 字段" + field + ":" + href + "\n"
					}
					if tmpDescription != "" {
						resultDescription += " " + wf.Info + "\n" + tmpDescription
					}
					errFlag = true
				}
			}
			if errFlag {
				//sp.Error[NEWTASK_DATAINFOWARN] = &ErrorInfo{
				//	ErrInfo: map[string]bool{LuaErrTypeInfo[NEWTASK_DATAINFOWARN]: true},
				//}
				sp.ErrDescription += "数据异常警告:\n" + resultDescription
				sp.ErrTypeMap[qu.IntAll(NEWTASK_DATAINFOWARN)] = true
				if sp.ErrType < "0" {
					sp.ErrType = NEWTASK_DATAINFOWARN
				}
			}
		}
	}
}

func (sp *NewSpider) getErrHrefs(coll, errType string, query map[string]interface{}) (count int) {
	defer qu.Catch()
	if coll == "spider_listdata" {
		count = util.MgoS.Count(coll, query)
	} else {
		count = sp.Detail_DownloadFailNum
	}
	if count == 0 {
		return
	}
	sp.ErrDescription += LuaErrTypeInfo[NEWTASK_DOWNLOADERR] + ":共下载" + fmt.Sprint(sp.Detail_DownloadNum) + "条,失败" + fmt.Sprint(sp.Detail_DownloadFailNum) + "条\n"
	if sp.Platform != "golua平台" && sp.Platform != "chrome" { // only lua platforms list sample hrefs
		return
	}
	list, _ := util.MgoS.Find(coll, query, nil, `{"href":1}`, false, 0, 3)
	if list != nil && len(*list) > 0 {
		//errHrefs := []*ErrRemark{}
		for _, l := range *list {
			href := qu.ObjToString(l["href"])
			//errHrefs = append(errHrefs, &ErrRemark{Href: href})
			sp.ErrDescription += " " + href + "\n"
		}
		//sp.Error[errType] = &ErrorInfo{
		//	Num:     sp.Detail_DownloadFailNum,
		//	Err:     errHrefs,
		//	ErrInfo: map[string]bool{LuaErrTypeInfo[errType]: true},
		//}
	}
	return
}

// updateLuaconfig flushes the batched spider config updates.
func updateLuaconfig() {
	if len(UpdateLuaconfig) > 0 {
		util.MgoEB.UpdateBulk("luaconfig", UpdateLuaconfig...)
		UpdateLuaconfig = [][]map[string]interface{}{}
	}
}

// closeTask closes download-error and data-warning tasks (types "5" and "6")
// that were not escalated to "pending" within seven days.
func closeTask() {
	defer qu.Catch()
	query := map[string]interface{}{
		"l_comeintime": map[string]interface{}{
			"$lte": util.GetTime(-7),
		},
		"i_state": 0,
		"s_type": map[string]interface{}{
			"$in": []string{"5", "6"},
		},
	}
	set := map[string]interface{}{
		"$set": map[string]interface{}{
			"l_closetime": time.Now().Unix(),
		},
	}
	util.MgoEB.Update("task", query, set, false, true)
}
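// The block below records open questions and the previous implementation of
// downloadFailedErr, kept for reference.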
/*
Open questions:
1. The list-page check relies on the same day's heartbeat for early warning. What if that day's heartbeat itself is wrong?
2. Download errors caused by detail pages that genuinely carry no content: how can task accuracy be improved?
3. How should rate errors on node 7410 caused by changing links be handled?
*/
//func downloadFailedErr(sp *NewSpider) {
//	defer qu.Catch()
//	if sp.Platform == "python" && !sp.Py_IsValid {
//		return
//	}
//	if sp.Detail_DownloadFailNum > 0 {
//		tagTime := int64(-1)
//		if sp.CodeTags != nil {
//			tagTime, _ = sp.CodeTags[NEWTASK_DOWNLOADERR].(int64)
//		} else { // no tags: record the error
//			tagTime = 0
//		}
//		if tagTime > -1 {
//			if sp.Model == 1 { // split collection (python spiders default to this mode)
//				errFlag := false
//				if sp.Detail_DownloadNum < 100 { // fewer than 100 downloads in total
//					if sp.Detail_DownloadFailNum >= 3 { // at least 3 failures
//						errFlag = true
//					} else if float64(sp.Detail_DownloadFailNum)/float64(sp.Detail_DownloadNum) >= 0.2 { // failure rate at least 20%
//						errFlag = true
//					}
//				} else if sp.Detail_DownloadFailNum >= 3 { // 100+ downloads with at least 3 failures
//					if float64(sp.Detail_DownloadFailNum)/float64(sp.Detail_DownloadNum) >= 0.03 { // failure rate at least 3%
//						errFlag = true
//					} else {
//						if sp.Detail_DownloadFailNum >= 30 { // at least 30 failures
//							errFlag = true
//						} else {
//							tagFlag := tagTime == util.GetTime(-7) // was the tag set exactly seven days ago?
//							if tagTime == 0 || !tagFlag { // set the tag
//								// set the tag
//							} else if tagFlag {
//								errFlag = true
//							}
//						}
//					}
//				}
//				if errFlag {
//					q := map[string]interface{}{
//						"$or": []interface{}{
//							map[string]interface{}{ // state=-1: download failed
//								"spidercode": sp.Code,
//								"state":      -1,
//								"comeintime": map[string]interface{}{
//									"$gte": StartTime,
//									"$lt":  EndTime,
//								},
//							},
//							map[string]interface{}{ // state=0 with times set: not downloaded successfully the previous day
//								"spidercode": sp.Code,
//								"state":      0,
//								"times": map[string]interface{}{
//									"$exists": true,
//								},
//								"comeintime": map[string]interface{}{
//									"$gte": StartTime,
//									"$lt":  EndTime,
//								},
//							},
//						},
//					}
//					sp.getErrHrefs("spider_highlistdata", NEWTASK_DOWNLOADERR, q)
//					sp.ErrTypeMap[qu.IntAll(NEWTASK_DOWNLOADERR)] = true
//					if sp.ErrType < 0 {
//						sp.ErrType = qu.IntAll(NEWTASK_DOWNLOADERR)
//					}
//				}
//			} else { // sequential collection
//				// only first-seen rows (count=0) that failed; count>0 rows were already counted on the day they were collected
//				q := map[string]interface{}{
//					"spidercode": sp.Code,
//					"count":      0,
//					"state":      -1,
//					"comeintime": map[string]interface{}{
//						"$gte": StartTime,
//						"$lt":  EndTime,
//					},
//				}
//				count := sp.getErrHrefs("spider_listdata", NEWTASK_DOWNLOADERR, q)
//				if count > 0 {
//					sp.ErrTypeMap[qu.IntAll(NEWTASK_DOWNLOADERR)] = true
//					if sp.ErrType < 0 {
//						sp.ErrType = qu.IntAll(NEWTASK_DOWNLOADERR)
//					}
//				}
//			}
//		}
//	}
//}