package luatask import ( "encoding/json" "fmt" qu "qfw/util" "strings" "sync" "sync/atomic" "time" "util" "github.com/donnie4w/go-logger/logger" ) //采集频率异常、列表页异常、404异常、下载异常、运行异常、时间异常、数据异常 const TASK_RATEERR, TASK_LISTERR, TASK_404ERR, TASK_DOWNLOADERR, TASK_RUNERR, TASK_TIMEERR, TASK_DATAERR = 8, 7, 6, 5, 4, 3, 2 //失败占比 const FailedPercentLimit = 0.20 //失败条数 const FailedNumLimit = 3 var CodeInfoMap map[string]*Spider var AllHref map[string]string var SameDayHref map[string]string var DataBakAllHref map[string]string var StateFeedBackErr = map[int]string{ 0: "timeout", 200: "analysis", 404: "download", 500: "server", } var PythonErrTypeInfoMap = map[string]ErrTypeInfo{ "download": ErrTypeInfo{ ErrType: TASK_404ERR, Remark: "下载异常", }, "server": ErrTypeInfo{ ErrType: TASK_DOWNLOADERR, Remark: "服务异常", }, "analysis": ErrTypeInfo{ ErrType: TASK_RUNERR, Remark: "解析异常", }, "timeout": ErrTypeInfo{ ErrType: TASK_TIMEERR, Remark: "超时异常", }, } var LuaErrTypeInfoMap = map[string]ErrTypeInfo{ "download": ErrTypeInfo{ ErrType: TASK_DOWNLOADERR, Remark: "下载异常", }, "regather": ErrTypeInfo{ ErrType: TASK_RUNERR, Remark: "运行异常", }, "publishtime": ErrTypeInfo{ ErrType: TASK_TIMEERR, Remark: "时间异常", }, "text": ErrTypeInfo{ ErrType: TASK_DATAERR, Remark: "数据异常", }, } //spider type Spider struct { Site string `json:"site"` //站点 Platform string `json:"platform"` //平台 Code string `json:"spidercode"` //爬虫 Channel string `json:"channel"` //栏目 AuditTime int64 `json:"audittime"` //最新审核时间 ModifyUser string `json:"modifyuser"` //维护人 ModifyId string `json:"modifyid"` //维护人id Event int `json:"event"` //节点 State int `json:"state"` //状态 PendState int `json:"pendstate"` //挂起状态 Weight int `json:"weight"` //爬虫权重 FrequencyErrTimes int `json:"frequencyerrtimes"` //爬虫采集频率异常次数 MaxPage int `json:"maxpage"` //采集最大页 Model int `json:"model"` //采集模式(新\老) 0:老模式;1:新模式 Working int `json:"working"` //采集模式(高低\性能)0:高性能模式;1:队列模式 ListIsFilter bool `json:"listisfilter"` //lua列表页采集是否包含过滤 //基于comeintime不去重的下载量 DownloadAllNum int `json:"downloadallnum"` //总下载量 DownloadSuccessNum int `json:"downloadsuccessnum"` //下载成功量 DownloadFailedNum int `json:"downloadfailednum"` //下载失败量 NoDownloadNum int `json:"nodownloadnum"` //未下载量 //基于comeintime不去重的当天下载量 PTimeAllNum int `json:"ptimeallnum"` //当天总下载量 PTimeSuccessNum int `json:"ptimesuccessnum"` //当天下载成功量 PTimeFailedNum int `json:"ptimefailednum"` //当天下载失败量 PTimeNoDownloadNum int `json:"ptimenodownloadnum"` //当天未下载量 //基于comeintime去重的下载量 RepeatDownloadAllNum int `json:"repeatdownloadallnum"` //总下载量 RepeatDownloadSuccessNum int `json:"repeatdownloadsuccessnum"` //下载成功量 RepeatDownloadFailedNum int `json:"repeatdownloadfailednum"` //下载失败量 RepeatNoDownloadNum int `json:"repeatnodownloadnum"` //未下载量 //基于comeintime去重的当天下载量 RepeatPTimeAllNum int `json:"repeatptimeallnum"` //当天总下载量 RepeatPTimeSuccessNum int `json:"repeatptimesuccessnum"` //当天下载成功量 RepeatPTimeSuccessDataBakNum int `json:"repeatptimesuccessdbnum"` //data_bak当天发布数据量 RepeatPTimeFailedNum int `json:"repeatptimefailednum"` //当天下载失败量 RepeatPTimeNoDownloadNum int `json:"repeatptimenodownloadnum"` //当天未下载量 ListDownloadAllTimes int `json:"listdownloadalltimes"` //一天内列表页总下载次数 ListOhPercentTimes int `json:"listohpercenttimes"` //列表页采集百分百次数 ListNoDataTimes int `json:"listnodatatimes"` //一天内列表页下载无数据次数 Comeintime int64 `json:"comeintime"` //入库时间 ListHeart int64 `json:"listheart"` //列表页执行心跳 DetailHeart int64 `json:"detailheart"` //详情页执行心跳 FindListHeart int64 `json:"findlistheart"` //列表页获得数据量心跳 DetailExecuteHeart int64 `json:"detailexecuteheart"` //详情页下载成功心跳 Error map[string]*ErrorInfo `json:"error"` //OhPercentTimes int `json:"ohpercentimes"` //采集量占总下载量100%的次数 //NtPercentTime int `json:"ntpercentimes"` //采集量占总下载量90%-100%的次数 //EtPercentTime int `json:"etpercentimes"` //采集量占总下载量80%-90%的次数 } //spider:错误异常 type ErrorInfo struct { Num int //错误条数 Err []*ErrRemark //错误详情 } //spider type ErrRemark struct { Href string //链接 Remark string //异常说明 } //task type Task struct { Platform string //平台 Code string //爬虫代码 Site string //站点 Channel string //栏目 ModifyUser string //维护人员 ModifyId string //维护人员id ErrType int //异常类型:8:采集频率异常;7:列表页异常;5:下载异常;4:运行异常;3:发布时间异常;2:数据异常;1:数据量异常 Description string //描述 State int //状态 Event int //节点 Num int //下载量 FrequencyErrTimes int //爬虫采集频率异常次数 DescribeMap map[int]string //ErrInfo map[string]map[string]interface{} //异常集合 } //task:任务异常类型信息 type ErrTypeInfo struct { ErrType int //任务异常类型 Remark string //异常类型说明 } var ( StartTime int64 //上一个工作日的起始时间 EndTime int64 //上一个工作日的结束时间 Publishtime string //发布时间 TaskMap map[string]*Task //任务集合 UserTaskNum map[string]map[string]int //记录每人每天新建任务量 CodeLock = &sync.Mutex{} // ) func StartTask() { InitInfo() //初始化时间 logger.Debug(StartTime, EndTime, Publishtime) GetCodeBaseInfo() //初始化爬虫基本信息 GetBiddingCount() //统计bidding表爬虫采集量 GetCodeHeart() //初始化爬虫心跳信息 GetSpiderHighListDownloadNum() //统计spider_highlistdata爬虫列表页下载量、下载失败量、未下载量 GetSpiderListDownloadNum() //统计spider_listdata爬虫列表页下载量、下载失败量、未下载量 GetSpiderDataBakDownloadNum() //统计data_bak爬虫下载量 GetSpiderDownloadRateDataNew() //下载率 GetSpiderWarnErrData() //异常信息 GetPythonWarnErrData() //python相关 //SaveCodeInfo() CreateTaskProcess() // GetDownloadNumber() //统计下载量 //CloseTask() //关闭任务 SendInfoToWxWork_SiteDataCount() SendLuaPythonAllNum() } //初始化 func InitInfo() { defer qu.Catch() CodeInfoMap = map[string]*Spider{} //初始化 AllHref = map[string]string{} SameDayHref = map[string]string{} DataBakAllHref = map[string]string{} UserTaskNum = map[string]map[string]int{} StartTime, EndTime = util.GetWorkDayTimeUnix() Publishtime = qu.FormatDateByInt64(&StartTime, qu.Date_Short_Layout) //StartTime = util.GetTime(-1) //EndTime = util.GetTime(0) } // GetCodeBaseInfo 准备爬虫基本信息 func GetCodeBaseInfo() { defer qu.Catch() sess := util.MgoE.GetMgoConn() defer util.MgoE.DestoryMongoConn(sess) lock := &sync.Mutex{} wg := &sync.WaitGroup{} ch := make(chan bool, 5) query := map[string]interface{}{ "$or": []interface{}{ //lua、python上线爬虫 map[string]interface{}{ "state": map[string]interface{}{ "$in": []int{5, 11}, //上架、上线爬虫 }, }, //lua正在被维护的爬虫和上架爬虫 map[string]interface{}{ "platform": "golua平台", "state": map[string]interface{}{ "$in": []int{0, 1, 2}, //待完成、待审核、未通过 }, "event": map[string]interface{}{ "$ne": 7000, }, }, //python正在被维护的爬虫和上线爬虫 map[string]interface{}{ "platform": "python", "state": map[string]interface{}{ "$in": []int{1, 2, 6}, //待审核、未通过、已下架 }, }, }, } fieles := map[string]interface{}{ "event": 1, "param_common": 1, "platform": 1, "modifyuser": 1, "modifyuserid": 1, "state": 1, "pendstate": 1, "weight": 1, "l_uploadtime": 1, "listisfilter": 1, "frequencyerrtimes": 1, "code": 1, } count := util.MgoE.Count("luaconfig", query) logger.Debug("共加载线上爬虫个数:", count) it := sess.DB(util.MgoE.DbName).C("luaconfig").Find(&query).Select(&fieles).Iter() n := 0 for tmp := make(map[string]interface{}); it.Next(tmp); n++ { wg.Add(1) ch <- true go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() sp := &Spider{ Error: map[string]*ErrorInfo{}, } if param_common, ok := tmp["param_common"].([]interface{}); ok && len(param_common) >= 6 { //sp.Code = qu.ObjToString(param_common[0]) sp.Site = qu.ObjToString(param_common[1]) sp.Channel = qu.ObjToString(param_common[2]) sp.MaxPage = qu.IntAll(param_common[5]) } else { logger.Debug("加载爬虫出错:", tmp["_id"]) } sp.Code = qu.ObjToString(tmp["code"]) sp.ModifyUser = qu.ObjToString(tmp["modifyuser"]) sp.ModifyId = qu.ObjToString(tmp["modifyuserid"]) sp.AuditTime = qu.Int64All(tmp["l_uploadtime"]) sp.Platform = qu.ObjToString(tmp["platform"]) sp.Event = qu.IntAll(tmp["event"]) sp.State = qu.IntAll(tmp["state"]) sp.PendState = qu.IntAll(tmp["pendstate"]) sp.Weight = qu.IntAll(tmp["weight"]) if sp.Platform == "python" { sp.ListIsFilter = false } else { sp.ListIsFilter = tmp["listisfilter"].(bool) } sp.FrequencyErrTimes = qu.IntAll(tmp["frequencyerrtimes"]) sp.Model = util.CodeEventModel[sp.Event] sp.Working = util.CodeEventWorking[sp.Event] sp.Comeintime = time.Now().Unix() lock.Lock() CodeInfoMap[sp.Code] = sp lock.Unlock() }(tmp) if n%1000 == 0 { logger.Debug(n) } tmp = map[string]interface{}{} } wg.Wait() logger.Debug("爬虫基本信息准备完成...", len(CodeInfoMap)) } func GetBiddingCount() { defer qu.Catch() sess := util.MgoB.GetMgoConn() defer util.MgoB.DestoryMongoConn(sess) lock := &sync.Mutex{} wg := &sync.WaitGroup{} ch := make(chan bool, 5) query := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": StartTime, "$lt": EndTime, }, } fieles := map[string]interface{}{ "spidercode": 1, } count := util.MgoB.Count("bidding", query) logger.Debug("bidding采集数据量:", count) it := sess.DB(util.MgoB.DbName).C("bidding").Find(&query).Select(&fieles).Iter() n := 0 for tmp := make(map[string]interface{}); it.Next(tmp); n++ { wg.Add(1) ch <- true go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() code := qu.ObjToString(tmp["spidercode"]) lock.Lock() if sp := CodeInfoMap[code]; sp != nil { if sp.Platform == "golua平台" { LuaBiddingDownloadAllNum++ } else if sp.Platform == "python" { PythonBiddingDownloadAllNum++ } } lock.Unlock() }(tmp) if n%1000 == 0 { logger.Debug(n) } tmp = map[string]interface{}{} } wg.Wait() logger.Debug("Bidding数据量统计完成...", LuaBiddingDownloadAllNum, PythonBiddingDownloadAllNum) } // GetCodeHeart 获取爬虫的心跳信息 func GetCodeHeart() { defer qu.Catch() sess := util.MgoS.GetMgoConn() defer util.MgoS.DestoryMongoConn(sess) query := map[string]interface{}{ "del": false, } fields := map[string]interface{}{ "code": 1, "list": 1, "detail": 1, "findlist": 1, "detailexecute": 1, } lock := &sync.Mutex{} wg := &sync.WaitGroup{} ch := make(chan bool, 5) it := sess.DB(util.MgoS.DbName).C("spider_heart").Find(&query).Select(&fields).Iter() n := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); n++ { wg.Add(1) ch <- true go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() code := qu.ObjToString(tmp["code"]) listHeart := qu.Int64All(tmp["list"]) detailHeart := qu.Int64All(tmp["detail"]) findListHeart := qu.Int64All(tmp["findlist"]) detailExecuteHeart := qu.Int64All(tmp["detailexecute"]) lock.Lock() if sp := CodeInfoMap[code]; sp != nil { sp.ListHeart = listHeart sp.DetailHeart = detailHeart sp.FindListHeart = findListHeart sp.DetailExecuteHeart = detailExecuteHeart } lock.Unlock() }(tmp) if n%100 == 0 { logger.Debug(n) } tmp = map[string]interface{}{} } wg.Wait() logger.Debug("统计采集量spider_heart完成...") } // GetSpiderHighListDownloadNum 统计爬虫列表页下载量和下载失败量 func GetSpiderHighListDownloadNum() { defer qu.Catch() sess := util.MgoS.GetMgoConn() defer util.MgoS.DestoryMongoConn(sess) query := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": StartTime, "$lt": EndTime, }, } fields := map[string]interface{}{ "spidercode": 1, "href": 1, "state": 1, "times": 1, "publishtime": 1, "site": 1, } lock := &sync.Mutex{} wg := &sync.WaitGroup{} ch := make(chan bool, 5) //1、统计spider_highlistdata it := sess.DB(util.MgoS.DbName).C("spider_highlistdata").Find(&query).Select(&fields).Iter() n := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); n++ { wg.Add(1) ch <- true go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() code := qu.ObjToString(tmp["spidercode"]) href := qu.ObjToString(tmp["href"]) state := qu.IntAll(tmp["state"]) site := qu.ObjToString(tmp["site"]) ptime := qu.ObjToString(tmp["publishtime"]) sameDay := strings.Contains(ptime, Publishtime) //判断是否是当天的数据 lock.Lock() if sp := CodeInfoMap[code]; sp != nil { //href不去重统计 success := true sp.DownloadAllNum++ if sameDay { sp.PTimeAllNum++ } if state == 1 { //下载成功 sp.DownloadSuccessNum++ if sameDay { sp.PTimeSuccessNum++ } } else if state == -1 { //下载失败 success = false sp.DownloadFailedNum++ if sameDay { sp.PTimeFailedNum++ } } else { if tmp["times"] == nil { //未下载 sp.NoDownloadNum++ if sameDay { sp.PTimeNoDownloadNum++ } } else { //下载失败,状态被重置 success = false sp.DownloadFailedNum++ if sameDay { sp.PTimeFailedNum++ } } } //按当天发布时间href去重 if sameDay && SameDayHref[href] != site { sp.RepeatDownloadAllNum++ sp.RepeatPTimeAllNum++ if state == 1 { sp.RepeatDownloadSuccessNum++ sp.RepeatPTimeSuccessNum++ } else if state == -1 { //下载失败 sp.RepeatDownloadFailedNum++ sp.RepeatPTimeFailedNum++ } else { if tmp["times"] == nil { //未下载 sp.RepeatNoDownloadNum++ sp.RepeatPTimeNoDownloadNum++ } else { //下载失败,状态被重置 sp.RepeatPTimeFailedNum++ sp.RepeatDownloadFailedNum++ } } SameDayHref[href] = site AllHref[href] = site } else if AllHref[href] != site { //按全量href去重 sp.RepeatDownloadAllNum++ if state == 1 { //下载成功 sp.RepeatDownloadSuccessNum++ } else if state == -1 { //下载失败 sp.RepeatDownloadFailedNum++ } else { if tmp["times"] == nil { //未下载 sp.RepeatNoDownloadNum++ } else { //下载失败,状态被重置 sp.RepeatDownloadFailedNum++ } } AllHref[href] = site } //href站点内去重统计 //if AllHref[href] != site { // sp.RepeatDownloadAllNum++ // if sameDay { // sp.RepeatPTimeAllNum++ // } // if state == 1 { //下载成功 // sp.RepeatDownloadSuccessNum++ // if sameDay { // sp.RepeatPTimeSuccessNum++ // } // } else if state == -1 { //下载失败 // sp.RepeatDownloadFailedNum++ // if sameDay { // sp.RepeatPTimeFailedNum++ // } // } else { // if tmp["times"] == nil { //未下载 // sp.RepeatNoDownloadNum++ // if sameDay { // sp.RepeatPTimeNoDownloadNum++ // } // } else { //下载失败,状态被重置 // sp.RepeatDownloadFailedNum++ // if sameDay { // sp.RepeatPTimeFailedNum++ // } // } // } // AllHref[href] = site //} if !success { //下载失败记录href if errorInfo := sp.Error["download"]; errorInfo == nil { sp.Error["download"] = &ErrorInfo{ Num: sp.DownloadFailedNum, Err: []*ErrRemark{ &ErrRemark{ Href: href, Remark: "Download Failed", }, }, } } else { errorInfo.Num = sp.DownloadFailedNum if len(errorInfo.Err) < 3 { errorInfo.Err = append(errorInfo.Err, &ErrRemark{ Href: qu.ObjToString(tmp["href"]), Remark: "Download Failed", }) } } } } lock.Unlock() }(tmp) if n%1000 == 0 { logger.Debug(n) } tmp = map[string]interface{}{} } wg.Wait() logger.Debug("统计采集量spider_highlistdata完成...") } func GetSpiderListDownloadNum() { defer qu.Catch() sess := util.MgoS.GetMgoConn() defer util.MgoS.DestoryMongoConn(sess) query := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": StartTime, "$lt": EndTime, }, } fields := map[string]interface{}{ "spidercode": 1, "href": 1, "state": 1, "site": 1, "times": 1, "publishtime": 1, } lock := &sync.Mutex{} wg := &sync.WaitGroup{} ch := make(chan bool, 5) it := sess.DB(util.MgoS.DbName).C("spider_listdata").Find(&query).Select(&fields).Iter() n := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); n++ { wg.Add(1) ch <- true go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() code := qu.ObjToString(tmp["spidercode"]) href := qu.ObjToString(tmp["href"]) state := qu.IntAll(tmp["state"]) site := qu.ObjToString(tmp["site"]) ptime := qu.ObjToString(tmp["publishtime"]) sameDay := strings.Contains(ptime, Publishtime) //判断是否是当天的数据 lock.Lock() if sp := CodeInfoMap[code]; sp != nil { //href不去重统计 success := true sp.DownloadAllNum++ if sameDay { sp.PTimeAllNum++ } if state == 1 { //下载成功 sp.DownloadSuccessNum++ if sameDay { sp.PTimeSuccessNum++ } } else if state == -1 { //下载失败 success = false sp.DownloadFailedNum++ if sameDay { sp.PTimeFailedNum++ } } else { //未下载 sp.NoDownloadNum++ if sameDay { sp.PTimeNoDownloadNum++ } } //按当天发布时间href去重 if sameDay && SameDayHref[href] != site { sp.RepeatDownloadAllNum++ sp.RepeatPTimeAllNum++ if state == 1 { sp.RepeatDownloadSuccessNum++ sp.RepeatPTimeSuccessNum++ } else if state == -1 { //下载失败 sp.RepeatDownloadFailedNum++ sp.RepeatPTimeFailedNum++ } else { if tmp["times"] == nil { //未下载 sp.RepeatNoDownloadNum++ sp.RepeatPTimeNoDownloadNum++ } else { //下载失败,状态被重置 sp.RepeatPTimeFailedNum++ sp.RepeatDownloadFailedNum++ } } SameDayHref[href] = site AllHref[href] = site } else if AllHref[href] != site { //按全量href去重 sp.RepeatDownloadAllNum++ if state == 1 { //下载成功 sp.RepeatDownloadSuccessNum++ } else if state == -1 { //下载失败 sp.RepeatDownloadFailedNum++ } else { if tmp["times"] == nil { //未下载 sp.RepeatNoDownloadNum++ } else { //下载失败,状态被重置 sp.RepeatDownloadFailedNum++ } } AllHref[href] = site } //href站点内去重统计 //if AllHref[href] != site { // sp.RepeatDownloadAllNum++ // if samaDay { // sp.RepeatPTimeAllNum++ // } // if state == 1 { //下载成功 // sp.RepeatDownloadSuccessNum++ // if samaDay { // sp.RepeatPTimeSuccessNum++ // } // } else if state == -1 { //下载失败 // sp.RepeatDownloadFailedNum++ // if samaDay { // sp.RepeatPTimeFailedNum++ // } // } else { //未下载 // sp.RepeatNoDownloadNum++ // if samaDay { // sp.RepeatPTimeNoDownloadNum++ // } // } // AllHref[href] = site //} if !success { //下载失败记录href if errorInfo := sp.Error["download"]; errorInfo == nil { sp.Error["download"] = &ErrorInfo{ Num: sp.DownloadFailedNum, Err: []*ErrRemark{ &ErrRemark{ Href: href, Remark: "Download Failed", }, }, } } else { errorInfo.Num = sp.DownloadFailedNum if len(errorInfo.Err) < 3 { errorInfo.Err = append(errorInfo.Err, &ErrRemark{ Href: href, Remark: "Download Failed", }) } } } } lock.Unlock() }(tmp) if n%1000 == 0 { logger.Debug(n) } tmp = map[string]interface{}{} } wg.Wait() AllHref = map[string]string{} SameDayHref = map[string]string{} logger.Debug("统计spider_listdata采集量完成...") } func GetSpiderDataBakDownloadNum() { defer qu.Catch() logger.Debug("统计采集量data_bak开始...") sess := util.MgoS.GetMgoConn() defer util.MgoS.DestoryMongoConn(sess) query := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": StartTime, "$lt": EndTime, }, "l_np_publishtime": map[string]interface{}{ "$gte": StartTime, "$lt": EndTime, }, } fields := map[string]interface{}{ "spidercode": 1, "href": 1, "site": 1, } lock := &sync.Mutex{} wg := &sync.WaitGroup{} ch := make(chan bool, 5) it := sess.DB(util.MgoS.DbName).C("data_bak").Find(&query).Select(&fields).Iter() n := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); n++ { wg.Add(1) ch <- true go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() code := qu.ObjToString(tmp["spidercode"]) href := qu.ObjToString(tmp["href"]) site := qu.ObjToString(tmp["site"]) lock.Lock() defer lock.Unlock() if sp := CodeInfoMap[code]; sp != nil { //单独统计data_bak每个爬虫当天发布的数据量 if DataBakAllHref[href] != site { sp.RepeatPTimeSuccessDataBakNum++ DataBakAllHref[href] = site } if sp.DownloadAllNum == 0 || sp.PTimeAllNum != 0 { return } sp.PTimeAllNum++ sp.RepeatPTimeAllNum++ sp.PTimeSuccessNum++ sp.RepeatPTimeSuccessNum++ } }(tmp) if n%1000 == 0 { logger.Debug(n) } tmp = map[string]interface{}{} } wg.Wait() DataBakAllHref = map[string]string{} //wg := &sync.WaitGroup{} //ch := make(chan bool, 5) //n := 0 //for _, sp := range CodeInfoMap { // n++ // if n%100 == 0 { // logger.Debug("current:", n) // } // if sp.Platform != "golua平台" || sp.DownloadAllNum == 0 || sp.PTimeAllNum != 0 { //根据发布时间统计无数据,统计data_bak // continue // } // //logger.Info("列表页未匹配到当天发布数据的爬虫:", sp.Code) // wg.Add(1) // ch <- true // go func(tmpSp *Spider) { // defer func() { // <-ch // wg.Done() // }() // query := map[string]interface{}{ // "comeintime": map[string]interface{}{ // "$gte": StartTime, // "$lt": EndTime, // }, // "l_np_publishtime": map[string]interface{}{ // "$gte": StartTime, // "$lt": EndTime, // }, // "spidercode": tmpSp.Code, // } // count := util.MgoS.Count("data_bak", query) // tmpSp.PTimeAllNum = count // tmpSp.RepeatPTimeAllNum = count // tmpSp.PTimeSuccessNum = count // tmpSp.RepeatPTimeSuccessNum = count // }(sp) //} //wg.Wait() logger.Debug("统计采集量data_bak完成...") } // GetSpiderListDownloadNum 统计爬虫列表页下载量和下载失败量 func GetSpiderListDownloadNum_Back() { defer qu.Catch() sess := util.MgoS.GetMgoConn() defer util.MgoS.DestoryMongoConn(sess) lock := &sync.Mutex{} wg := &sync.WaitGroup{} ch := make(chan bool, 5) //2、统计spider_listdata match := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": StartTime, "$lt": EndTime, }, "event": map[string]interface{}{ "$ne": 7000, }, } group1 := map[string]interface{}{ "_id": map[string]interface{}{ "spidercode": "$spidercode", "state": "$state", }, "datacount": map[string]interface{}{ "$sum": 1, }, } group2 := map[string]interface{}{ "_id": "$_id.spidercode", "stateinfo": map[string]interface{}{ "$push": map[string]interface{}{ "state": "$_id.state", "count": "$datacount", }, }, "count": map[string]interface{}{ "$sum": "$datacount", }, } project := map[string]interface{}{ "statearr": "$stateinfo", "count": 1, } p := []map[string]interface{}{ map[string]interface{}{"$match": match}, map[string]interface{}{"$group": group1}, map[string]interface{}{"$group": group2}, map[string]interface{}{"$project": project}, } it := sess.DB(util.MgoS.DbName).C("spider_listdata").Pipe(p).Iter() n := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); n++ { wg.Add(1) ch <- true go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() code := qu.ObjToString(tmp["_id"]) count := qu.IntAll(tmp["count"]) //下载总量(不准确,含重复数据) successCount := 0 //下载成功量(不准确,含重复数据) failedCount := 0 //下载失败量(不准确,含重复数据) noCount := 0 //未下载量 if stateArr, ok := tmp["statearr"].([]interface{}); ok { for _, stateInfo := range stateArr { infoMap := stateInfo.(map[string]interface{}) state := qu.IntAll(infoMap["state"]) if state == 1 { //state:1,下载成功量 successCount = qu.IntAll(infoMap["count"]) } else if state == -1 { //state:-1,下载失败量 failedCount = qu.IntAll(infoMap["count"]) } else if state == 0 { //state:0,未下载量 noCount = qu.IntAll(infoMap["count"]) } } } errArr := []*ErrRemark{} if failedCount > 0 { //有采集失败的数据,查询失败链接 query := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": StartTime, "$lt": EndTime, }, "event": map[string]interface{}{ "$ne": 7000, }, "spidercode": code, "state": -1, } logger.Debug("采集失败数据query:", query) data, _ := util.MgoS.FindOne("spider_listdata", query) if data != nil { errArr = append(errArr, &ErrRemark{ Href: qu.ObjToString((*data)["href"]), Remark: "Download Failed", }) } } lock.Lock() if spider := CodeInfoMap[code]; spider != nil { spider.DownloadAllNum = count spider.DownloadSuccessNum = successCount spider.DownloadFailedNum = failedCount spider.NoDownloadNum = noCount if len(errArr) > 0 { spider.Error["download"] = &ErrorInfo{ Num: failedCount, Err: errArr, } } } else { logger.Debug("-------------", code) } lock.Unlock() }(tmp) if n%100 == 0 { logger.Debug(n) } tmp = map[string]interface{}{} } wg.Wait() logger.Debug("统计spider_listdata采集量完成...") } // GetSpiderDownloadRateDataNew 汇总列表页采集频率情况 func GetSpiderDownloadRateDataNew() { defer qu.Catch() sess := util.MgoS.GetMgoConn() defer util.MgoS.DestoryMongoConn(sess) ch := make(chan bool, 5) wg := &sync.WaitGroup{} lock := &sync.Mutex{} date := qu.FormatDateByInt64(&StartTime, qu.Date_Short_Layout) query := map[string]interface{}{ "date": date, "event": map[string]interface{}{ "$ne": 7000, }, } fields := map[string]interface{}{ "spidercode": 1, "alltimes": 1, "zero": 1, "oh_percent": 1, } logger.Debug("query:", query) it := sess.DB(util.MgoS.DbName).C("spider_downloadrate").Find(&query).Select(&fields).Iter() n := 0 for tmp := make(map[string]interface{}); it.Next(tmp); n++ { ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() code := qu.ObjToString(tmp["spidercode"]) alltimes := qu.IntAll(tmp["alltimes"]) zero := qu.IntAll(tmp["zero"]) oh_percent := qu.IntAll(tmp["oh_percent"]) lock.Lock() if spider := CodeInfoMap[code]; spider != nil { spider.ListDownloadAllTimes = alltimes spider.ListNoDataTimes = zero //含有100%采集,及为采集频率异常(由于7410、7500、7510、7700队列模式节点,不建采集频率异常任务) //上轮数据下载不成功,下轮采集会被任务是新数据(应该建下载异常任务) if oh_percent > 0 && spider.Model != 0 { spider.FrequencyErrTimes++ spider.ListOhPercentTimes = oh_percent } } else { logger.Debug("-------------", code) } lock.Unlock() }(tmp) if n%1000 == 0 { logger.Debug("current:", n) } tmp = map[string]interface{}{} } wg.Wait() logger.Debug("列表页采集统计完成...") } //汇总lua错误信息数据 func GetSpiderWarnErrData() { defer qu.Catch() logger.Debug("错误信息数据统计...") sess := util.MgoS.GetMgoConn() defer util.MgoS.DestoryMongoConn(sess) match := map[string]interface{}{ "level": 2, "comeintime": map[string]interface{}{ "$gte": StartTime, "$lt": EndTime, }, } group1 := map[string]interface{}{ "_id": map[string]interface{}{ "code": "$code", "info": "$info", }, "datacount": map[string]interface{}{ "$sum": 1, }, } group2 := map[string]interface{}{ "_id": "$_id.code", "infotext": map[string]interface{}{ "$push": map[string]interface{}{ "info": "$_id.info", "count": "$datacount", }, }, "count": map[string]interface{}{ "$sum": "$datacount", }, } project := map[string]interface{}{ "infoarr": "$infotext", "count": 1, } p := []map[string]interface{}{ map[string]interface{}{"$match": match}, map[string]interface{}{"$group": group1}, map[string]interface{}{"$group": group2}, map[string]interface{}{"$project": project}, } logger.Debug("spider_warn:", match) //1、统计spider_warn it1 := sess.DB(util.MgoS.DbName).C("spider_warn").Pipe(p).Iter() n1 := 0 ch := make(chan bool, 5) wg := &sync.WaitGroup{} lock := &sync.Mutex{} for tmp := make(map[string]interface{}); it1.Next(&tmp); n1++ { wg.Add(1) ch <- true go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() code := qu.ObjToString(tmp["_id"]) //spider.Error = map[string]*ErrorInfo{} //初始化 if infoArr, ok := tmp["infoarr"].([]interface{}); ok { for _, info := range infoArr { stype := "" query := map[string]interface{}{ "level": 2, "comeintime": map[string]interface{}{ "$gte": StartTime, "$lt": EndTime, }, } infoMap := info.(map[string]interface{}) infoText := qu.ObjToString(infoMap["info"]) //错误信息 errCount := qu.IntAll(infoMap["count"]) //错误数量 if infoText == "Publishtime Is Too Late" { //发布时间超前 query["info"] = infoText stype = "publishtime" } else if infoText == "Publishtime Is Less Than Zero" { //发布时间小于0 query["info"] = infoText stype = "publishtime" } else if infoText == "Publishtime Is Too Early" { //发布时间过小 query["info"] = infoText stype = "publishtime" } else if infoText == "Field Value Not Contains Chinese" { //title、detail不含中文 query["info"] = infoText stype = "text" } else if infoText == "Field Value Contains Random Code" { //title、detail含乱码 query["info"] = infoText stype = "text" } else { continue } query["code"] = code //logger.Debug(query) //errArr := []*ErrRemark{} //list, _ := util.MgoS.Find("spider_warn", query, nil, map[string]interface{}{"href": 1}, false, 0, 3) //for _, l := range *list { // errArr = append(errArr, &ErrRemark{ // Href: qu.ObjToString(l["href"]), // Remark: infoText, // }) //} one, _ := util.MgoS.FindOne("spider_warn", query) //查询该错误信息类型的一条href oneErrInfo := &ErrRemark{ Href: qu.ObjToString((*one)["href"]), Remark: infoText, } lock.Lock() if spider := CodeInfoMap[code]; spider != nil { if errMap := spider.Error[stype]; errMap != nil { errMap.Num += errCount errMap.Err = append(errMap.Err, oneErrInfo) } else { spider.Error[stype] = &ErrorInfo{ Num: errCount, Err: []*ErrRemark{ oneErrInfo, }, } } } lock.Unlock() } } }(tmp) if n1%10 == 0 { logger.Debug(n1) } tmp = map[string]interface{}{} } //2、统计regatherdata //match = map[string]interface{}{ // "state": map[string]interface{}{ // "$lte": 1, // }, // "from": "lua", // "comeintime": map[string]interface{}{ // "$gte": StartTime, // "$lt": EndTime, // }, //} //group1 = map[string]interface{}{ // "_id": "$spidercode", // "count": map[string]interface{}{ // "$sum": 1, // }, //} //p = []map[string]interface{}{ // map[string]interface{}{"$match": match}, // map[string]interface{}{"$group": group1}, //} //logger.Debug("regather query:", match) //it2 := sess.DB(util.MgoS.DbName).C("regatherdata").Pipe(p).Iter() //n2 := 0 //for tmp := make(map[string]interface{}); it2.Next(&tmp); n2++ { // wg.Add(1) // ch <- true // go func(tmp map[string]interface{}) { // defer func() { // <-ch // wg.Done() // }() // code := qu.ObjToString(tmp["_id"]) //爬虫代码 // count := qu.IntAll(tmp["count"]) //异常数据量 // query := map[string]interface{}{ // "state": map[string]interface{}{ // "$lte": 1, // }, // "from": "lua", // "comeintime": map[string]interface{}{ // "$gte": StartTime, // "$lt": EndTime, // }, // "spidercode": code, // } // //logger.Debug("query:", query) // // errArr := []*ErrRemark{} // list, _ := util.MgoS.Find("regatherdata", query, nil, map[string]interface{}{"href": 1, "error": 1}, false, 0, 3) // for _, l := range *list { // errText := qu.ObjToString(l["error"]) // errText = strings.Replace(errText, ":", "", 1) // errArr = append(errArr, &ErrRemark{ // Href: qu.ObjToString(l["href"]), // Remark: errText, // }) // } // //one, _ := util.MgoS.FindOne("regatherdata", query) //查询该错误信息类型的一条href // //oneErrInfo := &ErrRemark{ // // Href: qu.ObjToString((*one)["href"]), // // Remark: qu.ObjToString((*one)["error"]), // //} // if spider := CodeInfoMap[code]; spider != nil { // spider.Error["regather"] = &ErrorInfo{ // Num: count, // Err: errArr, // } // // if spider_err := spider.Error; spider_err != nil { // // spider_err["regather"] = &ErrorInfo{ // // Num: count, // // Err: []map[string]interface{}{ // // oneErrInfo, // // }, // // } // // } else { // // spider.Error = map[string]*ErrorInfo{ // // "regather": &ErrorInfo{ // // Num: count, // // Err: []map[string]interface{}{ // // oneErrInfo, // // }, // // }, // // } // // } // } // }(tmp) // if n2%10 == 0 { // logger.Debug(n2) // } // tmp = map[string]interface{}{} //} wg.Wait() logger.Debug("错误信息数据统计完成...") } //汇总python错误信息数据 func GetPythonWarnErrData() { GetPythonListDownloadNum() //统计列表页采集量 GetPythonDetailDownloadNum() //统计data_bak总下载量 GetPythonErrData() //统计异常信息 } //python统计列表页采集量 func GetPythonListDownloadNum() { defer qu.Catch() logger.Debug("python列表页数据下载量统计开始...") sess := util.MgoPy.GetMgoConn() defer util.MgoPy.DestoryMongoConn(sess) query := map[string]interface{}{ "runtime": Publishtime, "rel_count": map[string]interface{}{ "$gt": 0, }, } fields := map[string]interface{}{ "spidercode": 1, "rel_count": 1, } lock := &sync.Mutex{} wg := &sync.WaitGroup{} ch := make(chan bool, 5) it := sess.DB(util.MgoPy.DbName).C("list").Find(&query).Select(&fields).Iter() n := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); n++ { wg.Add(1) ch <- true go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() code := qu.ObjToString(tmp["spidercode"]) count := qu.IntAll(tmp["rel_count"]) lock.Lock() if sp := CodeInfoMap[code]; sp != nil { //href不去重统计 sp.DownloadAllNum += count sp.RepeatDownloadAllNum += count } lock.Unlock() }(tmp) if n%1000 == 0 { logger.Debug(n) } tmp = map[string]interface{}{} } wg.Wait() logger.Debug("python数据下载量统计完成...") } //python三级页统计总下载量 func GetPythonDetailDownloadNum() { defer qu.Catch() logger.Debug("python三级页数据下载量统计开始...") sess := util.MgoPy.GetMgoConn() defer util.MgoPy.DestoryMongoConn(sess) query := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": StartTime, "$lt": EndTime, }, } fields := map[string]interface{}{ "spidercode": 1, "publishtime": 1, "sendflag": 1, } lock := &sync.Mutex{} wg := &sync.WaitGroup{} ch := make(chan bool, 5) it := sess.DB(util.MgoPy.DbName).C("data_bak").Find(&query).Select(&fields).Iter() n := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); n++ { wg.Add(1) ch <- true go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() code := qu.ObjToString(tmp["spidercode"]) ptime := qu.ObjToString(tmp["publishtime"]) sendflag := qu.ObjToString(tmp["sendflag"]) samaDay := strings.Contains(ptime, Publishtime) //判断是否是当天的数据 lock.Lock() if sp := CodeInfoMap[code]; sp != nil { //sp.DownloadAllNum++ //sp.RepeatDownloadAllNum++ if sendflag == "true" { sp.DownloadSuccessNum++ sp.RepeatDownloadSuccessNum++ } if samaDay { sp.PTimeAllNum++ sp.RepeatPTimeAllNum++ if sendflag == "true" { sp.PTimeSuccessNum++ sp.RepeatPTimeSuccessNum++ sp.RepeatPTimeSuccessDataBakNum++ } } } lock.Unlock() }(tmp) if n%1000 == 0 { logger.Debug(n) } tmp = map[string]interface{}{} } wg.Wait() logger.Debug("python数据下载量统计完成...") } //python统计总下载量 func GetPythonDownloadNum_back() { defer qu.Catch() sess := util.MgoPy.GetMgoConn() defer util.MgoPy.DestoryMongoConn(sess) match := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": StartTime, "$lt": EndTime, }, } group1 := map[string]interface{}{ "_id": map[string]interface{}{ "spidercode": "$spidercode", "sendflag": "$sendflag", }, "datacount": map[string]interface{}{ "$sum": 1, }, } group2 := map[string]interface{}{ "_id": "$_id.spidercode", "sendflagarr": map[string]interface{}{ "$push": map[string]interface{}{ "sendflag": "$_id.sendflag", "count": "$datacount", }, }, "count": map[string]interface{}{ "$sum": "$datacount", }, } project := map[string]interface{}{ "infoarr": "$sendflagarr", "count": 1, } p := []map[string]interface{}{ map[string]interface{}{"$match": match}, map[string]interface{}{"$group": group1}, map[string]interface{}{"$group": group2}, map[string]interface{}{"$project": project}, } ch := make(chan bool, 5) wg := &sync.WaitGroup{} lock := &sync.Mutex{} it1 := sess.DB(util.MgoPy.DbName).C("data_bak").Pipe(p).Iter() n := 0 for tmp := make(map[string]interface{}); it1.Next(&tmp); n++ { wg.Add(1) ch <- true go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() code := qu.ObjToString(tmp["_id"]) count := qu.IntAll(tmp["count"]) //下载总量 successCount := 0 //下载成功总量 if infoArr, ok := tmp["infoarr"].([]interface{}); ok { for _, info := range infoArr { infoMap := info.(map[string]interface{}) if sendflag := qu.ObjToString(infoMap["sendflag"]); sendflag == "true" { successCount = qu.IntAll(infoMap["count"]) } } } lock.Lock() if sp := CodeInfoMap[code]; sp != nil { sp.DownloadAllNum = count sp.DownloadSuccessNum = successCount //保存服务发送成功数 } lock.Unlock() }(tmp) if n%100 == 0 { logger.Debug(n) } tmp = map[string]interface{}{} } wg.Wait() logger.Debug("python数据下载量统计完成...") } //python统计异常信息 func GetPythonErrData() { defer qu.Catch() sess := util.MgoPy.GetMgoConn() defer util.MgoPy.DestoryMongoConn(sess) query := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": StartTime, "$lt": EndTime, }, } fieles := map[string]interface{}{ "spidercode": 1, "parser_name": 1, "parse_url": 1, "failed": 1, "code": 1, } it := sess.DB(util.MgoPy.DbName).C("mgp_list").Find(&query).Select(&fieles).Iter() n := 0 lock := &sync.Mutex{} wg := &sync.WaitGroup{} ch := make(chan bool, 5) for tmp := make(map[string]interface{}); it.Next(tmp); n++ { wg.Add(1) ch <- true go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() state := qu.IntAll(tmp["code"]) if state == -1 { //状态码为-1表示详情页未执行下载操作,不统计 return } spidercode := qu.ObjToString(tmp["spidercode"]) remark := qu.ObjToString(tmp["parser_name"]) href := qu.ObjToString(tmp["parse_url"]) failed := qu.IntAll(tmp["failed"]) errType := StateFeedBackErr[state] oneErrInfo := &ErrRemark{ Href: href, Remark: remark, } lock.Lock() if spider := CodeInfoMap[spidercode]; spider != nil { if failed == 0 { //未采集 spider.NoDownloadNum++ } else { //下载失败 spider.DownloadFailedNum++ if spider_err := spider.Error; spider_err != nil { if errInfo := spider_err[errType]; errInfo != nil { errInfo.Num++ if len(errInfo.Err) < 3 { //最多存放三个错误数据连接 errInfo.Err = append(errInfo.Err, oneErrInfo) } } else { spider.Error[errType] = &ErrorInfo{ Num: 1, Err: []*ErrRemark{ oneErrInfo, }, } } } else { spider.Error = map[string]*ErrorInfo{ errType: &ErrorInfo{ Num: 1, Err: []*ErrRemark{ oneErrInfo, }, }, } } } } lock.Unlock() }(tmp) if n%100 == 0 { logger.Debug(n) } tmp = map[string]interface{}{} } wg.Wait() logger.Debug("python下载异常数据统计完成...") } //根据爬虫监控信息创建任务流程 func CreateTaskProcess() { defer qu.Catch() logger.Debug("开始生成爬虫任务...") //arr := []map[string]interface{}{} upsertBulk := [][]map[string]interface{}{} //任务更新集 arr := []map[string]interface{}{} //当天爬虫信息集 wg := &sync.WaitGroup{} lock := &sync.Mutex{} ch := make(chan bool, 10) logger.Debug("CodeInfoMap:", len(CodeInfoMap)) for code, spider := range CodeInfoMap { wg.Add(1) ch <- true go func(code string, spider *Spider) { defer func() { <-ch wg.Done() }() //整理新任务的信息 task := &Task{ DescribeMap: map[int]string{}, } //task.Platform = spider.Platform //task.Site = spider.Site //task.Code = spider.Code //task.Channel = spider.Channel //task.ModifyUser = spider.ModifyUser //task.ModifyId = spider.ModifyId //task.FrequencyErrTimes = spider.FrequencyErrTimes //lua、python共有异常publishtime、text if len(spider.Error) > 0 { //1、download:下载异常errtype:5; //2、regather:运行异常errtype:4; //3、publishtime:时间异常errtype:3; //4、text:数据异常errtype:2; for stype, info := range LuaErrTypeInfoMap { if err := spider.Error[stype]; err != nil { taskStateOk := false if stype == "download" { if spider.Model == 1 { //新模式(7100、7110、7200、7210、7300、7310、7400)根据异常总量和占比建任务 moreThanLimit := false //1、异常条数;2、异常占比 if spider.DownloadFailedNum > FailedNumLimit { moreThanLimit = true } else if spider.DownloadAllNum > 0 && (float64(spider.DownloadFailedNum)/float64(spider.DownloadAllNum)) > FailedPercentLimit { moreThanLimit = true } if !moreThanLimit { //不在异常范围,不建该类型任务 continue } } else if spider.Model == 0 && spider.Working == 1 { //老模式,队列模式(7500,7700)有下载异常数据直接建任务 if spider.DownloadFailedNum > 0 { //只有7500、7700出现一条下载异常时,任务状态即为待处理 task.State = 1 //待处理 taskStateOk = true } else { continue } } else if spider.Model == 0 && spider.Working == 0 { //老模式,高性能模式(7410)不建下载异常任务 continue } } //取最大的错误异常类型 if task.ErrType < info.ErrType { task.ErrType = info.ErrType } //download、regather、publishtime、text错误中有一个类型错误个数大于10,任务状态即为待处理 if !taskStateOk && err.Num > 10 { //错误个数大于10为待处理 task.State = 1 //待处理 } //错误描述 descript := info.Remark + ":共" + fmt.Sprint(err.Num) + "条\n" for _, errRemark := range err.Err { if stype == "regather" { //特殊处理运行异常描述 descript += errRemark.Remark + ":" + errRemark.Href + "\n" } else { descript += errRemark.Href + "\n" } } task.DescribeMap[info.ErrType] = descript } } } if spider.Platform == "golua平台" { //lua异常(由于采集频率异常比较特殊固放到最后处理) //5、列表页异常 errtype:7 if spider.ListNoDataTimes > 0 && spider.ListNoDataTimes == spider.ListDownloadAllTimes { if !spider.ListIsFilter || (spider.FindListHeart < util.GetTime(0) && spider.ListIsFilter) { //列表页不含过滤代码或者有过滤无心跳 task.State = 1 //待处理 task.ErrType = TASK_LISTERR task.DescribeMap[TASK_LISTERR] = "列表页异常:\n 列表页共采集" + fmt.Sprint(spider.ListDownloadAllTimes) + "轮,其中有" + fmt.Sprint(spider.ListNoDataTimes) + "轮无数据\n" } // if !spider.ListIsFilter { //列表页不含过滤代码 // task.State = 1 //待处理 // task.ErrType = TASK_LISTERR // } else if len(task.DescribeMap) == 0 { //只有列表页异常且有过滤代码 // task.State = 0 //待确认 // task.ErrType = TASK_LISTERR // } } //6、采集频率异常 errtype:8 if spider.ListOhPercentTimes > 0 { //采集频率异常 //UpdateLuaInfo(spider) //出现采集频率异常,便更新爬虫的frequencyerrtimes、最大页自动加1、重新上架 //只有当FrequencyErrTimes>3取采集频率异常,相反优先其他异常类型(采集频率异常且待确认时程序自动处理,人工几乎不介入) if spider.FrequencyErrTimes > 3 { //爬虫采集频率异常次数大于3次,任务为待处理,否则为待确认 task.State = 1 //待处理 task.ErrType = TASK_RATEERR } else if len(task.DescribeMap) == 0 { //只有采集频率异常且FrequencyErrTimes<=3 task.State = 0 //待确认 task.ErrType = TASK_RATEERR } task.DescribeMap[TASK_RATEERR] = "采集频率异常:\n 列表页共采集" + fmt.Sprint(spider.ListDownloadAllTimes) + "轮,其中有" + fmt.Sprint(spider.ListOhPercentTimes) + "轮数据全采\n" } } else if spider.Platform == "python" { //python异常 for stype, info := range PythonErrTypeInfoMap { if err := spider.Error[stype]; err != nil { //取最大的错误异常类型 if task.ErrType < info.ErrType { task.ErrType = info.ErrType } if info.ErrType > 3 { //python404异常、下载异常、运行异常任务状态均为待处理 task.State = 1 } //错误描述 descript := info.Remark + ":共" + fmt.Sprint(err.Num) + "条\n" for _, errRemark := range err.Err { descript += errRemark.Remark + ":" + errRemark.Href + "\n" } //lua和python的info.ErrType:3、4可能同时存在,描述累加 task.DescribeMap[info.ErrType] = descript + task.DescribeMap[info.ErrType] } } } //存储爬虫统计信息 byteText, err := json.Marshal(spider) if err != nil { logger.Debug("Json Marshal Error", code) return } tmp := map[string]interface{}{} if json.Unmarshal(byteText, &tmp) == nil { lock.Lock() arr = append(arr, tmp) lock.Unlock() } else { logger.Debug("Json UnMarshal Error", code) return } //根据爬虫信息新建任务 CreateTask(task, spider, &upsertBulk, lock) //比对历史任务,新建任务 if spider.Platform == "golua平台" { //列表页总下载量 atomic.AddInt64(&LuaListDownloadAllNum, int64(spider.RepeatDownloadAllNum)) //列表页总下载成功量 atomic.AddInt64(&LuaListDownloadSuccessAllNum, int64(spider.RepeatDownloadSuccessNum)) } else { //列表页总下载量 atomic.AddInt64(&PythonListDownloadAllNum, int64(spider.RepeatDownloadAllNum)) //列表页总下载成功量 atomic.AddInt64(&PythonListDownloadSuccessAllNum, int64(spider.RepeatDownloadSuccessNum)) } lock.Lock() if len(arr) > 500 { util.MgoE.SaveBulk("luacodeinfo", arr...) arr = []map[string]interface{}{} } if len(upsertBulk) > 500 { util.MgoE.UpSertBulk("task", upsertBulk...) upsertBulk = [][]map[string]interface{}{} } lock.Unlock() }(code, spider) } wg.Wait() lock.Lock() if len(arr) > 0 { util.MgoE.SaveBulk("luacodeinfo", arr...) arr = []map[string]interface{}{} } if len(upsertBulk) > 0 { util.MgoE.UpSertBulk("task", upsertBulk...) upsertBulk = [][]map[string]interface{}{} } lock.Unlock() logger.Debug("生成任务完成...") CodeInfoMap = map[string]*Spider{} } //新任务与历史任务整合 func CreateTask(t *Task, sp *Spider, upsertBulk *[][]map[string]interface{}, lock *sync.Mutex) { defer qu.Catch() if t.ErrType == 0 { //不是异常任务 return } if sp.PendState == 1 { if sp.DownloadAllNum == 0 { //挂起状态爬虫,且下载量为0,不建任务 return } else { //挂起状态有下载量,更新爬虫挂起状态 sp.PendState = 0 //影响任务i_pendstate状态 util.MgoE.Update("luaconfig", map[string]interface{}{"code": sp.Code}, map[string]interface{}{"$set": map[string]interface{}{"pendstate": 0}}, false, false) } } diff := time.Now().Unix() - sp.AuditTime if sp.State == 5 && diff <= 86400 { //已上架爬虫且爬虫最新一次提交审核时间小于24小时,不建任务 logger.Debug("该爬虫近期维护无需新建任务:", sp.Code) return } descript_new := "" //新任务的异常描述 for _, text := range t.DescribeMap { descript_new += text } query := map[string]interface{}{ "s_code": sp.Code, "i_state": map[string]interface{}{ "$in": []int{0, 1, 2, 3, 5}, //查询现有正在维护的任务 }, } fields := map[string]interface{}{ "i_state": 1, "s_type": 1, "s_descript": 1, "i_times": 1, "s_urgency": 1, } list, _ := util.MgoE.Find("task", query, nil, fields, false, -1, -1) update := []map[string]interface{}{} if list != nil && len(*list) > 0 { //已有任务 if len(*list) > 1 { logger.Error("Code:", sp.Code, "任务异常") util.MgoE.Save("luacreatetaskerr", map[string]interface{}{ "code": sp.Code, "comeintime": time.Now().Unix(), "tasknum": len(*list), }) return } task := (*list)[0] //唯一任务 state_old := qu.IntAll(task["i_state"]) //历史任务状态 times_old := qu.IntAll(task["i_times"]) //历史任务次数 type_old := qu.ObjToString(task["s_type"]) //历史任务异常类型 urgency_old := qu.ObjToString(task["s_urgency"]) //历史任务紧急度 descript_old := qu.ObjToString(task["s_descript"]) //历史任务描述 result := map[string]interface{}{ "i_frequencyerrtimes": sp.FrequencyErrTimes, "i_num": sp.DownloadSuccessNum, //下载量(目前按下载成功量) "l_updatetime": time.Now().Unix(), "i_times": times_old + 1, "s_descript": descript_old + time.Now().Format(qu.Date_Short_Layout) + "追加描述:------------------------------\n" + descript_new, } if state_old == 0 || state_old == 1 { //如果历史任务状态为待确认、待处理,更新任务信息,其它状态只追加任务描述、任务次数、下载量 //任务状态state、任务类型s_type if state_old == 1 || t.State == 1 { //新任务、历史任务有一个任务状态为待处理,更新后任务状态为待处理 result["i_state"] = 1 if t.State == 1 && state_old == 1 { //新任务和历史任务均为待处理时,取异常类型等级高者 if t.ErrType > qu.IntAll(type_old) { result["s_type"] = fmt.Sprint(t.ErrType) } } else if t.State == 1 { //新任务为待处理历史任务为待确认,取新任务的类型 result["s_type"] = fmt.Sprint(t.ErrType) } /*else if state_old == 1 { }*/ } else if state_old == 0 && t.State == 0 && t.ErrType > qu.IntAll(type_old) { //新任务、历史任务均为待确认,取异常类型等级高者 result["s_type"] = fmt.Sprint(t.ErrType) } if times_old >= 3 { //某爬虫第四次建任务时,任务状态变为待处理 result["i_state"] = 1 } //任务紧急度urgency urgency := qu.IntAll(urgency_old) if urgency < 4 { result["s_urgency"] = fmt.Sprint(urgency + 1) } //最迟完成时间 if qu.IntAll(result["i_state"]) == 1 && state_old == 0 { //新任务综合处理后任务状态为待处理,历史任务为待确认时,更新最迟完成时间 result["l_complete"] = util.CompleteTime(fmt.Sprint(urgency + 1)) } } update = append(update, map[string]interface{}{"_id": task["_id"]}) update = append(update, map[string]interface{}{"$set": result}) lock.Lock() *upsertBulk = append(*upsertBulk, update) lock.Unlock() } else { //无历史任务 //times := 0 //if t.State == 1 { //待处理times=1 // times = 1 //} saveMap := map[string]interface{}{ "s_modify": sp.ModifyUser, "s_modifyid": sp.ModifyId, "s_code": sp.Code, "s_site": sp.Site, "s_channel": sp.Channel, "i_event": sp.Event, "i_state": t.State, "s_source": "程序", "s_type": fmt.Sprint(t.ErrType), "s_descript": descript_new, "i_times": 1, "i_num": sp.DownloadSuccessNum, //下载量(目前按下载成功量) "l_comeintime": time.Now().Unix(), //"l_updatetime": time.Now().Unix(), "l_complete": util.CompleteTime("1"), "s_urgency": "1", "i_frequencyerrtimes": sp.FrequencyErrTimes, "i_pendstate": sp.PendState, //爬虫挂起状态 } update = append(update, query) update = append(update, saveMap) lock.Lock() *upsertBulk = append(*upsertBulk, update) lock.Unlock() } } //更新爬虫最大页、爬虫上下架 func UpdateLuaInfo(sp *Spider) { defer qu.Catch() //1、更新爬虫信息 set := map[string]interface{}{ "frequencyerrtimes": sp.FrequencyErrTimes, //更新次数 } if sp.FrequencyErrTimes <= 3 { set["param_common.5"] = sp.MaxPage + 1 } logger.Debug("Code:", sp.Code, " ", sp.FrequencyErrTimes) b := util.MgoE.Update("luaconfig", map[string]interface{}{"code": sp.Code}, map[string]interface{}{"$set": set}, false, false) if b && sp.FrequencyErrTimes <= 3 { //FrequencyErrTimes>3时会建采集频率异常的待处理任务,不再上下架 //爬虫下架、上加 qu.Debug("爬虫上下架 code:", sp.Code) CodeLock.Lock() ok, err := util.UpdateSpiderByCodeState(sp.Code, "6", sp.Event) //下架 if ok && err == nil { logger.Debug(sp.Code, "下架成功") ok, err = util.UpdateSpiderByCodeState(sp.Code, "5", sp.Event) //上架 if ok && err == nil { logger.Debug(sp.Code, "上架成功") } } CodeLock.Unlock() } } //重置前一周内未下载成功的数据(一天3次未下成功的数据可以连续下一周) func ResetDataState() { defer qu.Catch() logger.Info("-----更新数据状态-----") sess := util.MgoS.GetMgoConn() defer util.MgoS.DestoryMongoConn(sess) ch := make(chan bool, 3) wg := &sync.WaitGroup{} lock := &sync.Mutex{} query := map[string]interface{}{ "comeintime": map[string]interface{}{ "$gte": util.GetTime(-util.DayNum), "$lt": util.GetTime(0), }, "state": -1, } field := map[string]interface{}{ "_id": 1, } it := sess.DB("spider").C("spider_highlistdata").Find(&query).Select(&field).Iter() count, _ := sess.DB("spider").C("spider_highlistdata").Find(&query).Count() logger.Info("更新数据状态数量:", count) n := 0 arr := [][]map[string]interface{}{} for tmp := make(map[string]interface{}); it.Next(tmp); n++ { ch <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-ch wg.Done() }() update := []map[string]interface{}{} update = append(update, map[string]interface{}{"_id": tmp["_id"]}) update = append(update, map[string]interface{}{"$set": map[string]interface{}{"times": 0, "state": 0}}) lock.Lock() arr = append(arr, update) if len(arr) > 500 { tmps := arr util.MgoS.UpdateBulk("spider_highlistdata", tmps...) arr = [][]map[string]interface{}{} } lock.Unlock() }(tmp) tmp = map[string]interface{}{} } wg.Wait() lock.Lock() if len(arr) > 0 { util.MgoS.UpdateBulk("spider_highlistdata", arr...) arr = [][]map[string]interface{}{} } lock.Unlock() logger.Info("-----更新数据状态完毕-----") } //关闭任务 func CloseTask() { qu.Catch() logger.Debug("---清理未更新任务---") decreaseDay, day := 0, 0 var cleanDay string for { decreaseDay-- weekDay := time.Now().AddDate(0, 0, decreaseDay).Weekday().String() if weekDay != "Saturday" && weekDay != "Sunday" { day++ } if day == util.CloseNum { cleanDay = time.Now().AddDate(0, 0, decreaseDay).Format("2006-01-02") break } } the_time, _ := time.ParseInLocation(qu.Date_Short_Layout, cleanDay, time.Local) unix_time := the_time.Unix() //凌晨时间戳 query := map[string]interface{}{ "i_state": 0, "l_complete": map[string]interface{}{ "$lt": unix_time + 86400, }, "s_type": "1", // "s_type": map[string]interface{}{ // "$ne": "7", // }, } logger.Debug("query:", query) set := map[string]interface{}{ "$set": map[string]interface{}{ "i_state": 6, }, } util.MgoE.Update("task", query, set, false, true) logger.Debug("---清理未更新任务完毕---") } //保存爬虫每日监控信息 func SaveCodeInfo() { defer qu.Catch() arr := []map[string]interface{}{} wg := &sync.WaitGroup{} lock := &sync.Mutex{} ch := make(chan bool, 10) logger.Debug("CodeInfoMap:", len(CodeInfoMap)) for code, spider := range CodeInfoMap { wg.Add(1) ch <- true go func(code string, sp Spider) { defer func() { <-ch wg.Done() }() byteText, err := json.Marshal(sp) if err != nil { logger.Debug("Json Marshal Error", code) return } tmp := map[string]interface{}{} if json.Unmarshal(byteText, &tmp) == nil { lock.Lock() arr = append(arr, tmp) lock.Unlock() } else { logger.Debug("Json UnMarshal Error", code) return } if sp.Platform == "golua平台" { //列表页总下载量 atomic.AddInt64(&LuaListDownloadAllNum, int64(sp.RepeatDownloadAllNum)) //列表页总下载成功量 atomic.AddInt64(&LuaListDownloadSuccessAllNum, int64(sp.RepeatDownloadSuccessNum)) } else { //列表页总下载量 atomic.AddInt64(&PythonListDownloadAllNum, int64(sp.RepeatDownloadAllNum)) //列表页总下载成功量 atomic.AddInt64(&PythonListDownloadSuccessAllNum, int64(sp.RepeatDownloadSuccessNum)) } lock.Lock() if len(arr) > 500 { util.MgoE.SaveBulk("luacodeinfo_back", arr...) arr = []map[string]interface{}{} } lock.Unlock() }(code, *spider) } wg.Wait() if len(arr) > 0 { util.MgoE.SaveBulk("luacodeinfo_back", arr...) arr = []map[string]interface{}{} } logger.Debug("爬虫基本信息生成完成...") } func SaveUserCreateTaskNum() { defer qu.Catch() for user, sn := range UserTaskNum { save := map[string]interface{}{} save["user"] = user save["comeintime"] = time.Now().Unix() for s, n := range sn { save[s] = n } util.MgoE.Save("luausertask", save) } UserTaskNum = map[string]map[string]int{} }