Ver Fonte

异常数据推送修改

maxiaoshan há 2 anos
pai
commit
8b641532bb
2 ficheiros alterados com 159 adições e 23 exclusões
  1. 7 1
      src/luatask/newtask.go
  2. 152 22
      src/timetask/random.go

+ 7 - 1
src/luatask/newtask.go

@@ -538,7 +538,7 @@ func saveCodeInfo() {
 				wg.Done()
 			}()
 			getAllErr(sp)                  //汇总异常
-			createTask(sp, &taskArr, lock) //
+			createTask(sp, &taskArr, lock) //创建任务
 			sp.Comeintime = comeintime
 			spByte, err := bson.Marshal(sp)
 			if err != nil {
@@ -920,3 +920,9 @@ func (sp *NewSpider) getErrHrefs(coll, errType string, query map[string]interfac
 	}
 	return
 }
+
+/*
+	1、列表页统计的是当天心跳,提前告警。如果当天心跳有问题呢?
+	2、下载异常由于原网站详情页无信息造成的,如何提高任务准确率?
+	3、7410变链接造成的采集频率异常如何解决?
+*/

+ 152 - 22
src/timetask/random.go

@@ -36,13 +36,16 @@ var WarnStypeMap = map[string]int{
 	"Attachment Upload Failed": 1,
 }
 var httpReg = regexp.MustCompile(`^(http|https).*`)
+var invalidDate int64
 
 // PushSpiderWarnErrData is the entry point of the abnormal-data push job.
 // It first resets the package-level invalidDate cutoff to the Unix time two
 // years before now, then runs the collectors one after another in the order
 // listed below.
 func PushSpiderWarnErrData() {
+	invalidDate = time.Now().AddDate(-2, 0, 0).Unix()
 	GetSpiderWarnData()
-	GetHighlistDetailFilErrData()
+	GetHighlistDataDownloadFailedData()
+	GetListDataDownloadFailedData()
 }
 
