|
@@ -0,0 +1,1335 @@
|
|
|
+package luatask
|
|
|
+
|
|
|
+import (
|
|
|
+ "encoding/json"
|
|
|
+ "fmt"
|
|
|
+ qu "qfw/util"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
+ "util"
|
|
|
+
|
|
|
+ "github.com/donnie4w/go-logger/logger"
|
|
|
+)
|
|
|
+
|
|
|
// Task error types, ordered by severity (a task keeps the highest value when
// several apply — see CreateTaskProcess):
// 8: collection-frequency anomaly; 7: list-page anomaly; 6: 404 anomaly;
// 5: download anomaly; 4: runtime anomaly; 3: publish-time anomaly; 2: data anomaly.
const TASK_RATEERR, TASK_LISTERR, TASK_404ERR, TASK_DOWNLOADERR, TASK_RUNERR, TASK_TIMEERR, TASK_DATAERR = 8, 7, 6, 5, 4, 3, 2

// CodeInfoMap holds the per-spider statistics for the current run, keyed by
// spider code. It is rebuilt by InitInfo/PrapareCodeBaseInfo and cleared at
// the end of CreateTaskProcess.
var CodeInfoMap map[string]*Spider

// StateFeedBackErr maps the python feedback status code (the "code" field of
// mgp_list documents) to the error-category key used in Spider.Error and
// PythonErrTypeInfoMap.
var StateFeedBackErr = map[int]string{
	0:   "timeout",
	200: "analysis",
	404: "download",
	500: "server",
}
|
|
|
+
|
|
|
// PythonErrTypeInfoMap maps a python error-category key (see StateFeedBackErr)
// to its task error type and display remark. Remark values are stored and
// shown to users as-is, so they remain in Chinese.
var PythonErrTypeInfoMap = map[string]ErrTypeInfo{
	"download": ErrTypeInfo{
		ErrType: TASK_404ERR,
		Remark:  "下载异常", // download (404) anomaly
	},
	"server": ErrTypeInfo{
		ErrType: TASK_DOWNLOADERR,
		Remark:  "服务异常", // server anomaly
	},
	"analysis": ErrTypeInfo{
		ErrType: TASK_RUNERR,
		Remark:  "解析异常", // parsing anomaly
	},
	"timeout": ErrTypeInfo{
		ErrType: TASK_TIMEERR,
		Remark:  "超时异常", // timeout anomaly
	},
}

// LuaErrTypeInfoMap maps a lua error-category key (the keys written into
// Spider.Error by the statistics functions) to its task error type and
// display remark. Remark values remain in Chinese for display.
var LuaErrTypeInfoMap = map[string]ErrTypeInfo{
	"download": ErrTypeInfo{
		ErrType: TASK_DOWNLOADERR,
		Remark:  "下载异常", // download anomaly
	},
	"regather": ErrTypeInfo{
		ErrType: TASK_RUNERR,
		Remark:  "运行异常", // runtime anomaly
	},
	"publishtime": ErrTypeInfo{
		ErrType: TASK_TIMEERR,
		Remark:  "时间异常", // publish-time anomaly
	},
	"text": ErrTypeInfo{
		ErrType: TASK_DATAERR,
		Remark:  "数据异常", // data anomaly
	},
}
|
|
|
+
|
|
|
// Spider aggregates one spider's base configuration and the statistics
// collected for the previous workday. One instance per spider code lives in
// CodeInfoMap while a run is in progress.
type Spider struct {
	Site                 string `json:"site"`                 //site name
	Platform             string `json:"platform"`             //platform ("golua平台" or "python")
	Code                 string `json:"spidercode"`           //spider code
	Channel              string `json:"channel"`              //channel (column)
	AuditTime            int64  `json:"audittime"`            //latest audit (upload) time
	ModifyUser           string `json:"modifyuser"`           //maintainer name
	ModifyId             string `json:"modifyid"`             //maintainer id
	Event                int    `json:"event"`                //node (event) id
	State                int    `json:"state"`                //spider state
	FrequencyErrTimes    int    `json:"frequencyerrtimes"`    //number of collection-frequency anomalies
	MaxPage              int    `json:"maxpage"`              //max page collected
	Model                int    `json:"model"`                //collection model: 0 = old mode, 1 = new mode
	Working              int    `json:"working"`              //working mode: 0 = high-performance, 1 = queue
	ListIsFilter         bool   `json:"listisfilter"`         //whether lua list-page collection includes filtering
	DownloadAllNum       int    `json:"downloadallnum"`       //total downloads
	DownloadSuccessNum   int    `json:"downloadsuccessnum"`   //successful downloads
	DownloadFailedNum    int    `json:"downloadfailednum"`    //failed downloads
	NoDownloadNum        int    `json:"nodownloadnum"`        //not-yet-downloaded count
	ListDownloadAllTimes int    `json:"listdownloadalltimes"` //list-page download rounds within one day
	ListOhPercentTimes   int    `json:"listohpercenttimes"`   //rounds where the list page was 100% collected
	ListNoDataTimes      int    `json:"listnodatatimes"`      //list-page rounds with no data within one day
	Comeintime           int64  `json:"comeintime"`           //record insertion time
	// Error maps an error-category key ("download", "regather", "publishtime",
	// "text", ...) to its aggregated detail.
	Error map[string]*ErrorInfo `json:"error"`
	//OhPercentTimes int `json:"ohpercentimes"` //rounds where collected == 100% of downloads
	//NtPercentTime int `json:"ntpercentimes"` //rounds where collected is 90%-100% of downloads
	//EtPercentTime int `json:"etpercentimes"` //rounds where collected is 80%-90% of downloads
}
|
|
|
+
|
|
|
// ErrorInfo aggregates one error category for a spider: the total number of
// occurrences plus a few sample details.
type ErrorInfo struct {
	Num int          //number of erroneous records
	Err []*ErrRemark //error samples (capped at a few entries by the collectors)
}

// ErrRemark is a single error sample: the offending link and a description.
type ErrRemark struct {
	Href   string //link
	Remark string //error description
}
|
|
|
+
|
|
|
// Task describes one maintenance task that will be generated for a spider
// based on its collected statistics.
type Task struct {
	Platform          string //platform
	Code              string //spider code
	Site              string //site
	Channel           string //channel (column)
	ModifyUser        string //maintainer name
	ModifyId          string //maintainer id
	ErrType           int    //error type: 8 frequency; 7 list page; 5 download; 4 runtime; 3 publish time; 2 data; 1 data volume
	Description       string //description
	State             int    //task state (0 = to confirm, 1 = to handle)
	Event             int    //node (event) id
	Num               int    //download count
	FrequencyErrTimes int    //number of collection-frequency anomalies
	// DescribeMap collects one description text per error type.
	DescribeMap map[int]string
	//ErrInfo map[string]map[string]interface{} //error collection
}

