
Modify abnormal-data push; add task creation for suspended spiders

maxiaoshan committed 2 years ago
Commit e8463208ce
6 files changed, 157 insertions and 27 deletions
  1. src/config.json (+1, -1)
  2. src/luatask/othertask.go (+8, -8)
  3. src/luatask/task.go (+2, -1)
  4. src/main.go (+2, -2)
  5. src/timetask/random.go (+143, -14)
  6. src/timetask/wxworkwarn.go (+1, -1)

+ 1 - 1
src/config.json

@@ -127,7 +127,7 @@
 	"resetdatastatecron": "0 0 2 ? * MON-FRI",
     "startaskcron": "0 0 8 ? * MON-FRI",
 	"codesummarycron": "0 30 8 ? * *",
-	"randomdatapushcron": "0 50 8 ? * MON-FRI",
+	"randomdatapushcron": "0 50 8 ? * *",
 	"qyworkremindmodifyusercron": "0 0 9 ? * MON-FRI",
 	"qyworkremindauditorcron": "0 30 17 ? * MON-FRI",
 	"filewarncron": "0 55 8 ? * *",

+ 8 - 8
src/luatask/othertask.go

@@ -15,8 +15,7 @@ func CreateTaskByCodePendstate() {
 		"platform":  "golua平台",
 		"pendstate": 1,
 		"pendtime": map[string]interface{}{
-			"$lt":  util.GetTime(-14), //爬虫挂起超过14天的,创建任务
-			"$gte": 1661961600,
+			"$lt": util.GetTime(-30), //爬虫挂起超过30天的,创建任务
 		},
 	}
 	fields := map[string]interface{}{
@@ -49,12 +48,12 @@ func CreateTaskByCodePendstate() {
 			// In-progress tasks are handled separately because they come from auditor verification and carry an l_checktime field (used for weekly-report statistics), so the existing task is updated; to-be-confirmed / to-be-handled tasks are dispatched by the program and do not get an l_checktime attribute for now
 			descript := qu.ObjToString((*task)["s_descript"])
 			q["_id"] = (*task)["_id"]
-			set["i_state"] = 2
+			set["i_state"] = 1
 			set["l_updatetime"] = time.Now().Unix()
-			set["i_pendstate"] = 1
-			set["s_descript"] = descript + qu.FormatDate(&today, qu.Date_Short_Layout) + "追加描述:------------------------------\n该爬虫已连续挂起超7天(" + pendtimeStr + ")\n"
+			set["i_pendstate"] = 0
+			set["s_descript"] = descript + qu.FormatDate(&today, qu.Date_Short_Layout) + "追加描述:------------------------------\n该爬虫已连续挂起超30天(" + pendtimeStr + ")\n"
 		} else { //无历史任务
-			set["s_descript"] = "该爬虫已连续挂起超7天(" + pendtimeStr + ")\n"
+			set["s_descript"] = "该爬虫已连续挂起超30天(" + pendtimeStr + ")\n"
 			set["s_platform"] = "golua平台"
 			set["s_channel"] = tmp["channel"]
 			set["i_event"] = tmp["event"]
@@ -67,12 +66,13 @@ func CreateTaskByCodePendstate() {
 			set["s_modify"] = tmp["modifyuser"]
 			set["s_modifyid"] = tmp["modifyuserid"]
 			set["s_site"] = tmp["site"]
-			set["i_state"] = 2
-			set["i_pendstate"] = 1
+			set["i_state"] = 1
+			set["i_pendstate"] = 0
 			set["s_code"] = code
 			set["s_source"] = "程序"
 			set["i_frequencyerrtimes"] = 0
 		}
 		util.MgoEB.Update("task", q, map[string]interface{}{"$set": set}, true, false)
+		util.MgoEB.UpdateById("luaconfig", tmp["_id"], map[string]interface{}{"$set": map[string]interface{}{"pendstate": 0, "pendtime": 0}})
 	}
 }
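For reference, the pendtime filter in this file selects spiders whose suspension timestamp is older than the cutoff returned by util.GetTime(-30), i.e. suspended for more than 30 days. A standalone sketch of an equivalent cutoff check, assuming GetTime(n) returns the unix timestamp of local midnight n days from today (an inference from its usage here, not a confirmed signature):

package main

import (
	"fmt"
	"time"
)

// daysAgoMidnight approximates the assumed behaviour of util.GetTime(-days):
// the unix timestamp of local midnight the given number of days ago.
func daysAgoMidnight(days int) int64 {
	t := time.Now().AddDate(0, 0, -days)
	return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.Local).Unix()
}

func main() {
	cutoff := daysAgoMidnight(30)
	pendtime := time.Now().AddDate(0, 0, -45).Unix() // suspended 45 days ago
	// pendtime < cutoff  =>  the spider has been suspended for more than 30 days
	fmt.Println(pendtime < cutoff) // true
}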

+ 2 - 1
src/luatask/task.go

@@ -192,6 +192,7 @@ func StartTask() {
 	//CloseTask()      // close tasks
 	SendInfoToWxWork_SiteDataCount()
 	SendLuaPythonAllNum()
+	CreateTaskByCodePendstate() // create tasks for suspended spiders
 }
 
 //初始化
@@ -1634,7 +1635,7 @@ func CreateTaskProcess() {
 			}
 			if spider.Platform == "golua平台" { // lua exceptions (collection-frequency exceptions are a special case, so they are handled last)
 				// 5. list-page exception, errtype: 7
-				if spider.ListNoDataTimes > 0 && spider.ListNoDataTimes == spider.ListDownloadAllTimes {
+				if spider.ListDownloadAllTimes == 0 || (spider.ListNoDataTimes > 0 && spider.ListNoDataTimes == spider.ListDownloadAllTimes) { // spider.ListDownloadAllTimes == 0 catches spiders that errored out and never ran
 					limitDayNum := 0
 					if spider.Event == 7520 { // spiders on node 7520 take a long time to finish one crawl cycle, so the heartbeat may still be from the previous day
 						limitDayNum = -1

+ 2 - 2
src/main.go

@@ -41,8 +41,8 @@ func main() {
 	c := cron.New()
 	c.Start()
 	// scheduled jobs
-	c.AddFunc(util.RandomDataPushCron, timetask.GetSpiderWarnData) // data maintenance platform - spider data maintenance statistics
-	c.AddFunc(util.LuamoveCron, timetask.LuaMoveEvent)             // data maintenance platform - spider data maintenance statistics
+	c.AddFunc(util.RandomDataPushCron, timetask.PushSpiderWarnErrData) // data maintenance platform - spider data maintenance statistics
+	c.AddFunc(util.LuamoveCron, timetask.LuaMoveEvent)                 // data maintenance platform - spider data maintenance statistics
 	//
 	c.AddFunc(util.FileWarnCron, timetask.GetFileWarn) // abnormal attachment data warning
 	//

+ 143 - 14
src/timetask/random.go

@@ -1,6 +1,7 @@
 package timetask
 
 import (
+	"encoding/json"
 	qu "qfw/util"
 	"strings"
 	"sync"
@@ -18,6 +19,127 @@ type WarnInfo struct {
 	Infos    map[string]bool
 	Code     interface{}
 	Href     interface{}
+	Repeat   bool
+}
+
+var StypeArr = []string{
+	"Field Value Is Null",
+	"Field Value Contains Random Code",
+	"Field Value Not Contains Chinese",
+	"Detail File Err",
+}
+
+func PushSpiderWarnErrData() {
+	GetSpiderWarnData()
+	GetHighlistDetailFilErrData()
+}
+
+func GetHighlistDetailFilErrData() {
+	defer qu.Catch()
+	sess := util.MgoS.GetMgoConn()
+	defer util.MgoS.DestoryMongoConn(sess)
+	stime := util.GetTime(-7)
+	etime := util.GetTime(0)
+	query := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$gte": stime,
+			"$lt":  etime,
+		},
+		"detailfilerr": true,
+		"state":        -1,
+	}
+	fields := map[string]interface{}{
+		"site":        1,
+		"channel":     1,
+		"spidercode":  1,
+		"area":        1,
+		"city":        1,
+		"district":    1,
+		"jsondata":    1,
+		"publishtime": 1,
+		"comeintime":  1,
+		"href":        1,
+		"title":       1,
+		"dataging":    1,
+		"_id":         0,
+	}
+	ch := make(chan bool, 2)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	arr := []map[string]interface{}{}
+	it := sess.DB(util.MgoS.DbName).C("spider_highlistdata").Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			result := map[string]interface{}{}
+			result["from"] = "list"
+			result["level"] = 2
+			result["info"] = "Detail File Err"
+			result["ok"] = false
+			result["field"] = "detail"
+			result["site"] = tmp["site"]
+			result["channel"] = tmp["channel"]
+			result["title"] = tmp["title"]
+			result["href"] = tmp["href"]
+			result["spidercode"] = tmp["spidercode"]
+			result["comeintime"] = time.Now().Unix()
+			//publishtime
+			publishtime_str := qu.ObjToString(tmp["publishtime"])
+			publishtime_int := int64(0)
+			if publishtime_str != "0" {
+				if t, err := time.ParseInLocation(qu.Date_Full_Layout, publishtime_str, time.Local); err == nil {
+					publishtime_int = t.Unix()
+				}
+			}
+			result["repeat"] = RepeatData(qu.ObjToString(tmp["title"]), publishtime_int)
+			//jsondata
+			if jsondata := qu.ObjToString(tmp["jsondata"]); jsondata != "" {
+				jsondataMap := map[string]interface{}{}
+				if json.Unmarshal([]byte(jsondata), &jsondataMap) == nil {
+					tmp["jsondata"] = jsondataMap
+				} else {
+					delete(tmp, "jsondata")
+				}
+			}
+			iscompete := false
+			coll := "bidding"
+			lua, _ := util.MgoEB.FindOne("luaconfig", map[string]interface{}{"code": tmp["spidercode"]})
+			if len(*lua) > 0 {
+				iscompete, _ = (*lua)["spidercompete"].(bool)
+				param_common := (*lua)["param_common"].([]interface{})
+				if len(param_common) >= 8 {
+					coll = qu.ObjToString(param_common[7])
+				}
+			}
+			tmp["iscompete"] = iscompete
+			tmp["publishtime"] = publishtime_int
+			tmp["_d"] = "comeintime"
+			tmp["T"] = coll
+			result["data"] = tmp
+			lock.Lock()
+			arr = append(arr, result)
+			if len(arr) > 500 {
+				util.MgoS.SaveBulk("spider_warn_err", arr...)
+				arr = []map[string]interface{}{}
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%100 == 0 {
+			qu.Debug("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	if len(arr) > 0 {
+		util.MgoS.SaveBulk("spider_warn_err", arr...)
+		arr = []map[string]interface{}{}
+	}
 }
 
 func GetSpiderWarnData() {
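The new GetHighlistDetailFilErrData above follows the repo's usual bounded-concurrency iteration: a buffered channel caps in-flight goroutines at two, a WaitGroup waits for them, and a mutex guards the batch that is flushed to spider_warn_err whenever it grows past 500 records, with a final flush after wg.Wait(). A self-contained sketch of that pattern with generic stand-in names (a plain Println replaces the project's MgoS.SaveBulk):

package main

import (
	"fmt"
	"sync"
)

func main() {
	ch := make(chan bool, 2) // semaphore: at most 2 workers in flight
	wg := &sync.WaitGroup{}
	lock := &sync.Mutex{}
	batch := []int{}

	flush := func() { // must be called with lock held
		if len(batch) > 0 {
			fmt.Println("saving batch of", len(batch)) // stand-in for SaveBulk
			batch = batch[:0]
		}
	}

	for doc := 0; doc < 1200; doc++ {
		ch <- true // blocks while 2 workers are already running
		wg.Add(1)
		go func(doc int) {
			defer func() { <-ch; wg.Done() }()
			lock.Lock()
			batch = append(batch, doc)
			if len(batch) > 500 {
				flush()
			}
			lock.Unlock()
		}(doc)
	}
	wg.Wait()
	lock.Lock()
	flush() // save whatever is left once the iteration is done
	lock.Unlock()
}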
@@ -35,9 +157,10 @@ func GetSpiderWarnData() {
 			"$gte": stime,
 			"$lt":  etime,
 		},
-		"info": map[string]interface{}{
-			"$in": []string{"Html Contains Temp Language", "Field Value Contains Random Code", "Publishtime Is Too Early", "Publishtime Is Too Late", "Field Value Not Contains Chinese"},
+		"info": map[string]interface{}{ // this condition can be removed once the save service is updated (2022-11-28)
+			"$in": StypeArr,
 		},
+		"level": 2,
 	}
 	ch := make(chan bool, 2)
 	wg := &sync.WaitGroup{}
@@ -58,24 +181,28 @@ func GetSpiderWarnData() {
 			field := qu.ObjToString(tmp["field"])
 			info := qu.ObjToString(tmp["info"])
 			title := qu.ObjToString(tmp["title"])
-			if info == "Field Value Not Contains Chinese" && RepeatData(title) > 0 { // data check: if the database already has a record with the same title, do not push it again
-				return
-			}
-			if field == "publishtime" { // special handling of the level for the publishtime field (in the save service, records with an abnormal publishtime go into the bidding collection, so level must not be 2)
-				level = 1
+			publishtime := int64(0)
+			data, ok := tmp["data"].(map[string]interface{})
+			if ok {
+				if ptime := data["publishtime"]; ptime != nil {
+					publishtime = qu.Int64All(ptime)
+				}
 			}
+			// data check: a record with the same title and a similar publishtime is treated as a duplicate and does not need fixing
+			repeat := RepeatData(title, publishtime)
 			lock.Lock()
 			if warnInfo := result[href]; warnInfo == nil {
 				warnInfo = &WarnInfo{
 					Fields:   map[string]bool{field: true},
 					MaxLevel: level,
-					Data:     tmp["data"],
+					Data:     data,
 					Site:     tmp["site"],
 					Channel:  tmp["channel"],
 					Title:    title,
 					Infos:    map[string]bool{info: true},
 					Code:     tmp["code"],
 					Href:     href,
+					Repeat:   repeat,
 				}
 				result[href] = warnInfo
 			} else {
@@ -117,12 +244,14 @@ func GetSpiderWarnData() {
 				"site":       w.Site,
 				"channel":    w.Channel,
 				"title":      w.Title,
+				"repeat":     w.Repeat,
 				"comeintime": time.Now().Unix(),
 				"info":       strings.Join(infos, ","),
-				"code":       w.Code,
+				"spidercode": w.Code,
 				"href":       w.Href,
 				"data":       w.Data,
 				"ok":         false,
+				"from":       "warn",
 			})
 			if len(saveArr) > 500 {
 				util.MgoS.SaveBulk("spider_warn_err", saveArr...)
@@ -138,15 +267,15 @@ func GetSpiderWarnData() {
 	}
 }
 
-func RepeatData(title string) int {
+func RepeatData(title string, publishtime int64) bool {
 	return util.MgoB.Count("bidding",
 		map[string]interface{}{
 			"title": title,
-			"comeintime": map[string]interface{}{
-				"$gte": util.GetTime(-3),
-				"$lte": time.Now().Unix(),
+			"publishtime": map[string]interface{}{ // match within ±3 days of the record's publishtime
+				"$gte": publishtime - 86400*3,
+				"$lte": publishtime + 86400*3,
 			},
-		})
+		}) > 0
 }
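With the bounds ordered as above, RepeatData reports a duplicate when a bidding record has the same title and a publishtime within three days either side of the incoming record. A standalone sketch of that window check (isWithinWindow is a hypothetical helper, not part of the repo):

package main

import (
	"fmt"
	"time"
)

// isWithinWindow mirrors the intent of the query: two publishtime values
// count as "the same data" when they are at most 3 days apart.
func isWithinWindow(a, b int64) bool {
	const window = int64(86400 * 3)
	return b >= a-window && b <= a+window
}

func main() {
	now := time.Now().Unix()
	fmt.Println(isWithinWindow(now, now-86400*2)) // true: 2 days apart
	fmt.Println(isWithinWindow(now, now-86400*5)) // false: 5 days apart
}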
 
 /*

+ 1 - 1
src/timetask/wxworkwarn.go

@@ -145,7 +145,7 @@ func SendLuaInfo() {
 			"$or": []interface{}{
 				map[string]interface{}{
 					"event": map[string]interface{}{
-						"$ne": 7520,
+						"$nin": []int{7000, 7520},
 						//"$nin": []int{7500, 7510},
 					},
 					"list": map[string]interface{}{