-func GetHighlistDetailFilErrData() {
+func GetHighlistDataDownloadFailedData() {
 	defer qu.Catch()
 	sess := util.MgoS.GetMgoConn()
 	defer util.MgoS.DestoryMongoConn(sess)
@@ -53,8 +56,121 @@ func GetHighlistDetailFilErrData() {
 			"$gte": stime,
 			"$lt":  etime,
 		},
-		"detailfilerr": true,
-		"state":        -1,
+		"state": -1,
+	}
+	fields := map[string]interface{}{
+		"site":         1,
+		"channel":      1,
+		"spidercode":   1,
+		"area":         1,
+		"city":         1,
+		"district":     1,
+		"jsondata":     1,
+		"publishtime":  1,
+		"comeintime":   1,
+		"href":         1,
+		"title":        1,
+		"dataging":     1,
+		"detailfilerr": 1,
+		"_id":          0,
+	}
+	ch := make(chan bool, 2)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	arr := []map[string]interface{}{}
+	it := sess.DB(util.MgoS.DbName).C("spider_highlistdata").Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			//publishtime
+			ok, publishtime := FilterDataByPublishtime(tmp)
+			if ok {
+				return
+			}
+			result := map[string]interface{}{}
+			result["info"] = "Download Failed"
+			if tmp["detailfilerr"] != nil {
+				delete(tmp, "detailfilerr")
+				result["info"] = "Detail File Err"
+			}
+			result["from"] = "list"
+			result["level"] = 2
+			result["ok"] = false
+			result["field"] = "detail"
+			result["site"] = tmp["site"]
+			result["channel"] = tmp["channel"]
+			result["title"] = tmp["title"]
+			result["href"] = tmp["href"]
+			result["spidercode"] = tmp["spidercode"]
+			result["comeintime"] = time.Now().Unix()
+			result["entry"] = false
+			result["repeat"] = RepeatData(qu.ObjToString(tmp["title"]), publishtime)
+			//jsondata
+			if jsondata := qu.ObjToString(tmp["jsondata"]); jsondata != "" {
+				jsondataMap := map[string]interface{}{}
+				if json.Unmarshal([]byte(jsondata), &jsondataMap) == nil {
+					tmp["jsondata"] = jsondataMap
+				} else {
+					delete(tmp, "jsondata")
+				}
+			}
+			iscompete := false
+			coll := "bidding"
+			lua, _ := util.MgoEB.FindOne("luaconfig", map[string]interface{}{"code": tmp["spidercode"]})
+			if len(*lua) > 0 {
+				iscompete, _ = (*lua)["spidercompete"].(bool)
+				param_common := (*lua)["param_common"].([]interface{})
+				if len(param_common) >= 8 {
+					coll = qu.ObjToString(param_common[7])
+				}
+			}
+			tmp["iscompete"] = iscompete
+			tmp["publishtime"] = publishtime
+			tmp["_d"] = "comeintime"
+			tmp["T"] = coll
+			tmp["detail"] = "详情请访问原网页!"
+			tmp["contenthtml"] = "详情请访问原网页!"
+			result["data"] = tmp
+			lock.Lock()
+			arr = append(arr, result)
+			if len(arr) > 500 {
+				util.MgoS.SaveBulk("spider_warn_err", arr...)
+				arr = []map[string]interface{}{}
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%100 == 0 {
+			qu.Debug("current:", n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	if len(arr) > 0 {
+		util.MgoS.SaveBulk("spider_warn_err", arr...)
+		arr = []map[string]interface{}{}
+	}
+	qu.Debug("spider_highlistdata下载失败数据已统计推送spider_warn_err")
+}
+
+func GetListDataDownloadFailedData() {
+	defer qu.Catch()
+	sess := util.MgoS.GetMgoConn()
+	defer util.MgoS.DestoryMongoConn(sess)
+	stime := util.GetTime(-7)
+	etime := util.GetTime(-6)
+	query := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$gte": stime,
+			"$lt":  etime,
+		},
+		"state": -1,
+		"count": 0,
 	}
 	fields := map[string]interface{}{
 		"site":        1,
@@ -75,7 +191,7 @@ func GetHighlistDetailFilErrData() {
 	wg := &sync.WaitGroup{}
 	lock := &sync.Mutex{}
 	arr := []map[string]interface{}{}
-	it := sess.DB(util.MgoS.DbName).C("spider_highlistdata").Find(&query).Select(&fields).Iter()
+	it := sess.DB(util.MgoS.DbName).C("spider_listdata").Find(&query).Select(&fields).Iter()
 	n := 0
 	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
 		ch <- true
@@ -85,10 +201,15 @@ func GetHighlistDetailFilErrData() {
 				<-ch
 				wg.Done()
 			}()
+			//publishtime
+			ok, publishtime := FilterDataByPublishtime(tmp)
+			if ok {
+				return
+			}
 			result := map[string]interface{}{}
+			result["info"] = "Download Failed"
 			result["from"] = "list"
 			result["level"] = 2
-			result["info"] = "Detail File Err"
 			result["ok"] = false
 			result["field"] = "detail"
 			result["site"] = tmp["site"]
@@ -98,15 +219,7 @@ func GetHighlistDetailFilErrData() {
 			result["spidercode"] = tmp["spidercode"]
 			result["comeintime"] = time.Now().Unix()
 			result["entry"] = false
-			//publishtime
-			publishtime_str := qu.ObjToString(tmp["publishtime"])
-			publishtime_int := int64(0)
-			if publishtime_str != "0" {
-				if t, err := time.ParseInLocation(qu.Date_Full_Layout, publishtime_str, time.Local); err == nil {
-					publishtime_int = t.Unix()
-				}
-			}
-			result["repeat"] = RepeatData(qu.ObjToString(tmp["title"]), publishtime_int)
+			result["repeat"] = RepeatData(qu.ObjToString(tmp["title"]), publishtime)
 			//jsondata
 			if jsondata := qu.ObjToString(tmp["jsondata"]); jsondata != "" {
 				jsondataMap := map[string]interface{}{}
@@ -127,7 +240,7 @@ func GetHighlistDetailFilErrData() {
 				}
 			}
 			tmp["iscompete"] = iscompete
-			tmp["publishtime"] = publishtime_int
+			tmp["publishtime"] = publishtime
 			tmp["_d"] = "comeintime"
 			tmp["T"] = coll
 			tmp["detail"] = "详情请访问原网页!"
@@ -151,6 +264,7 @@ func GetHighlistDetailFilErrData() {
 		util.MgoS.SaveBulk("spider_warn_err", arr...)
 		arr = []map[string]interface{}{}
 	}
+	qu.Debug("spider_listdata下载失败数据已统计推送spider_warn_err")
 }
 
 func GetSpiderWarnData() {
@@ -170,7 +284,7 @@ func GetSpiderWarnData() {
 		//},
 		//"level": 2,
 	}
-	invalidDate := time.Now().AddDate(-2, 0, 0).Unix()
+
 	ch := make(chan bool, 3)
 	wg := &sync.WaitGroup{}
 	lock := &sync.Mutex{}
@@ -196,11 +310,9 @@ func GetSpiderWarnData() {
 				publishtime := int64(0)
 				data, ok := tmp["data"].(map[string]interface{})
 				if ok {
-					if ptime := data["publishtime"]; ptime != nil {
-						publishtime = qu.Int64All(ptime)
-						if publishtime > 0 && publishtime < invalidDate { //两年前的历史数据不再推送修改
-							return
-						}
+					ok, publishtime = FilterDataByPublishtime(data)
+					if ok {
+						return
 					}
 				}
 				if info == "Detail File Err" { //正文是链接的,进行链接判重
@@ -300,7 +412,25 @@ func GetSpiderWarnData() {
 	}
 }
 
+func FilterDataByPublishtime(data map[string]interface{}) (ok bool, publishtime int64) {
+	if publishtime, b := data["publishtime"].(int64); b {
+		if publishtime > 0 && publishtime <= invalidDate { //两年前的历史数据不再推送修改
+			return true, publishtime
+		}
+	} else if ptime_str, b := data["publishtime"].(string); b {
+		t, _ := time.ParseInLocation(qu.Date_Full_Layout, ptime_str, time.Local)
+		publishtime = t.Unix()
+		if publishtime > 0 && publishtime <= invalidDate { //两年前的历史数据不再推送修改
+			return true, publishtime
+		}
+	}
+	return
+}
+
 func RepeatData(title string, publishtime int64) bool {
+	if publishtime == 0 {
+		return false
+	}
 	q := map[string]interface{}{
 		"title": title,
 		"publishtime": map[string]interface{}{