// ErrTypeInfo pairs a task error type with its human-readable remark.
type ErrTypeInfo struct {
	ErrType int    //task error type
	Remark  string //error-type description
}
|
|
|
+
|
|
|
var (
	StartTime   int64                     //start of the previous workday (statistics window)
	EndTime     int64                     //end of the previous workday (statistics window)
	TaskMap     map[string]*Task          //task collection
	UserTaskNum map[string]map[string]int //number of tasks newly created per user per day
	//
)
|
|
|
+
|
|
|
// StartTask is the entry point of one statistics run. Step order matters:
// the time window and spider map are initialized first, statistics are then
// accumulated into CodeInfoMap, and tasks are generated from it last.
func StartTask() {
	InitInfo() //initialize the time window
	logger.Debug(StartTime, EndTime)
	PrapareCodeBaseInfo()      //load base info of the online spiders
	GetSpiderListDownloadNum() //count list-page downloads, failures and pending items per spider
	GetSpiderDownloadRateDataNew()
	GetSpiderWarnErrData()
	GetPythonWarnErrData()
	//SaveCodeInfo()
	CreateTaskProcess()
	// GetDownloadNumber() //count downloads
	ResetDataState() //update data states
	//CloseTask() //close tasks
}
|
|
|
+
|
|
|
+//初始化
|
|
|
+func InitInfo() {
|
|
|
+ defer qu.Catch()
|
|
|
+ CodeInfoMap = map[string]*Spider{} //初始化
|
|
|
+ UserTaskNum = map[string]map[string]int{}
|
|
|
+ StartTime, EndTime = util.GetWorkDayTimeUnix()
|
|
|
+ //StartTime = util.GetTime(-1)
|
|
|
+ //EndTime = util.GetTime(0)
|
|
|
+}
|
|
|
+
|
|
|
+// PrapareCodeBaseInfo 准备爬虫基本信息
|
|
|
+func PrapareCodeBaseInfo() {
|
|
|
+ defer qu.Catch()
|
|
|
+ sess := util.MgoE.GetMgoConn()
|
|
|
+ defer util.MgoE.DestoryMongoConn(sess)
|
|
|
+ lock := &sync.Mutex{}
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ ch := make(chan bool, 5)
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "$or": []interface{}{
|
|
|
+ //lua、python上线爬虫
|
|
|
+ map[string]interface{}{
|
|
|
+ "state": map[string]interface{}{
|
|
|
+ "$in": []int{5, 11}, //上架、上线爬虫
|
|
|
+ },
|
|
|
+ },
|
|
|
+ //lua正在被维护的爬虫
|
|
|
+ map[string]interface{}{
|
|
|
+ "platform": "golua平台",
|
|
|
+ "state": map[string]interface{}{
|
|
|
+ "$in": []int{0, 1, 2}, //待完成、待审核、未通过
|
|
|
+ },
|
|
|
+ "event": map[string]interface{}{
|
|
|
+ "$ne": 7000,
|
|
|
+ },
|
|
|
+ },
|
|
|
+ //python正在被维护的爬虫
|
|
|
+ map[string]interface{}{
|
|
|
+ "platform": "python",
|
|
|
+ "state": map[string]interface{}{
|
|
|
+ "$in": []int{1, 2, 6}, //待审核、未通过
|
|
|
+ },
|
|
|
+ },
|
|
|
+ },
|
|
|
+ }
|
|
|
+ fieles := map[string]interface{}{
|
|
|
+ "event": 1,
|
|
|
+ "param_common": 1,
|
|
|
+ "platform": 1,
|
|
|
+ "modifyuser": 1,
|
|
|
+ "modifyuserid": 1,
|
|
|
+ "state": 1,
|
|
|
+ "l_uploadtime": 1,
|
|
|
+ "listisfilter": 1,
|
|
|
+ "frequencyerrtimes": 1,
|
|
|
+ }
|
|
|
+ count := util.MgoE.Count("luaconfig", query)
|
|
|
+ logger.Debug("共加载线上爬虫个数:", count)
|
|
|
+ it := sess.DB(util.MgoE.DbName).C("luaconfig").Find(&query).Select(&fieles).Iter()
|
|
|
+ n := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
|
|
|
+ wg.Add(1)
|
|
|
+ ch <- true
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ info := &Spider{
|
|
|
+ Error: map[string]*ErrorInfo{},
|
|
|
+ }
|
|
|
+ if param_common, ok := tmp["param_common"].([]interface{}); ok && len(param_common) >= 6 {
|
|
|
+ info.Code = qu.ObjToString(param_common[0])
|
|
|
+ info.Site = qu.ObjToString(param_common[1])
|
|
|
+ info.Channel = qu.ObjToString(param_common[2])
|
|
|
+ info.MaxPage = qu.IntAll(param_common[5])
|
|
|
+ } else {
|
|
|
+ logger.Debug("加载爬虫出错:", tmp["_id"])
|
|
|
+ }
|
|
|
+ info.ModifyUser = qu.ObjToString(tmp["modifyuser"])
|
|
|
+ info.ModifyId = qu.ObjToString(tmp["modifyuserid"])
|
|
|
+ info.AuditTime = qu.Int64All(tmp["l_uploadtime"])
|
|
|
+ info.Platform = qu.ObjToString(tmp["platform"])
|
|
|
+ info.Event = qu.IntAll(tmp["event"])
|
|
|
+ info.State = qu.IntAll(tmp["state"])
|
|
|
+ info.ListIsFilter = tmp["listisfilter"].(bool)
|
|
|
+ info.FrequencyErrTimes = qu.IntAll(tmp["frequencyerrtimes"])
|
|
|
+ info.Model = util.CodeEventModel[info.Event]
|
|
|
+ info.Working = util.CodeEventWorking[info.Event]
|
|
|
+ info.Comeintime = time.Now().Unix()
|
|
|
+ lock.Lock()
|
|
|
+ CodeInfoMap[info.Code] = info
|
|
|
+ lock.Unlock()
|
|
|
+ }(tmp)
|
|
|
+ if n%1000 == 0 {
|
|
|
+ logger.Debug(n)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ logger.Debug("爬虫基本信息准备完成...", len(CodeInfoMap))
|
|
|
+}
|
|
|
+
|
|
|
// GetSpiderListDownloadNum aggregates, per spider code, the list-page
// download counts (total, success, failed, pending) for the statistics
// window, from both "spider_highlistdata" and "spider_listdata". Results are
// written into the matching CodeInfoMap entries; for spider_highlistdata, up
// to 3 failed-link samples are also stored under Error["download"].
// Up to 5 goroutines process aggregation results concurrently; CodeInfoMap
// access is serialized with a mutex.
func GetSpiderListDownloadNum() {
	defer qu.Catch()
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	//restrict to the previous workday
	match := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lt":  EndTime,
		},
	}
	//stage 1: count documents per (spidercode, state)
	group1 := map[string]interface{}{
		"_id": map[string]interface{}{
			"spidercode": "$spidercode",
			"state":      "$state",
		},
		"datacount": map[string]interface{}{
			"$sum": 1,
		},
	}
	//stage 2: regroup per spidercode, pushing (state, count) pairs and summing the total
	group2 := map[string]interface{}{
		"_id": "$_id.spidercode",
		"stateinfo": map[string]interface{}{
			"$push": map[string]interface{}{
				"state": "$_id.state",
				"count": "$datacount",
			},
		},
		"count": map[string]interface{}{
			"$sum": "$datacount",
		},
	}
	project := map[string]interface{}{
		"statearr": "$stateinfo",
		"count":    1,
	}
	p := []map[string]interface{}{
		map[string]interface{}{"$match": match},
		map[string]interface{}{"$group": group1},
		map[string]interface{}{"$group": group2},
		map[string]interface{}{"$project": project},
	}
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5)
	//1. aggregate spider_highlistdata
	it1 := sess.DB(util.MgoS.DbName).C("spider_highlistdata").Pipe(p).Iter()
	n1 := 0
	for tmp := make(map[string]interface{}); it1.Next(&tmp); n1++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["_id"])
			count := qu.IntAll(tmp["count"]) //total downloads
			successCount := 0                //successful downloads
			failedCount := 0                 //failed downloads
			noCount := 0                     //not yet downloaded
			if stateArr, ok := tmp["statearr"].([]interface{}); ok { //per-state download info for one spider
				for _, stateInfo := range stateArr {
					infoMap := stateInfo.(map[string]interface{})
					state := qu.IntAll(infoMap["state"])
					if state == 1 { //state 1: successful downloads
						successCount = qu.IntAll(infoMap["count"])
					} else if state == -1 { //state -1: failed downloads
						failedCount = qu.IntAll(infoMap["count"])
					} else if state == 0 { //state 0: not yet downloaded
						noCount = qu.IntAll(infoMap["count"])
					}
				}
			}
			errArr := []*ErrRemark{}
			if failedCount > 0 { //some downloads failed: fetch up to 3 sample links
				query := map[string]interface{}{
					"comeintime": map[string]interface{}{
						"$gte": StartTime,
						"$lt":  EndTime,
					},
					"spidercode": code,
					"state":      -1,
				}
				logger.Debug("采集失败爬虫:", code)
				list, _ := util.MgoS.Find("spider_highlistdata", query, nil, map[string]interface{}{"href": 1}, false, 0, 3)
				for _, l := range *list {
					errArr = append(errArr, &ErrRemark{
						Href:   qu.ObjToString(l["href"]),
						Remark: "Download Failed",
					})
				}
			}
			lock.Lock()
			if spider := CodeInfoMap[code]; spider != nil {
				spider.DownloadAllNum = count
				spider.DownloadSuccessNum = successCount
				spider.DownloadFailedNum = failedCount
				spider.NoDownloadNum = noCount
				if len(errArr) > 0 {
					spider.Error["download"] = &ErrorInfo{
						Num: failedCount,
						Err: errArr,
					}
				}
			}
			lock.Unlock()
		}(tmp)
		if n1%100 == 0 {
			logger.Debug(n1)
		}
		tmp = map[string]interface{}{}
	}
	//2. aggregate spider_listdata
	//NOTE(review): a spider present in both collections has its counters
	//overwritten (not added) by this second pass — confirm that is intended.
	it2 := sess.DB(util.MgoS.DbName).C("spider_listdata").Pipe(p).Iter()
	n2 := 0
	for tmp := make(map[string]interface{}); it2.Next(&tmp); n2++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["_id"])
			count := qu.IntAll(tmp["count"]) //total downloads (inexact: contains duplicates)
			successCount := 0
			failedCount := 0 //failed downloads (inexact: contains duplicates)
			noCount := 0     //not yet downloaded
			if stateArr, ok := tmp["statearr"].([]interface{}); ok {
				for _, stateInfo := range stateArr {
					infoMap := stateInfo.(map[string]interface{})
					state := qu.IntAll(infoMap["state"])
					if state == 1 { //state 1: successful downloads
						successCount = qu.IntAll(infoMap["count"])
					} else if state == -1 { //state -1: failed downloads
						failedCount = qu.IntAll(infoMap["count"])
					} else if state == 0 { //state 0: not yet downloaded
						noCount = qu.IntAll(infoMap["count"])
					}
				}
			}
			//errArr := []map[string]interface{}{}
			//if failedCount > 0 {
			//	match2["spidercode"] = code
			//	match2["state"] = -1
			//	logger.Debug("采集失败数据query:", match2)
			//	list, _ := util.MgoS.Find("spider_listdata", match2, nil, map[string]interface{}{"href": 1}, false, 0, 3)
			//	for _, l := range *list {
			//		errArr = append(errArr, map[string]interface{}{
			//			"href":   l["href"],
			//			"remark": "Download Failed",
			//		})
			//	}
			//}
			lock.Lock()
			if spider := CodeInfoMap[code]; spider != nil {
				spider.DownloadAllNum = count
				spider.DownloadSuccessNum = successCount
				spider.DownloadFailedNum = failedCount
				spider.NoDownloadNum = noCount
				//if len(errArr) > 0 {
				//	spider.Error["download"] = &ErrorInfo{
				//		Num: failedCount,
				//		Err: errArr,
				//	}
				//}
			} else {
				logger.Debug("-------------", code)
			}
			lock.Unlock()
		}(tmp)
		if n2%100 == 0 {
			logger.Debug(n2)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Debug("统计采集量完成...")
}
|
|
|
+
|
|
|
// GetSpiderDownloadRateDataNew summarizes list-page collection frequency from
// "spider_downloadrate" for the window's date: total rounds, zero-data
// rounds, and 100%-collected rounds per spider. A spider with any
// 100%-collected round (and a non-zero event model) gets its
// FrequencyErrTimes incremented — events 7410/7500/7700 run the old queue
// mode and are excluded from frequency-anomaly tasks.
// Up to 5 goroutines process documents concurrently; CodeInfoMap access is
// serialized with a mutex.
func GetSpiderDownloadRateDataNew() {
	defer qu.Catch()
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	ch := make(chan bool, 5)
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	//date key of the statistics window's start day
	date := qu.FormatDateByInt64(&StartTime, qu.Date_Short_Layout)
	query := map[string]interface{}{
		"date": date,
		"event": map[string]interface{}{
			"$ne": 7000,
		},
	}
	fields := map[string]interface{}{
		"spidercode": 1,
		"alltimes":   1,
		"zero":       1,
		"oh_percent": 1,
	}
	logger.Debug("query:", query)
	it := sess.DB(util.MgoS.DbName).C("spider_downloadrate").Find(&query).Select(&fields).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["spidercode"])
			alltimes := qu.IntAll(tmp["alltimes"])     //total list-page rounds
			zero := qu.IntAll(tmp["zero"])             //rounds with no data
			oh_percent := qu.IntAll(tmp["oh_percent"]) //rounds fully (100%) collected
			lock.Lock()
			if spider := CodeInfoMap[code]; spider != nil {
				spider.ListDownloadAllTimes = alltimes
				spider.ListNoDataTimes = zero
				if oh_percent > 0 && util.CodeEventModel[spider.Event] != 0 { //any 100%-collected round means a frequency anomaly (events 7410/7500/7700 are old queue mode, no frequency task is created)
					spider.FrequencyErrTimes++
					spider.ListOhPercentTimes = oh_percent
				}
			} else {
				logger.Debug("-------------", code)
			}
			lock.Unlock()
		}(tmp)
		if n%1000 == 0 {
			logger.Debug("current:", n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Debug("列表页采集统计完成...")
}
|
|
|
+
|
|
|
+//汇总lua错误信息数据
|
|
|
+func GetSpiderWarnErrData() {
|
|
|
+ defer qu.Catch()
|
|
|
+ logger.Debug("错误信息数据统计...")
|
|
|
+ sess := util.MgoS.GetMgoConn()
|
|
|
+ defer util.MgoS.DestoryMongoConn(sess)
|
|
|
+ match := map[string]interface{}{
|
|
|
+ "level": 2,
|
|
|
+ "comeintime": map[string]interface{}{
|
|
|
+ "$gte": StartTime,
|
|
|
+ "$lt": EndTime,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ group1 := map[string]interface{}{
|
|
|
+ "_id": map[string]interface{}{
|
|
|
+ "code": "$code",
|
|
|
+ "info": "$info",
|
|
|
+ },
|
|
|
+ "datacount": map[string]interface{}{
|
|
|
+ "$sum": 1,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ group2 := map[string]interface{}{
|
|
|
+ "_id": "$_id.code",
|
|
|
+ "infotext": map[string]interface{}{
|
|
|
+ "$push": map[string]interface{}{
|
|
|
+ "info": "$_id.info",
|
|
|
+ "count": "$datacount",
|
|
|
+ },
|
|
|
+ },
|
|
|
+ "count": map[string]interface{}{
|
|
|
+ "$sum": "$datacount",
|
|
|
+ },
|
|
|
+ }
|
|
|
+ project := map[string]interface{}{
|
|
|
+ "infoarr": "$infotext",
|
|
|
+ "count": 1,
|
|
|
+ }
|
|
|
+ p := []map[string]interface{}{
|
|
|
+ map[string]interface{}{"$match": match},
|
|
|
+ map[string]interface{}{"$group": group1},
|
|
|
+ map[string]interface{}{"$group": group2},
|
|
|
+ map[string]interface{}{"$project": project},
|
|
|
+ }
|
|
|
+ logger.Debug("spider_warn:", match)
|
|
|
+ //1、统计spider_warn
|
|
|
+ it1 := sess.DB(util.MgoS.DbName).C("spider_warn").Pipe(p).Iter()
|
|
|
+ n1 := 0
|
|
|
+ ch := make(chan bool, 5)
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ lock := &sync.Mutex{}
|
|
|
+ for tmp := make(map[string]interface{}); it1.Next(&tmp); n1++ {
|
|
|
+ wg.Add(1)
|
|
|
+ ch <- true
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ code := qu.ObjToString(tmp["_id"])
|
|
|
+ //spider.Error = map[string]*ErrorInfo{} //初始化
|
|
|
+ if infoArr, ok := tmp["infoarr"].([]interface{}); ok {
|
|
|
+ for _, info := range infoArr {
|
|
|
+ stype := ""
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "level": 2,
|
|
|
+ "comeintime": map[string]interface{}{
|
|
|
+ "$gte": StartTime,
|
|
|
+ "$lt": EndTime,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ infoMap := info.(map[string]interface{})
|
|
|
+ infoText := qu.ObjToString(infoMap["info"]) //错误信息
|
|
|
+ errCount := qu.IntAll(infoMap["count"]) //错误数量
|
|
|
+ if infoText == "Publishtime Is Too Late" { //发布时间超前
|
|
|
+ query["info"] = infoText
|
|
|
+ stype = "publishtime"
|
|
|
+ } else if infoText == "Publishtime Is Less Than Zero" { //发布时间小于0
|
|
|
+ query["info"] = infoText
|
|
|
+ stype = "publishtime"
|
|
|
+ } else if infoText == "Publishtime Is Too Early" { //发布时间过小
|
|
|
+ query["info"] = infoText
|
|
|
+ stype = "publishtime"
|
|
|
+ } else if infoText == "Field Value Not Contains Chinese" { //title、detail不含中文
|
|
|
+ query["info"] = infoText
|
|
|
+ stype = "text"
|
|
|
+ } else if infoText == "Field Value Contains Random Code" { //title、detail含乱码
|
|
|
+ query["info"] = infoText
|
|
|
+ stype = "text"
|
|
|
+ } else {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ query["code"] = code
|
|
|
+ //logger.Debug(query)
|
|
|
+ //errArr := []*ErrRemark{}
|
|
|
+ //list, _ := util.MgoS.Find("spider_warn", query, nil, map[string]interface{}{"href": 1}, false, 0, 3)
|
|
|
+ //for _, l := range *list {
|
|
|
+ // errArr = append(errArr, &ErrRemark{
|
|
|
+ // Href: qu.ObjToString(l["href"]),
|
|
|
+ // Remark: infoText,
|
|
|
+ // })
|
|
|
+ //}
|
|
|
+ one, _ := util.MgoS.FindOne("spider_warn", query) //查询该错误信息类型的一条href
|
|
|
+ oneErrInfo := &ErrRemark{
|
|
|
+ Href: qu.ObjToString((*one)["href"]),
|
|
|
+ Remark: infoText,
|
|
|
+ }
|
|
|
+ lock.Lock()
|
|
|
+ if spider := CodeInfoMap[code]; spider != nil {
|
|
|
+ if errMap := spider.Error[stype]; errMap != nil {
|
|
|
+ errMap.Num += errCount
|
|
|
+ errMap.Err = append(errMap.Err, oneErrInfo)
|
|
|
+ } else {
|
|
|
+ spider.Error[stype] = &ErrorInfo{
|
|
|
+ Num: errCount,
|
|
|
+ Err: []*ErrRemark{
|
|
|
+ oneErrInfo,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }(tmp)
|
|
|
+ if n1%10 == 0 {
|
|
|
+ logger.Debug(n1)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ //2、统计regatherdata
|
|
|
+ match = map[string]interface{}{
|
|
|
+ "state": map[string]interface{}{
|
|
|
+ "$lte": 1,
|
|
|
+ },
|
|
|
+ "from": "lua",
|
|
|
+ "comeintime": map[string]interface{}{
|
|
|
+ "$gte": StartTime,
|
|
|
+ "$lt": EndTime,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ group1 = map[string]interface{}{
|
|
|
+ "_id": "$spidercode",
|
|
|
+ "count": map[string]interface{}{
|
|
|
+ "$sum": 1,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ p = []map[string]interface{}{
|
|
|
+ map[string]interface{}{"$match": match},
|
|
|
+ map[string]interface{}{"$group": group1},
|
|
|
+ }
|
|
|
+ logger.Debug("regather query:", match)
|
|
|
+ it2 := sess.DB(util.MgoS.DbName).C("regatherdata").Pipe(p).Iter()
|
|
|
+ n2 := 0
|
|
|
+ for tmp := make(map[string]interface{}); it2.Next(&tmp); n2++ {
|
|
|
+ wg.Add(1)
|
|
|
+ ch <- true
|
|
|
+ go func(tmp map[string]interface{}) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ code := qu.ObjToString(tmp["_id"]) //爬虫代码
|
|
|
+ count := qu.IntAll(tmp["count"]) //异常数据量
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "state": map[string]interface{}{
|
|
|
+ "$lte": 1,
|
|
|
+ },
|
|
|
+ "from": "lua",
|
|
|
+ "comeintime": map[string]interface{}{
|
|
|
+ "$gte": StartTime,
|
|
|
+ "$lt": EndTime,
|
|
|
+ },
|
|
|
+ "spidercode": code,
|
|
|
+ }
|
|
|
+ //logger.Debug("query:", query)
|
|
|
+
|
|
|
+ errArr := []*ErrRemark{}
|
|
|
+ list, _ := util.MgoS.Find("regatherdata", query, nil, map[string]interface{}{"href": 1, "error": 1}, false, 0, 3)
|
|
|
+ for _, l := range *list {
|
|
|
+ errArr = append(errArr, &ErrRemark{
|
|
|
+ Href: qu.ObjToString(l["href"]),
|
|
|
+ Remark: qu.ObjToString(l["error"]),
|
|
|
+ })
|
|
|
+ }
|
|
|
+ //one, _ := util.MgoS.FindOne("regatherdata", query) //查询该错误信息类型的一条href
|
|
|
+ //oneErrInfo := &ErrRemark{
|
|
|
+ // Href: qu.ObjToString((*one)["href"]),
|
|
|
+ // Remark: qu.ObjToString((*one)["error"]),
|
|
|
+ //}
|
|
|
+ if spider := CodeInfoMap[code]; spider != nil {
|
|
|
+ spider.Error["regather"] = &ErrorInfo{
|
|
|
+ Num: count,
|
|
|
+ Err: errArr,
|
|
|
+ }
|
|
|
+ // if spider_err := spider.Error; spider_err != nil {
|
|
|
+ // spider_err["regather"] = &ErrorInfo{
|
|
|
+ // Num: count,
|
|
|
+ // Err: []map[string]interface{}{
|
|
|
+ // oneErrInfo,
|
|
|
+ // },
|
|
|
+ // }
|
|
|
+ // } else {
|
|
|
+ // spider.Error = map[string]*ErrorInfo{
|
|
|
+ // "regather": &ErrorInfo{
|
|
|
+ // Num: count,
|
|
|
+ // Err: []map[string]interface{}{
|
|
|
+ // oneErrInfo,
|
|
|
+ // },
|
|
|
+ // },
|
|
|
+ // }
|
|
|
+ // }
|
|
|
+ }
|
|
|
+ }(tmp)
|
|
|
+ if n2%10 == 0 {
|
|
|
+ logger.Debug(n2)
|
|
|
+ }
|
|
|
+ tmp = map[string]interface{}{}
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ logger.Debug("错误信息数据统计完成...")
|
|
|
+}
|
|
|
+
|
|
|
// GetPythonWarnErrData aggregates python spider statistics: download totals
// first, then per-record error details.
func GetPythonWarnErrData() {
	GetPythonDownloadNum() //count total downloads
	GetPythonErrData()     //collect error information
}
|
|
|
+
|
|
|
// GetPythonDownloadNum aggregates, per spider code, the total download count
// and the successfully-sent count (sendflag == "true") from the python
// "data_bak" collection within the statistics window, and writes them into
// the matching CodeInfoMap entries.
// Up to 5 goroutines process aggregation results concurrently; CodeInfoMap
// access is serialized with a mutex.
func GetPythonDownloadNum() {
	defer qu.Catch()
	sess := util.MgoPy.GetMgoConn()
	defer util.MgoPy.DestoryMongoConn(sess)
	//restrict to the previous workday
	match := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lt":  EndTime,
		},
	}
	//stage 1: count documents per (spidercode, sendflag)
	group1 := map[string]interface{}{
		"_id": map[string]interface{}{
			"spidercode": "$spidercode",
			"sendflag":   "$sendflag",
		},
		"datacount": map[string]interface{}{
			"$sum": 1,
		},
	}
	//stage 2: regroup per spidercode, pushing (sendflag, count) pairs and summing the total
	group2 := map[string]interface{}{
		"_id": "$_id.spidercode",
		"sendflagarr": map[string]interface{}{
			"$push": map[string]interface{}{
				"sendflag": "$_id.sendflag",
				"count":    "$datacount",
			},
		},
		"count": map[string]interface{}{
			"$sum": "$datacount",
		},
	}
	project := map[string]interface{}{
		"infoarr": "$sendflagarr",
		"count":   1,
	}
	p := []map[string]interface{}{
		map[string]interface{}{"$match": match},
		map[string]interface{}{"$group": group1},
		map[string]interface{}{"$group": group2},
		map[string]interface{}{"$project": project},
	}
	ch := make(chan bool, 5)
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	it1 := sess.DB(util.MgoPy.DbName).C("data_bak").Pipe(p).Iter()
	n := 0
	for tmp := make(map[string]interface{}); it1.Next(&tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			code := qu.ObjToString(tmp["_id"])
			count := qu.IntAll(tmp["count"]) //total downloads
			successCount := 0                //successful downloads
			if infoArr, ok := tmp["infoarr"].([]interface{}); ok {
				for _, info := range infoArr {
					infoMap := info.(map[string]interface{})
					if sendflag := qu.ObjToString(infoMap["sendflag"]); sendflag == "true" {
						successCount = qu.IntAll(infoMap["count"])
					}
				}
			}
			lock.Lock()
			if spider := CodeInfoMap[code]; spider != nil {
				spider.DownloadAllNum = count
				spider.DownloadSuccessNum = successCount //number successfully sent to the save service
			}
			lock.Unlock()
		}(tmp)
		if n%100 == 0 {
			logger.Debug(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Debug("python数据下载量统计完成...")
}
|
|
|
+
|
|
|
// GetPythonErrData scans python "mgp_list" documents in the statistics
// window and, per spider, increments the not-downloaded counter
// (failed == 0) or the failed counter plus the matching error category
// (keyed via StateFeedBackErr from the document's "code" field), keeping at
// most 3 sample links per category. Documents with code == -1 (detail page
// never attempted) are skipped.
// Up to 5 goroutines process documents concurrently; CodeInfoMap access is
// serialized with a mutex.
func GetPythonErrData() {
	defer qu.Catch()
	sess := util.MgoPy.GetMgoConn()
	defer util.MgoPy.DestoryMongoConn(sess)
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": StartTime,
			"$lt":  EndTime,
		},
	}
	fieles := map[string]interface{}{
		"spidercode":  1,
		"parser_name": 1,
		"parse_url":   1,
		"failed":      1,
		"code":        1,
	}
	it := sess.DB(util.MgoPy.DbName).C("mgp_list").Find(&query).Select(&fieles).Iter()
	n := 0
	lock := &sync.Mutex{}
	wg := &sync.WaitGroup{}
	ch := make(chan bool, 5)
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		wg.Add(1)
		ch <- true
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			state := qu.IntAll(tmp["code"])
			if state == -1 { //code -1: the detail page was never downloaded, skip
				return
			}
			spidercode := qu.ObjToString(tmp["spidercode"])
			remark := qu.ObjToString(tmp["parser_name"])
			href := qu.ObjToString(tmp["parse_url"])
			failed := qu.IntAll(tmp["failed"])
			errType := StateFeedBackErr[state] //error-category key for this status code
			oneErrInfo := &ErrRemark{
				Href:   href,
				Remark: remark,
			}
			lock.Lock()
			if spider := CodeInfoMap[spidercode]; spider != nil {
				if failed == 0 { //not collected yet
					spider.NoDownloadNum++
				} else { //download failed
					spider.DownloadFailedNum++
					if spider_err := spider.Error; spider_err != nil {
						if errInfo := spider_err[errType]; errInfo != nil {
							errInfo.Num++
							if len(errInfo.Err) < 3 { //keep at most three sample links
								errInfo.Err = append(errInfo.Err, oneErrInfo)
							}
						} else {
							spider.Error[errType] = &ErrorInfo{
								Num: 1,
								Err: []*ErrRemark{
									oneErrInfo,
								},
							}
						}
					} else {
						spider.Error = map[string]*ErrorInfo{
							errType: &ErrorInfo{
								Num: 1,
								Err: []*ErrRemark{
									oneErrInfo,
								},
							},
						}
					}
				}
			}
			lock.Unlock()
		}(tmp)
		if n%100 == 0 {
			logger.Debug(n)
		}
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	logger.Debug("python下载异常数据统计完成...")
}
|
|
|
+
|
|
|
// CreateTaskProcess walks CodeInfoMap and, for every spider, derives a Task
// from its collected errors (keeping the highest-severity error type and
// accumulating per-type descriptions), persists the spider snapshot into
// "luacodeinfo", and hands the task to CreateTask for merging with existing
// tasks in "task". Bulk writes are flushed at 500 entries and once more at
// the end; CodeInfoMap is cleared when done.
// Up to 10 goroutines run concurrently; the shared slices are mutex-guarded.
func CreateTaskProcess() {
	defer qu.Catch()
	logger.Debug("开始生成爬虫任务...")
	//arr := []map[string]interface{}{}
	upsertBulk := [][]map[string]interface{}{} //pending task upserts
	arr := []map[string]interface{}{}          //today's spider snapshots
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	ch := make(chan bool, 10)
	logger.Debug("CodeInfoMap:", len(CodeInfoMap))
	for code, spider := range CodeInfoMap {
		wg.Add(1)
		ch <- true
		go func(code string, spider *Spider) {
			defer func() {
				<-ch
				wg.Done()
			}()
			//assemble the new task's information
			task := &Task{
				DescribeMap: map[int]string{},
			}
			//task.Platform = spider.Platform
			//task.Site = spider.Site
			//task.Code = spider.Code
			//task.Channel = spider.Channel
			//task.ModifyUser = spider.ModifyUser
			//task.ModifyId = spider.ModifyId
			//task.FrequencyErrTimes = spider.FrequencyErrTimes
			//error categories shared by lua and python: publishtime, text
			if len(spider.Error) > 0 {
				//1. download: download anomaly, errtype 5
				//2. regather: runtime anomaly, errtype 4
				//3. publishtime: time anomaly, errtype 3
				//4. text: data anomaly, errtype 2
				for stype, info := range LuaErrTypeInfoMap {
					if err := spider.Error[stype]; err != nil {
						//keep the highest error type
						if task.ErrType < info.ErrType {
							task.ErrType = info.ErrType
						}
						//if any of download/regather/publishtime/text exceeds 10 errors, the task becomes "to handle"
						if err.Num > 10 { //more than 10 errors: to handle
							task.State = 1 //to handle
						}
						//error description
						descript := info.Remark + ":共" + fmt.Sprint(err.Num) + "条\n"
						for _, errRemark := range err.Err {
							descript += errRemark.Remark + ":" + errRemark.Href + "\n"
						}
						task.DescribeMap[info.ErrType] = descript
					}
				}
			}
			if spider.Platform == "golua平台" { //lua anomalies (frequency anomaly is special and handled last)
				if spider.ListNoDataTimes > 0 { //list page collected no data
					//5. list-page anomaly, errtype 7
					if !spider.ListIsFilter { //list page has no filter code
						task.State = 1 //to handle
						task.ErrType = TASK_LISTERR
					} else if len(task.DescribeMap) == 0 { //only a list-page anomaly, and the list page has filter code
						task.State = 0 //to confirm
						task.ErrType = TASK_LISTERR
					}
					task.DescribeMap[TASK_LISTERR] = "列表页异常:\n 列表页共采集" + fmt.Sprint(spider.ListDownloadAllTimes) + "轮,其中有" + fmt.Sprint(spider.ListNoDataTimes) + "轮无数据\n"
				}
				//6. collection-frequency anomaly, errtype 8
				if spider.ListOhPercentTimes > 0 { //frequency anomaly
					UpdateLuaInfo(spider) //on a frequency anomaly: update frequencyerrtimes, bump max page by 1, re-publish the spider
					//only when FrequencyErrTimes > 3 does the frequency anomaly win; otherwise other types take priority
					//(a "to confirm" frequency anomaly is handled automatically, humans rarely intervene)
					if spider.FrequencyErrTimes > 3 { //more than 3 frequency anomalies: to handle, otherwise to confirm
						task.State = 1 //to handle
						task.ErrType = TASK_RATEERR
					} else if len(task.DescribeMap) == 0 { //only a frequency anomaly and FrequencyErrTimes <= 3
						task.State = 0 //to confirm
						task.ErrType = TASK_RATEERR
					}
					task.DescribeMap[TASK_RATEERR] = "采集频率异常:\n 列表页共采集" + fmt.Sprint(spider.ListDownloadAllTimes) + "轮,其中有" + fmt.Sprint(spider.ListOhPercentTimes) + "轮数据全采\n"
				}
			} else if spider.Platform == "python" { //python anomalies
				for stype, info := range PythonErrTypeInfoMap {
					if err := spider.Error[stype]; err != nil {
						//keep the highest error type
						if task.ErrType < info.ErrType {
							task.ErrType = info.ErrType
						}
						if info.ErrType > 3 { //python 404/download/runtime anomalies are always "to handle"
							task.State = 1
						}
						//error description
						descript := info.Remark + ":共" + fmt.Sprint(err.Num) + "条\n"
						for _, errRemark := range err.Err {
							descript += errRemark.Remark + ":" + errRemark.Href + "\n"
						}
						//lua and python may both produce ErrType 3/4: concatenate the descriptions
						task.DescribeMap[info.ErrType] = descript + task.DescribeMap[info.ErrType]
					}
				}
			}
			//persist the spider snapshot
			byteText, err := json.Marshal(spider)
			if err != nil {
				logger.Debug("Json Marshal Error", code)
				return
			}
			tmp := map[string]interface{}{}
			if json.Unmarshal(byteText, &tmp) == nil {
				lock.Lock()
				arr = append(arr, tmp)
				lock.Unlock()
			} else {
				logger.Debug("Json UnMarshal Error", code)
				return
			}
			//create the task from the spider info (compares with task history)
			CreateTask(task, spider, &upsertBulk, lock)
			//flush bulks once they exceed 500 entries
			lock.Lock()
			if len(arr) > 500 {
				util.MgoE.SaveBulk("luacodeinfo", arr...)
				arr = []map[string]interface{}{}
			}
			if len(upsertBulk) > 500 {
				util.MgoE.UpSertBulk("task", upsertBulk...)
				upsertBulk = [][]map[string]interface{}{}
			}
			lock.Unlock()
		}(code, spider)
	}
	wg.Wait()
	//final flush of the remaining entries
	lock.Lock()
	if len(arr) > 0 {
		util.MgoE.SaveBulk("luacodeinfo", arr...)
		arr = []map[string]interface{}{}
	}
	if len(upsertBulk) > 0 {
		util.MgoE.UpSertBulk("task", upsertBulk...)
		upsertBulk = [][]map[string]interface{}{}
	}
	lock.Unlock()
	logger.Debug("生成任务完成...")
	CodeInfoMap = map[string]*Spider{}
}
|
|
|
+
|
|
|
// CreateTask reconciles a freshly generated abnormal task with the spider's
// existing open task, if any: it either escalates/appends to the open task or
// queues a brand-new task document. The resulting [selector, change] pair is
// appended to upsertBulk under lock for a later bulk upsert into "task".
//
// Parameters:
//   t          - the newly built task (ErrType/State/DescribeMap already set upstream)
//   sp         - spider statistics the task was derived from
//   upsertBulk - shared batch of [selector, change] pairs for "task" upserts
//   lock       - guards upsertBulk (this function is called from many goroutines)
func CreateTask(t *Task, sp *Spider, upsertBulk *[][]map[string]interface{}, lock *sync.Mutex) {
	defer qu.Catch()
	if t.ErrType == 0 { // not an abnormal task: nothing to create
		return
	}
	diff := time.Now().Unix() - sp.AuditTime
	// A spider that is online (state 5) and was submitted for audit within the
	// last 24h was just maintained; skip task creation.
	if sp.State == 5 && diff <= 86400 {
		logger.Debug("该爬虫近期维护无需新建任务:", sp.Code)
		return
	}
	descript_new := "" // aggregated description text of the new task
	for _, text := range t.DescribeMap {
		descript_new += text
	}
	query := map[string]interface{}{
		"s_code": sp.Code,
		"i_state": map[string]interface{}{
			"$in": []int{0, 1, 2, 3, 5}, // states considered "still being worked on"
		},
	}
	fields := map[string]interface{}{
		"i_state":    1,
		"s_type":     1,
		"s_descript": 1,
		"i_times":    1,
		"s_urgency":  1,
	}
	list, _ := util.MgoE.Find("task", query, nil, fields, false, -1, -1)
	update := []map[string]interface{}{}
	if list != nil && len(*list) > 0 { // an open task already exists
		// More than one open task per code violates the invariant: record it
		// for manual inspection and bail out.
		if len(*list) > 1 {
			logger.Error("Code:", sp.Code, "任务异常")
			util.MgoE.Save("luacreatetaskerr", map[string]interface{}{
				"code":       sp.Code,
				"comeintime": time.Now().Unix(),
				"tasknum":    len(*list),
			})
			return
		}
		task := (*list)[0]                                 // the single open task
		state_old := qu.IntAll(task["i_state"])            // previous task state
		times_old := qu.IntAll(task["i_times"])            // how many times the task has been re-raised
		type_old := qu.ObjToString(task["s_type"])         // previous error type (stored as string)
		urgency_old := qu.ObjToString(task["s_urgency"])   // previous urgency (stored as string)
		descript_old := qu.ObjToString(task["s_descript"]) // previous description
		result := map[string]interface{}{
			"i_frequencyerrtimes": sp.FrequencyErrTimes,
			"i_num":               sp.DownloadSuccessNum, // download count (currently the success count)
			"l_updatetime":        time.Now().Unix(),
			"i_times":             times_old + 1,
			"s_descript":          descript_old + time.Now().Format(qu.Date_Short_Layout) + "追加描述:------------------------------\n" + descript_new,
		}
		// Only pending-confirm (0) / pending-handle (1) tasks get their state
		// and type recomputed; other states only accumulate description,
		// times and download count via the fields above.
		if state_old == 0 || state_old == 1 {
			if state_old == 1 || t.State == 1 { // either side pending-handle => merged task is pending-handle
				result["i_state"] = 1
				if t.State == 1 && state_old == 1 { // both pending-handle: keep the higher-ranked error type
					if t.ErrType > qu.IntAll(type_old) {
						result["s_type"] = fmt.Sprint(t.ErrType)
					}
				} else if t.State == 1 { // only the new task is pending-handle: take its type
					result["s_type"] = fmt.Sprint(t.ErrType)
				} /*else if state_old == 1 {
				}*/
			} else if state_old == 0 && t.State == 0 && t.ErrType > qu.IntAll(type_old) { // both pending-confirm: keep the higher-ranked error type
				result["s_type"] = fmt.Sprint(t.ErrType)
			}
			if times_old >= 3 { // the 4th occurrence for a spider escalates to pending-handle
				result["i_state"] = 1
			}
			// Bump urgency, capped at 4.
			urgency := qu.IntAll(urgency_old)
			if urgency < 4 {
				result["s_urgency"] = fmt.Sprint(urgency + 1)
			}
			// Recompute the completion deadline only when the task just
			// escalated from pending-confirm to pending-handle.
			if qu.IntAll(result["i_state"]) == 1 && state_old == 0 {
				result["l_complete"] = util.CompleteTime(fmt.Sprint(urgency + 1))
			}
		}
		update = append(update, map[string]interface{}{"_id": task["_id"]})
		update = append(update, map[string]interface{}{"$set": result})
		lock.Lock()
		*upsertBulk = append(*upsertBulk, update)
		lock.Unlock()
	} else { // no open task: queue a brand-new task document
		//times := 0
		//if t.State == 1 { //待处理times=1
		//	times = 1
		//}
		saveMap := map[string]interface{}{
			"s_modify":     sp.ModifyUser,
			"s_modifyid":   sp.ModifyId,
			"s_code":       sp.Code,
			"s_site":       sp.Site,
			"s_channel":    sp.Channel,
			"i_event":      sp.Event,
			"i_state":      t.State,
			"s_source":     "程序",
			"s_type":       fmt.Sprint(t.ErrType),
			"s_descript":   descript_new,
			"i_times":      1,
			"i_num":        sp.DownloadSuccessNum, // download count (currently the success count)
			"l_comeintime": time.Now().Unix(),
			//"l_updatetime": time.Now().Unix(),
			"l_complete":          util.CompleteTime("1"),
			"s_urgency":           "1",
			"i_frequencyerrtimes": sp.FrequencyErrTimes,
		}
		update = append(update, query)
		update = append(update, saveMap)
		lock.Lock()
		*upsertBulk = append(*upsertBulk, update)
		lock.Unlock()
	}
}
|
|
|
+
|
|
|
+//更新爬虫最大页、爬虫上下架
|
|
|
+func UpdateLuaInfo(sp *Spider) {
|
|
|
+ defer qu.Catch()
|
|
|
+ //1、更新爬虫信息
|
|
|
+ set := map[string]interface{}{
|
|
|
+ "frequencyerrtimes": sp.FrequencyErrTimes, //更新次数
|
|
|
+ }
|
|
|
+ if sp.FrequencyErrTimes <= 3 {
|
|
|
+ set["param_common.5"] = sp.MaxPage + 1
|
|
|
+ }
|
|
|
+ logger.Debug("Code:", sp.Code, " ", sp.FrequencyErrTimes)
|
|
|
+ b := util.MgoE.Update("luaconfig", map[string]interface{}{"code": sp.Code}, map[string]interface{}{"$set": set}, false, false)
|
|
|
+ if b && sp.FrequencyErrTimes <= 3 { //FrequencyErrTimes>3时会建采集频率异常的待处理任务,不再上下架
|
|
|
+ //爬虫下架、上加
|
|
|
+ qu.Debug("爬虫上下架 code:", sp.Code)
|
|
|
+ ok, err := util.UpdateSpiderByCodeState(sp.Code, "6", sp.Event) //下架
|
|
|
+ if ok && err == nil {
|
|
|
+ logger.Debug(sp.Code, "下架成功")
|
|
|
+ time.Sleep(1 * time.Second)
|
|
|
+ ok, err = util.UpdateSpiderByCodeState(sp.Code, "5", sp.Event) //上架
|
|
|
+ if ok && err == nil {
|
|
|
+ logger.Debug(sp.Code, "上架成功")
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
// ResetDataState re-arms list-page records that failed to download: every
// document in spider.spider_highlistdata with state == -1 from the configured
// look-back window (util.DayNum days) gets times and state reset to 0 so it
// can be retried. (Per the original note: data that failed 3 times in a day
// may keep being retried for up to a week.)
func ResetDataState() {
	defer qu.Catch()
	logger.Info("-----更新数据状态-----")
	sess := util.MgoS.GetMgoConn()
	defer util.MgoS.DestoryMongoConn(sess)
	ch := make(chan bool, 3) // bounds concurrent workers to 3
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{} // guards arr (the pending update batch)
	query := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$gte": util.GetTime(-util.DayNum), // window start: DayNum days ago
			"$lt":  util.GetTime(0),            // window end: today
		},
		"state": -1, // download failed
	}
	field := map[string]interface{}{
		"_id": 1, // only the id is needed to build the update
	}
	it := sess.DB("spider").C("spider_highlistdata").Find(&query).Select(&field).Iter()
	count, _ := sess.DB("spider").C("spider_highlistdata").Find(&query).Count()
	logger.Info("更新数据状态数量:", count)
	n := 0
	arr := [][]map[string]interface{}{}
	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
		ch <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-ch
				wg.Done()
			}()
			// Build one [selector, change] pair resetting the record.
			update := []map[string]interface{}{}
			update = append(update, map[string]interface{}{"_id": tmp["_id"]})
			update = append(update, map[string]interface{}{"$set": map[string]interface{}{"times": 0, "state": 0}})
			lock.Lock()
			arr = append(arr, update)
			if len(arr) > 500 { // flush a full batch while holding the lock
				tmps := arr
				util.MgoS.UpdateBulk("spider_highlistdata", tmps...)
				arr = [][]map[string]interface{}{}
			}
			lock.Unlock()
		}(tmp)
		// Give the iterator a fresh map so the goroutine above keeps sole
		// ownership of the one it captured.
		tmp = map[string]interface{}{}
	}
	wg.Wait()
	lock.Lock()
	if len(arr) > 0 { // flush the final partial batch
		util.MgoS.UpdateBulk("spider_highlistdata", arr...)
		arr = [][]map[string]interface{}{}
	}
	lock.Unlock()
	logger.Info("-----更新数据状态完毕-----")
}
|
|
|
+
|
|
|
+//关闭任务
|
|
|
+func CloseTask() {
|
|
|
+ qu.Catch()
|
|
|
+ logger.Debug("---清理未更新任务---")
|
|
|
+ decreaseDay, day := 0, 0
|
|
|
+ var cleanDay string
|
|
|
+ for {
|
|
|
+ decreaseDay--
|
|
|
+ weekDay := time.Now().AddDate(0, 0, decreaseDay).Weekday().String()
|
|
|
+ if weekDay != "Saturday" && weekDay != "Sunday" {
|
|
|
+ day++
|
|
|
+ }
|
|
|
+ if day == util.CloseNum {
|
|
|
+ cleanDay = time.Now().AddDate(0, 0, decreaseDay).Format("2006-01-02")
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+ the_time, _ := time.ParseInLocation(qu.Date_Short_Layout, cleanDay, time.Local)
|
|
|
+ unix_time := the_time.Unix() //凌晨时间戳
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "i_state": 0,
|
|
|
+ "l_complete": map[string]interface{}{
|
|
|
+ "$lt": unix_time + 86400,
|
|
|
+ },
|
|
|
+ "s_type": "1",
|
|
|
+ // "s_type": map[string]interface{}{
|
|
|
+ // "$ne": "7",
|
|
|
+ // },
|
|
|
+ }
|
|
|
+ logger.Debug("query:", query)
|
|
|
+ set := map[string]interface{}{
|
|
|
+ "$set": map[string]interface{}{
|
|
|
+ "i_state": 6,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ util.MgoE.Update("task", query, set, false, true)
|
|
|
+ logger.Debug("---清理未更新任务完毕---")
|
|
|
+}
|
|
|
+
|
|
|
+//保存爬虫每日监控信息
|
|
|
+func SaveCodeInfo() {
|
|
|
+ defer qu.Catch()
|
|
|
+ arr := []map[string]interface{}{}
|
|
|
+ wg := &sync.WaitGroup{}
|
|
|
+ lock := &sync.Mutex{}
|
|
|
+ ch := make(chan bool, 10)
|
|
|
+ logger.Debug("CodeInfoMap:", len(CodeInfoMap))
|
|
|
+ for code, spider := range CodeInfoMap {
|
|
|
+ wg.Add(1)
|
|
|
+ ch <- true
|
|
|
+ go func(code string, sp Spider) {
|
|
|
+ defer func() {
|
|
|
+ <-ch
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ byteText, err := json.Marshal(sp)
|
|
|
+ if err != nil {
|
|
|
+ logger.Debug("Json Marshal Error", code)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ tmp := map[string]interface{}{}
|
|
|
+ if json.Unmarshal(byteText, &tmp) == nil {
|
|
|
+ lock.Lock()
|
|
|
+ arr = append(arr, tmp)
|
|
|
+ lock.Unlock()
|
|
|
+ } else {
|
|
|
+ logger.Debug("Json UnMarshal Error", code)
|
|
|
+ return
|
|
|
+ }
|
|
|
+ lock.Lock()
|
|
|
+ if len(arr) > 500 {
|
|
|
+ util.MgoE.SaveBulk("luacodeinfo", arr...)
|
|
|
+ arr = []map[string]interface{}{}
|
|
|
+ }
|
|
|
+ lock.Unlock()
|
|
|
+ }(code, *spider)
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ if len(arr) > 0 {
|
|
|
+ util.MgoE.SaveBulk("luacodeinfo", arr...)
|
|
|
+ arr = []map[string]interface{}{}
|
|
|
+ }
|
|
|
+ logger.Debug("爬虫基本信息生成完成...")
|
|
|
+}
|
|
|
+
|
|
|
+func SaveUserCreateTaskNum() {
|
|
|
+ defer qu.Catch()
|
|
|
+ for user, sn := range UserTaskNum {
|
|
|
+ save := map[string]interface{}{}
|
|
|
+ save["user"] = user
|
|
|
+ save["comeintime"] = time.Now().Unix()
|
|
|
+ for s, n := range sn {
|
|
|
+ save[s] = n
|
|
|
+ }
|
|
|
+ util.MgoE.Save("luausertask", save)
|
|
|
+ }
|
|
|
+ UserTaskNum = map[string]map[string]int{}
|
|
|
+}
|