Browse Source

列表页异常任务逻辑

maxiaoshan 3 years ago
parent
commit
3821164df8
1 changed files with 159 additions and 94 deletions
  1. 159 94
      src/luatask/task.go

+ 159 - 94
src/luatask/task.go

@@ -4,7 +4,6 @@ import (
 	"encoding/json"
 	"fmt"
 	qu "qfw/util"
-	"strings"
 	"sync"
 	"time"
 	"util"
@@ -92,6 +91,10 @@ type Spider struct {
 	ListOhPercentTimes   int                   `json:"listohpercenttimes"`   //列表页采集百分百次数
 	ListNoDataTimes      int                   `json:"listnodatatimes"`      //一天内列表页下载无数据次数
 	Comeintime           int64                 `json:"comeintime"`           //入库时间
+	ListHeart            int64                 `json:"listheart"`            //列表页执行心跳
+	DetailHeart          int64                 `json:"detailheart"`          //详情页执行心跳
+	FindListHeart        int64                 `json:"findlistheart"`        //列表页获得数据量心跳
+	DetailExecuteHeart   int64                 `json:"detailexecuteheart"`   //详情页下载成功心跳
 	Error                map[string]*ErrorInfo `json:"error"`
 	//OhPercentTimes    int                   `json:"ohpercentimes"`     //采集量占总下载量100%的次数
 	//NtPercentTime     int                   `json:"ntpercentimes"`     //采集量占总下载量90%-100%的次数
@@ -147,6 +150,7 @@ func StartTask() {
 	InitInfo() //初始化时间
 	logger.Debug(StartTime, EndTime)
 	GetCodeBaseInfo()              //初始化爬虫基本信息
+	GetCodeHeart()                 //初始化爬虫心跳信息
 	GetSpiderHighListDownloadNum() //统计spider_highlistdata爬虫列表页下载量、下载失败量、未下载量
 	GetSpiderListDownloadNum()     //统计spider_listdata爬虫列表页下载量、下载失败量、未下载量
 	GetSpiderDownloadRateDataNew()
@@ -269,6 +273,57 @@ func GetCodeBaseInfo() {
 	logger.Debug("爬虫基本信息准备完成...", len(CodeInfoMap))
 }
 
+// GetCodeHeart 获取爬虫的心跳信息
+func GetCodeHeart() {
+	defer qu.Catch()
+	sess := util.MgoS.GetMgoConn()
+	defer util.MgoS.DestoryMongoConn(sess)
+	query := map[string]interface{}{
+		"del": false,
+	}
+	fields := map[string]interface{}{
+		"code":          1,
+		"list":          1,
+		"detail":        1,
+		"findlist":      1,
+		"detailexecute": 1,
+	}
+	lock := &sync.Mutex{}
+	wg := &sync.WaitGroup{}
+	ch := make(chan bool, 5)
+	it := sess.DB(util.MgoS.DbName).C("spider_heart").Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
+		wg.Add(1)
+		ch <- true
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			code := qu.ObjToString(tmp["code"])
+			listHeart := qu.Int64All(tmp["list"])
+			detailHeart := qu.Int64All(tmp["detail"])
+			findListHeart := qu.Int64All(tmp["findlist"])
+			detailExecuteHeart := qu.Int64All(tmp["detailexecute"])
+			lock.Lock()
+			if sp := CodeInfoMap[code]; sp != nil {
+				sp.ListHeart = listHeart
+				sp.DetailHeart = detailHeart
+				sp.FindListHeart = findListHeart
+				sp.DetailExecuteHeart = detailExecuteHeart
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%100 == 0 {
+			logger.Debug(n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	logger.Debug("统计采集量spider_heart完成...")
+}
+
 // GetSpiderHighListDownloadNum 统计爬虫列表页下载量和下载失败量
 func GetSpiderHighListDownloadNum() {
 	defer qu.Catch()
@@ -699,7 +754,9 @@ func GetSpiderDownloadRateDataNew() {
 			if spider := CodeInfoMap[code]; spider != nil {
 				spider.ListDownloadAllTimes = alltimes
 				spider.ListNoDataTimes = zero
-				if oh_percent > 0 && util.CodeEventModel[spider.Event] != 0 { //含有100%采集,及为采集频率异常(由于7410、7500、7700为老模式的队列模式,不建采集频率异常任务)
+				//含有100%采集,及为采集频率异常(由于7410、7500、7510、7700队列模式节点,不建采集频率异常任务)
+				//上轮数据下载不成功,下轮采集会被任务是新数据(应该建下载异常任务)
+				if oh_percent > 0 && spider.Model != 0 {
 					spider.FrequencyErrTimes++
 					spider.ListOhPercentTimes = oh_percent
 				}
@@ -849,96 +906,96 @@ func GetSpiderWarnErrData() {
 		tmp = map[string]interface{}{}
 	}
 	//2、统计regatherdata
-	match = map[string]interface{}{
-		"state": map[string]interface{}{
-			"$lte": 1,
-		},
-		"from": "lua",
-		"comeintime": map[string]interface{}{
-			"$gte": StartTime,
-			"$lt":  EndTime,
-		},
-	}
-	group1 = map[string]interface{}{
-		"_id": "$spidercode",
-		"count": map[string]interface{}{
-			"$sum": 1,
-		},
-	}
-	p = []map[string]interface{}{
-		map[string]interface{}{"$match": match},
-		map[string]interface{}{"$group": group1},
-	}
-	logger.Debug("regather query:", match)
-	it2 := sess.DB(util.MgoS.DbName).C("regatherdata").Pipe(p).Iter()
-	n2 := 0
-	for tmp := make(map[string]interface{}); it2.Next(&tmp); n2++ {
-		wg.Add(1)
-		ch <- true
-		go func(tmp map[string]interface{}) {
-			defer func() {
-				<-ch
-				wg.Done()
-			}()
-			code := qu.ObjToString(tmp["_id"]) //爬虫代码
-			count := qu.IntAll(tmp["count"])   //异常数据量
-			query := map[string]interface{}{
-				"state": map[string]interface{}{
-					"$lte": 1,
-				},
-				"from": "lua",
-				"comeintime": map[string]interface{}{
-					"$gte": StartTime,
-					"$lt":  EndTime,
-				},
-				"spidercode": code,
-			}
-			//logger.Debug("query:", query)
-
-			errArr := []*ErrRemark{}
-			list, _ := util.MgoS.Find("regatherdata", query, nil, map[string]interface{}{"href": 1, "error": 1}, false, 0, 3)
-			for _, l := range *list {
-				errText := qu.ObjToString(l["error"])
-				errText = strings.Replace(errText, "<string>:", "", 1)
-				errArr = append(errArr, &ErrRemark{
-					Href:   qu.ObjToString(l["href"]),
-					Remark: errText,
-				})
-			}
-			//one, _ := util.MgoS.FindOne("regatherdata", query) //查询该错误信息类型的一条href
-			//oneErrInfo := &ErrRemark{
-			//	Href:   qu.ObjToString((*one)["href"]),
-			//	Remark: qu.ObjToString((*one)["error"]),
-			//}
-			if spider := CodeInfoMap[code]; spider != nil {
-				spider.Error["regather"] = &ErrorInfo{
-					Num: count,
-					Err: errArr,
-				}
-				// if spider_err := spider.Error; spider_err != nil {
-				// 	spider_err["regather"] = &ErrorInfo{
-				// 		Num: count,
-				// 		Err: []map[string]interface{}{
-				// 			oneErrInfo,
-				// 		},
-				// 	}
-				// } else {
-				// 	spider.Error = map[string]*ErrorInfo{
-				// 		"regather": &ErrorInfo{
-				// 			Num: count,
-				// 			Err: []map[string]interface{}{
-				// 				oneErrInfo,
-				// 			},
-				// 		},
-				// 	}
-				// }
-			}
-		}(tmp)
-		if n2%10 == 0 {
-			logger.Debug(n2)
-		}
-		tmp = map[string]interface{}{}
-	}
+	//match = map[string]interface{}{
+	//	"state": map[string]interface{}{
+	//		"$lte": 1,
+	//	},
+	//	"from": "lua",
+	//	"comeintime": map[string]interface{}{
+	//		"$gte": StartTime,
+	//		"$lt":  EndTime,
+	//	},
+	//}
+	//group1 = map[string]interface{}{
+	//	"_id": "$spidercode",
+	//	"count": map[string]interface{}{
+	//		"$sum": 1,
+	//	},
+	//}
+	//p = []map[string]interface{}{
+	//	map[string]interface{}{"$match": match},
+	//	map[string]interface{}{"$group": group1},
+	//}
+	//logger.Debug("regather query:", match)
+	//it2 := sess.DB(util.MgoS.DbName).C("regatherdata").Pipe(p).Iter()
+	//n2 := 0
+	//for tmp := make(map[string]interface{}); it2.Next(&tmp); n2++ {
+	//	wg.Add(1)
+	//	ch <- true
+	//	go func(tmp map[string]interface{}) {
+	//		defer func() {
+	//			<-ch
+	//			wg.Done()
+	//		}()
+	//		code := qu.ObjToString(tmp["_id"]) //爬虫代码
+	//		count := qu.IntAll(tmp["count"])   //异常数据量
+	//		query := map[string]interface{}{
+	//			"state": map[string]interface{}{
+	//				"$lte": 1,
+	//			},
+	//			"from": "lua",
+	//			"comeintime": map[string]interface{}{
+	//				"$gte": StartTime,
+	//				"$lt":  EndTime,
+	//			},
+	//			"spidercode": code,
+	//		}
+	//		//logger.Debug("query:", query)
+	//
+	//		errArr := []*ErrRemark{}
+	//		list, _ := util.MgoS.Find("regatherdata", query, nil, map[string]interface{}{"href": 1, "error": 1}, false, 0, 3)
+	//		for _, l := range *list {
+	//			errText := qu.ObjToString(l["error"])
+	//			errText = strings.Replace(errText, "<string>:", "", 1)
+	//			errArr = append(errArr, &ErrRemark{
+	//				Href:   qu.ObjToString(l["href"]),
+	//				Remark: errText,
+	//			})
+	//		}
+	//		//one, _ := util.MgoS.FindOne("regatherdata", query) //查询该错误信息类型的一条href
+	//		//oneErrInfo := &ErrRemark{
+	//		//	Href:   qu.ObjToString((*one)["href"]),
+	//		//	Remark: qu.ObjToString((*one)["error"]),
+	//		//}
+	//		if spider := CodeInfoMap[code]; spider != nil {
+	//			spider.Error["regather"] = &ErrorInfo{
+	//				Num: count,
+	//				Err: errArr,
+	//			}
+	//			// if spider_err := spider.Error; spider_err != nil {
+	//			// 	spider_err["regather"] = &ErrorInfo{
+	//			// 		Num: count,
+	//			// 		Err: []map[string]interface{}{
+	//			// 			oneErrInfo,
+	//			// 		},
+	//			// 	}
+	//			// } else {
+	//			// 	spider.Error = map[string]*ErrorInfo{
+	//			// 		"regather": &ErrorInfo{
+	//			// 			Num: count,
+	//			// 			Err: []map[string]interface{}{
+	//			// 				oneErrInfo,
+	//			// 			},
+	//			// 		},
+	//			// 	}
+	//			// }
+	//		}
+	//	}(tmp)
+	//	if n2%10 == 0 {
+	//		logger.Debug(n2)
+	//	}
+	//	tmp = map[string]interface{}{}
+	//}
 	wg.Wait()
 	logger.Debug("错误信息数据统计完成...")
 }
@@ -1202,8 +1259,8 @@ func CreateTaskProcess() {
 				}
 			}
 			if spider.Platform == "golua平台" { //lua异常(由于采集频率异常比较特殊固放到最后处理)
-				if spider.ListNoDataTimes > 0 && spider.ListNoDataTimes == spider.ListDownloadAllTimes { //列表页无采集数据
-					//5、列表页异常	errtype:7
+				//5、列表页异常	errtype:7
+				if spider.ListNoDataTimes > 0 && spider.ListNoDataTimes == spider.ListDownloadAllTimes && spider.FindListHeart < util.GetTime(0) { //列表页采集量有心跳不建列表页异常任务
 					if !spider.ListIsFilter { //列表页不含过滤代码
 						task.State = 1 //待处理
 						task.ErrType = TASK_LISTERR
@@ -1297,6 +1354,14 @@ func CreateTask(t *Task, sp *Spider, upsertBulk *[][]map[string]interface{}, loc
 	if t.ErrType == 0 { //不是异常任务
 		return
 	}
+	if sp.PendState == 1 {
+		if sp.DownloadAllNum == 0 { //挂起状态爬虫,且下载量为0,不建任务
+			return
+		} else { //挂起状态有下载量,更新爬虫挂起状态
+			sp.PendState = 0 //影响任务i_pendstate状态
+			util.MgoE.Update("luaconfig", map[string]interface{}{"code": sp.Code}, map[string]interface{}{"$set": map[string]interface{}{"pendstate": 0}}, false, false)
+		}
+	}
 	diff := time.Now().Unix() - sp.AuditTime
 	if sp.State == 5 && diff <= 86400 { //已上架爬虫且爬虫最新一次提交审核时间小于24小时,不建任务
 		logger.Debug("该爬虫近期维护无需新建任务:", sp.Code)