Browse Source

Iteration 2 changes to the new crawler task logic

maxiaoshan 2 years ago
parent
commit
d675b3ab6b
3 changed files with 263 additions and 103 deletions
  1. src/config.json (+4 -4)
  2. src/luatask/newtask.go (+258 -99)
  3. src/luatask/task.go (+1 -0)

+ 4 - 4
src/config.json

@@ -5,24 +5,24 @@
 		"size": 15
     },
     "editor": {
-		"addr": "192.168.3.207:29099",
+		"addr": "192.168.3.71:29099",
 		"db": "editor",
 		"size": 15
     },
 	"bideditor": {
-		"addr": "192.168.3.207:29099",
+		"addr": "192.168.3.71:29099",
 		"db": "editor",
 		"size": 2,
 		"username": "",
 		"password": ""
 	},
 	"pyspider":{
-		"addr": "192.168.3.207:29099",
+		"addr": "192.168.3.71:29099",
 		"db": "py_spider",
 		"size": 5
 	},
 	"bidding": {
-		"addr": "192.168.3.207:29099",
+		"addr": "192.168.3.71:29099",
 		"db": "qfw",
 		"size": 2,
 		"username": "",

+ 258 - 99
src/luatask/newtask.go

@@ -35,6 +35,8 @@ var DataInfoWarnMap = map[int]string{
 	8: "Detail File Err",
 }
 
+var UpdateLuaconfig [][]map[string]interface{}
+
 type NewSpider struct {
 	//爬虫基本信息
 	Code         string                 `bson:"code"`
@@ -50,7 +52,7 @@ type NewSpider struct {
 	Working      int                    `bson:"working"`
 	AuditTime    int64                  `bson:"l_uploadtime"`
 	ListIsFilter bool                   `bson:"listisfilter"`
-	TaskTags     map[string]interface{} `bson:"tasktags"`
+	CodeTags     map[string]interface{} `bson:"codetags"`
 	//统计信息
 	Detail_DownloadNum        int               `bson:"detail_downloadnum"`
 	Detail_DownloadSuccessNum int               `bson:"detail_downloadsuccessnum"`
@@ -64,6 +66,8 @@ type NewSpider struct {
 	Py_TaskId   string `bson:"py_taskid"`
 	Py_NodeName string `bson:"py_nodename"`
 	Py_IsValid  bool   `bson:"py_isvalid"`
+	//站点信息
+	Channel_Status int `bson:"channel_status"` //栏目响应状态
 	//补充信息
 	Comeintime int64 `bson:"comeintime"`
 	//异常汇总
@@ -84,10 +88,12 @@ func NewStartTask() {
 	InitInfo() //初始化时间
 	logger.Info(StartTime, EndTime, Publishtime)
 	getCodeBaseInfo()      //获取爬虫基本信息
+	getCodeStatus()        //获取爬虫响应状态信息
 	getPythonSummaryInfo() //获取python汇总信息
 	getLuaSummaryInfo()    //获取lua汇总信息
 	getSpiderWarnInfo()    //获取异常数据
 	saveCodeInfo()         //汇总异常信息,产出任务
+	updateLuaconfig()      //更新爬虫信息
 	closeTask()
 }
 
@@ -130,7 +136,7 @@ func getCodeBaseInfo() {
 		"modifytime":   1,
 		"l_uploadtime": 1,
 		"listisfilter": 1,
-		"tasktags":     1,
+		"codetags":     1,
 	}
 	it := sess.DB(util.MgoEB.DbName).C("luaconfig").Find(&query).Select(&fields).Iter()
 	n := 0
@@ -171,6 +177,44 @@ func getCodeBaseInfo() {
 	logger.Info("爬虫基本信息准备完成...", len(NewCodeInfoMap))
 }
 
+func getCodeStatus() {
+	defer qu.Catch()
+	sess := util.MgoEB.GetMgoConn()
+	defer util.MgoEB.DestoryMongoConn(sess)
+	lock := &sync.Mutex{}
+	wg := &sync.WaitGroup{}
+	ch := make(chan bool, 5)
+	fields := map[string]interface{}{
+		"spidercode":  1,
+		"status_code": 1,
+	}
+	it := sess.DB(util.MgoPy.DbName).C("site_monitor").Find(nil).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		wg.Add(1)
+		ch <- true
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			code := qu.ObjToString(tmp["spidercode"])
+			status := qu.IntAll(tmp["status_code"])
+			lock.Lock()
+			if sp := NewCodeInfoMap[code]; sp != nil {
+				sp.Channel_Status = status
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%1000 == 0 {
+			logger.Info(n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	logger.Info("栏目响应状态信息准备完成...", len(NewCodeInfoMap))
+}
+
 func getPythonSummaryInfo() {
 	defer qu.Catch()
 	sess := util.MgoPy.GetMgoConn()
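
Note: getCodeStatus is new in this commit. It streams the py_spider site_monitor collection and copies each spidercode's status_code into the matching NewCodeInfoMap entry as Channel_Status, which is why NewStartTask calls it right after getCodeBaseInfo and before the task-producing steps. It reuses the bounded-concurrency idiom of the other collectors: a buffered channel of capacity 5 caps the number of in-flight goroutines, a WaitGroup joins them, a mutex guards the shared map, and tmp is re-allocated each iteration so every goroutine owns its own document. A minimal, self-contained sketch of that idiom (illustrative only, not repository code):

    package main

    import "sync"

    func main() {
    	shared := map[string]int{}
    	lock := &sync.Mutex{}
    	wg := &sync.WaitGroup{}
    	ch := make(chan bool, 5) // at most 5 workers in flight

    	for i := 0; i < 100; i++ {
    		wg.Add(1)
    		ch <- true // acquire a slot
    		go func(n int) {
    			defer func() {
    				<-ch // release the slot
    				wg.Done()
    			}()
    			lock.Lock()
    			shared["processed"]++ // shared state only touched under the lock
    			lock.Unlock()
    		}(i)
    	}
    	wg.Wait()
    }
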
@@ -231,8 +275,8 @@ func getPythonSummaryInfo() {
 
 func getLuaSummaryInfo() {
 	getSpiderHeart()               //获取心跳信息
-	getSpiderHighListDownloadNum() //获取下载量信息
-	getSpiderListDownloadNum()     //获取下载量信息
+	getSpiderHighListDownloadNum() //获取分开采集模式爬虫下载量信息
+	getSpiderListDownloadNum()     //获取顺序采集模式爬虫下载量信息
 	getSpiderDownloadRateData()    //获取下载详情
 }
 
@@ -462,22 +506,22 @@ func getSpiderListDownloadNum() {
 				if tmpState == 1 { //该href已记录下载成功,后续不做任务记录
 					return
 				} else if tmpState == 0 { //未曾记录该href
-					if sp := NewCodeInfoMap[code]; sp != nil {
-						if state == 1 {
-							sp.Detail_DownloadSuccessNum++
-						} else {
-							state = -1
-							sp.Detail_DownloadFailNum++
-						}
-						sp.Detail_DownloadNum++
-						repeatHrefMap[href] = state
-					}
-				} else if tmpState == -1 && state == 1 { //已记录状态是下载失败,当前下载成功,记录该href最终为下载成功
-					if sp := NewCodeInfoMap[code]; sp != nil {
+					//if sp := NewCodeInfoMap[code]; sp != nil {
+					if state == 1 {
 						sp.Detail_DownloadSuccessNum++
-						sp.Detail_DownloadFailNum--
-						repeatHrefMap[href] = state
+					} else {
+						state = -1
+						sp.Detail_DownloadFailNum++
 					}
+					sp.Detail_DownloadNum++
+					repeatHrefMap[href] = state
+					//}
+				} else if tmpState == -1 && state == 1 { //已记录状态是下载失败,当前下载成功,记录该href最终为下载成功
+					//if sp := NewCodeInfoMap[code]; sp != nil {
+					sp.Detail_DownloadSuccessNum++
+					sp.Detail_DownloadFailNum--
+					repeatHrefMap[href] = state
+					//}
 				}
 			}
 		}(tmp)
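
Note: the refactor above drops the per-href `if sp := NewCodeInfoMap[code]; sp != nil` guard (left commented out), so sp is now assumed to be resolved and non-nil earlier in the loop; the counting rules themselves are unchanged. repeatHrefMap acts as a small per-href state machine: 0 means the href has not been counted yet, 1 means it was already counted as a successful download (later records are ignored), and -1 means it was counted as a failure, so a later success moves one count from failed to successful. A hedged sketch with illustrative names:

    // 0 = unseen, 1 = counted as success, -1 = counted as failure.
    type counter struct {
    	total, success, fail int
    }

    func record(seen map[string]int, c *counter, href string, ok bool) {
    	switch seen[href] {
    	case 1: // already counted as a success; nothing more to do
    		return
    	case 0: // first time this href is seen
    		c.total++
    		if ok {
    			c.success++
    			seen[href] = 1
    		} else {
    			c.fail++
    			seen[href] = -1
    		}
    	case -1: // previously failed, now succeeded: move one count over
    		if ok {
    			c.success++
    			c.fail--
    			seen[href] = 1
    		}
    	}
    }
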
@@ -606,10 +650,6 @@ func createTask(sp *NewSpider, taskArr *[][]map[string]interface{}, lock *sync.M
 	if sp.ErrType == -1 { //无异常
 		return
 	}
-	state_new := 0
-	if sp.ErrType == 2 { //数据异常错误类型,任务状态1
-		state_new = 1
-	}
 	//查询历史任务
 	query := map[string]interface{}{
 		"s_code": sp.Code,
@@ -661,6 +701,13 @@ func createTask(sp *NewSpider, taskArr *[][]map[string]interface{}, lock *sync.M
 		*taskArr = append(*taskArr, update)
 		lock.Unlock()
 	} else { //无历史任务
+		state_new := 0
+		if sp.ErrType == 1 && sp.Channel_Status != 200 { //列表页异常任务,栏目响应状态异常者,直接建待处理任务
+			state_new = 1
+		}
+		if sp.ErrType == 2 { //数据异常错误类型,任务状态1
+			state_new = 1
+		}
 		saveMap := map[string]interface{}{
 			"s_modify":     sp.ModifyUser,
 			"s_modifyid":   sp.ModifyId,
@@ -693,7 +740,6 @@ func getAllErr(sp *NewSpider) {
 	downloadFailedErr(sp) //下载异常
 	dataInfoWarn(sp)      //数据异常警告
 }
-
 func listErr(sp *NewSpider) {
 	defer qu.Catch()
 	if sp.Platform == "python" && !sp.Py_IsValid {
@@ -701,9 +747,9 @@ func listErr(sp *NewSpider) {
 	}
 	if !sp.List_IsGetData || sp.List_RunTimes == 0 {
 		errFlag := false
-		if sp.TaskTags != nil {
-			tagTime, _ := sp.TaskTags[NEWTASK_LISTERR].(int64) //用struct接收,会转为floa64
-			if tagTime == 0 {                                  //无列表标记
+		if sp.CodeTags != nil {
+			tagTime, _ := sp.CodeTags[NEWTASK_LISTERR].(int64) //用struct接收,会转为floa64
+			if tagTime == 0 {                                  //无列表异常标记
 				errFlag = true
 			} else if tagTime > 0 && tagTime <= util.GetTime(-7) { //标记失效
 				errFlag = true
@@ -762,12 +808,12 @@ func downloadRateErr(sp *NewSpider) {
 	}
 	if sp.List_AllInTimes > 0 {
 		errFlag := false
-		if sp.Model == 1 { //分开采集,直接记录异常
+		if sp.Model == 1 && sp.AuditTime > 24 { //分开采集,且爬虫审核时间超过24小时,记录异常
 			errFlag = true
-		} else { //顺序采集
-			if sp.TaskTags != nil {
-				tagTime, _ := sp.TaskTags[NEWTASK_RATEERR].(int64)
-				if tagTime == 0 { //无列表标记
+		} else if sp.Event != 7410 { //顺序采集(7410节点不建采集频率异常任务)
+			if sp.CodeTags != nil {
+				tagTime, _ := sp.CodeTags[NEWTASK_RATEERR].(int64)
+				if tagTime == 0 { //无频率异常标记
 					errFlag = true
 				} else if tagTime > 0 && tagTime <= util.GetTime(-7) { //标记失效
 					errFlag = true
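
Note: downloadRateErr changes in two ways: split-collection spiders (Model == 1) are flagged only when the AuditTime condition in the comment also holds, and sequential spiders on event 7410 are excluded from rate-anomaly tasks. The codetags entries behave as time-boxed suppressions: each holds the unix time it was set and stops suppressing once it is older than util.GetTime(-7). A small sketch of that check, with the exact semantics of util.GetTime assumed rather than taken from this diff:

    tagTime, _ := sp.CodeTags[NEWTASK_RATEERR].(int64)
    // a tag suppresses the anomaly only while it is younger than 7 days
    active := tagTime > 0 && tagTime > util.GetTime(-7)
    if !active {
    	errFlag = true // no tag, or the tag has expired
    }
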
@@ -793,85 +839,100 @@ func downloadFailedErr(sp *NewSpider) {
 	if sp.Platform == "python" && !sp.Py_IsValid {
 		return
 	}
+	flagTime := util.GetTime(-7)
 	if sp.Detail_DownloadFailNum > 0 {
-		tagTime := int64(-1)
-		if sp.TaskTags != nil {
-			tagTime, _ = sp.TaskTags[NEWTASK_DOWNLOADERR].(int64)
-		} else { //无标记,记录列表页异常
-			tagTime = 0
+		tagTime := int64(0)
+		if sp.CodeTags != nil {
+			tagTime, _ = sp.CodeTags[NEWTASK_DOWNLOADERR].(int64) //历史标记
+			if tagTime > flagTime {                               //标记未超期
+				if sp.Detail_DownloadFailNum == sp.Detail_DownloadNum { //全部下载失败,删除标记
+					delete(sp.CodeTags, NEWTASK_DOWNLOADERR)
+					UpdateLuaconfig = append(UpdateLuaconfig, []map[string]interface{}{
+						{"code": sp.Code},
+						{"$set": map[string]interface{}{
+							"codetags": sp.CodeTags,
+						}},
+					})
+				} else {
+					return
+				}
+			}
 		}
-		if tagTime > -1 {
-			if sp.Model == 1 { //分开采集(python爬虫默认分开采集模式)
-				errFlag := false
-				if sp.Detail_DownloadNum < 100 { //下载总量小于100
-					if sp.Detail_DownloadFailNum >= 3 { //失败个数超过3个
-						errFlag = true //异常
-					} else if float64(sp.Detail_DownloadFailNum)/float64(sp.Detail_DownloadNum) >= 0.2 { //失败占比超过20%
-						errFlag = true //异常
-					}
-				} else if sp.Detail_DownloadFailNum >= 3 { //下载总量大于100,失败个数超过3个
-					if float64(sp.Detail_DownloadFailNum)/float64(sp.Detail_DownloadNum) >= 0.03 { //失败占比超过3%
+		if sp.Model == 1 { //分开采集
+			errFlag := false
+			if sp.Detail_DownloadNum < 100 { //下载总量小于100
+				if sp.Detail_DownloadFailNum >= 3 { //失败个数超过3个
+					errFlag = true //异常
+				} else if float64(sp.Detail_DownloadFailNum)/float64(sp.Detail_DownloadNum) >= 0.2 { //失败占比超过20%
+					errFlag = true //异常
+				}
+			} else if sp.Detail_DownloadFailNum >= 3 { //下载总量大于100,失败个数超过3个
+				if float64(sp.Detail_DownloadFailNum)/float64(sp.Detail_DownloadNum) >= 0.03 { //失败占比超过3%
+					errFlag = true //异常
+				} else {
+					if sp.Detail_DownloadFailNum >= 30 { //失败个数超过30个
 						errFlag = true //异常
 					} else {
-						if sp.Detail_DownloadFailNum >= 30 { //失败个数超过30个
-							errFlag = true //异常
+						if qu.FormatDateByInt64(&tagTime, qu.Date_Short_Layout) == qu.FormatDateByInt64(&flagTime, qu.Date_Short_Layout) {
+							errFlag = true
 						} else {
-							tagFlag := tagTime == util.GetTime(-7) //上次标记时间是否是7天前当天
-							if tagTime == 0 || !tagFlag {          //系统打标记
-								//系统打标记
-							} else if tagFlag {
-								errFlag = true //异常
-							}
+							//系统打标记
+							UpdateLuaconfig = append(UpdateLuaconfig, []map[string]interface{}{
+								{"code": sp.Code},
+								{"$set": map[string]interface{}{
+									"codetags." + NEWTASK_DOWNLOADERR: time.Now().Unix(),
+								}},
+							})
 						}
 					}
 				}
-				if errFlag {
-					q := map[string]interface{}{
-						"$or": []interface{}{
-							map[string]interface{}{ //state=-1下载失败
-								"spidercode": sp.Code,
-								"state":      -1,
-								"comeintime": map[string]interface{}{
-									"$gte": StartTime,
-									"$lt":  EndTime,
-								},
+			}
+			if errFlag {
+				q := map[string]interface{}{
+					"$or": []interface{}{
+						map[string]interface{}{ //state=-1下载失败
+							"spidercode": sp.Code,
+							"state":      -1,
+							"comeintime": map[string]interface{}{
+								"$gte": StartTime,
+								"$lt":  EndTime,
+							},
+						},
+						map[string]interface{}{ //state=0,times存在,前一天未下载成功的
+							"spidercode": sp.Code,
+							"state":      0,
+							"times": map[string]interface{}{
+								"$exists": true,
 							},
-							map[string]interface{}{ //state=0,times存在,前一天未下载成功的
-								"spidercode": sp.Code,
-								"state":      0,
-								"times": map[string]interface{}{
-									"$exists": true,
-								},
-								"comeintime": map[string]interface{}{
-									"$gte": StartTime,
-									"$lt":  EndTime,
-								},
+							"comeintime": map[string]interface{}{
+								"$gte": StartTime,
+								"$lt":  EndTime,
 							},
 						},
-					}
-					sp.getErrHrefs("spider_highlistdata", NEWTASK_DOWNLOADERR, q)
-					sp.ErrTypeMap[qu.IntAll(NEWTASK_DOWNLOADERR)] = true
-					if sp.ErrType < 0 {
-						sp.ErrType = qu.IntAll(NEWTASK_DOWNLOADERR)
-					}
-				}
-			} else { //顺序采集
-				//查询有无第一次记录(count=0),且下载失败的数据(count>0的数据表示该数据已经在采集当天统计过,不再二次统计)
-				q := map[string]interface{}{
-					"spidercode": sp.Code,
-					"count":      0,
-					"state":      -1,
-					"comeintime": map[string]interface{}{
-						"$gte": StartTime,
-						"$lt":  EndTime,
 					},
 				}
-				count := sp.getErrHrefs("spider_listdata", NEWTASK_DOWNLOADERR, q)
-				if count > 0 {
-					sp.ErrTypeMap[qu.IntAll(NEWTASK_DOWNLOADERR)] = true
-					if sp.ErrType < 0 {
-						sp.ErrType = qu.IntAll(NEWTASK_DOWNLOADERR)
-					}
+				sp.getErrHrefs("spider_highlistdata", NEWTASK_DOWNLOADERR, q)
+				sp.ErrTypeMap[qu.IntAll(NEWTASK_DOWNLOADERR)] = true
+				if sp.ErrType < 0 {
+					sp.ErrType = qu.IntAll(NEWTASK_DOWNLOADERR)
+				}
+			}
+		} else { //顺序采集
+			//查询有无第一次记录(count=0),且下载失败的数据(count>0的数据表示该数据已经在采集当天统计过,不再二次统计)
+			q := map[string]interface{}{
+				"spidercode": sp.Code,
+				"count":      0,
+				"state":      -1,
+				"comeintime": map[string]interface{}{
+					"$gte": StartTime,
+					"$lt":  EndTime,
+				},
+			}
+			count := sp.getErrHrefs("spider_listdata", NEWTASK_DOWNLOADERR, q)
+			if count > 0 {
+				sp.ErrTypeMap[qu.IntAll(NEWTASK_DOWNLOADERR)] = true
+				if sp.ErrType < 0 {
+					sp.ErrType = qu.IntAll(NEWTASK_DOWNLOADERR)
 				}
 			}
 		}
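
Note: the rewritten downloadFailedErr first consults the NEWTASK_DOWNLOADERR tag in codetags. While an unexpired tag exists the spider is skipped, unless every download that day failed, in which case the tag is deleted and the deletion is queued in UpdateLuaconfig. After that, split-collection spiders are judged by the thresholds restated below; when none of them fire for a large sample, the code either escalates if the existing tag was set exactly seven days ago (same calendar day as util.GetTime(-7)) or queues a fresh system tag for the spider via UpdateLuaconfig. The helper below is an illustrative restatement of the thresholds, not code from the repo:

    // Illustrative restatement of the split-mode (Model == 1) thresholds.
    func downloadFailIsErr(total, failed int) bool {
    	if total < 100 {
    		// small sample: 3 absolute failures, or a 20% failure ratio
    		return failed >= 3 || float64(failed)/float64(total) >= 0.2
    	}
    	if failed < 3 {
    		return false
    	}
    	// large sample: a 3% failure ratio, or 30 absolute failures
    	return float64(failed)/float64(total) >= 0.03 || failed >= 30
    }
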
@@ -881,8 +942,8 @@ func dataInfoWarn(sp *NewSpider) {
 	defer qu.Catch()
 	if len(sp.WarnInfoMap) > 0 {
 		tagTime := int64(-1)
-		if sp.TaskTags != nil {
-			tagTime, _ = sp.TaskTags[NEWTASK_DATAINFOWARN].(int64)
+		if sp.CodeTags != nil {
+			tagTime, _ = sp.CodeTags[NEWTASK_DATAINFOWARN].(int64)
 		} else { //无标记,记录列表页异常
 			tagTime = 0
 		}
@@ -946,6 +1007,14 @@ func (sp *NewSpider) getErrHrefs(coll, errType string, query map[string]interfac
 	return
 }
 
+//更新爬虫
+func updateLuaconfig() {
+	if len(UpdateLuaconfig) > 0 {
+		util.MgoEB.UpdateBulk("luaconfig", UpdateLuaconfig...)
+		UpdateLuaconfig = [][]map[string]interface{}{}
+	}
+}
+
 //关闭任务
 func closeTask() {
 	defer qu.Catch()
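
Note: the new updateLuaconfig step flushes whatever was accumulated in UpdateLuaconfig during the run and then resets the slice. Each element is a two-entry slice, a selector map and an update map, which is presumably the pair format util.MgoEB.UpdateBulk expects when called with UpdateLuaconfig... as above. A sketch of queuing one such pair, mirroring the usage in downloadFailedErr:

    UpdateLuaconfig = append(UpdateLuaconfig, []map[string]interface{}{
    	{"code": sp.Code}, // selector: which luaconfig document to touch
    	{"$set": map[string]interface{}{ // update: stamp the download-error tag
    		"codetags." + NEWTASK_DOWNLOADERR: time.Now().Unix(),
    	}},
    })
    // flushed once per run: util.MgoEB.UpdateBulk("luaconfig", UpdateLuaconfig...)
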
@@ -971,3 +1040,93 @@ func closeTask() {
 	2、下载异常由于原网站详情页无信息造成的,如何提高任务准确率?
 	3、7410变链接造成的采集频率异常如何解决?
 */
+
+//func downloadFailedErr(sp *NewSpider) {
+//	defer qu.Catch()
+//	if sp.Platform == "python" && !sp.Py_IsValid {
+//		return
+//	}
+//	if sp.Detail_DownloadFailNum > 0 {
+//		tagTime := int64(-1)
+//		if sp.CodeTags != nil {
+//			tagTime, _ = sp.CodeTags[NEWTASK_DOWNLOADERR].(int64)
+//		} else { //无标记,记录列表页异常
+//			tagTime = 0
+//		}
+//		if tagTime > -1 {
+//			if sp.Model == 1 { //分开采集(python爬虫默认分开采集模式)
+//				errFlag := false
+//				if sp.Detail_DownloadNum < 100 { //下载总量小于100
+//					if sp.Detail_DownloadFailNum >= 3 { //失败个数超过3个
+//						errFlag = true //异常
+//					} else if float64(sp.Detail_DownloadFailNum)/float64(sp.Detail_DownloadNum) >= 0.2 { //失败占比超过20%
+//						errFlag = true //异常
+//					}
+//				} else if sp.Detail_DownloadFailNum >= 3 { //下载总量大于100,失败个数超过3个
+//					if float64(sp.Detail_DownloadFailNum)/float64(sp.Detail_DownloadNum) >= 0.03 { //失败占比超过3%
+//						errFlag = true //异常
+//					} else {
+//						if sp.Detail_DownloadFailNum >= 30 { //失败个数超过30个
+//							errFlag = true //异常
+//						} else {
+//							tagFlag := tagTime == util.GetTime(-7) //上次标记时间是否是7天前当天
+//							if tagTime == 0 || !tagFlag {          //系统打标记
+//								//系统打标记
+//							} else if tagFlag {
+//								errFlag = true //异常
+//							}
+//						}
+//					}
+//				}
+//				if errFlag {
+//					q := map[string]interface{}{
+//						"$or": []interface{}{
+//							map[string]interface{}{ //state=-1下载失败
+//								"spidercode": sp.Code,
+//								"state":      -1,
+//								"comeintime": map[string]interface{}{
+//									"$gte": StartTime,
+//									"$lt":  EndTime,
+//								},
+//							},
+//							map[string]interface{}{ //state=0,times存在,前一天未下载成功的
+//								"spidercode": sp.Code,
+//								"state":      0,
+//								"times": map[string]interface{}{
+//									"$exists": true,
+//								},
+//								"comeintime": map[string]interface{}{
+//									"$gte": StartTime,
+//									"$lt":  EndTime,
+//								},
+//							},
+//						},
+//					}
+//					sp.getErrHrefs("spider_highlistdata", NEWTASK_DOWNLOADERR, q)
+//					sp.ErrTypeMap[qu.IntAll(NEWTASK_DOWNLOADERR)] = true
+//					if sp.ErrType < 0 {
+//						sp.ErrType = qu.IntAll(NEWTASK_DOWNLOADERR)
+//					}
+//				}
+//			} else { //顺序采集
+//				//查询有无第一次记录(count=0),且下载失败的数据(count>0的数据表示该数据已经在采集当天统计过,不再二次统计)
+//				q := map[string]interface{}{
+//					"spidercode": sp.Code,
+//					"count":      0,
+//					"state":      -1,
+//					"comeintime": map[string]interface{}{
+//						"$gte": StartTime,
+//						"$lt":  EndTime,
+//					},
+//				}
+//				count := sp.getErrHrefs("spider_listdata", NEWTASK_DOWNLOADERR, q)
+//				if count > 0 {
+//					sp.ErrTypeMap[qu.IntAll(NEWTASK_DOWNLOADERR)] = true
+//					if sp.ErrType < 0 {
+//						sp.ErrType = qu.IntAll(NEWTASK_DOWNLOADERR)
+//					}
+//				}
+//			}
+//		}
+//	}
+//}

+ 1 - 0
src/luatask/task.go

@@ -210,6 +210,7 @@ func InitInfo() {
 	SameDayHref = map[string]string{}
 	DataBakAllHref = map[string]string{}
 	UserTaskNum = map[string]map[string]int{}
+	UpdateLuaconfig = [][]map[string]interface{}{}
 	//StartTime, EndTime = util.GetWorkDayTimeUnix()
 	StartTime = util.GetTime(-1)
 	EndTime = util.GetTime(0)
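
Note: task.go gains a single line: InitInfo now also resets UpdateLuaconfig alongside the other per-run accumulators, so queued tag updates from one run never leak into the next. The surrounding lines suggest the run window is "yesterday": StartTime = util.GetTime(-1), EndTime = util.GetTime(0). The helper below is only a guess at that contract, inferred from usage; the real util.GetTime may differ.

    // getTime is an illustrative stand-in for util.GetTime: the unix time of
    // local midnight, offset by the given number of days from today (assumed).
    func getTime(dayOffset int) int64 {
    	now := time.Now()
    	y, m, d := now.AddDate(0, 0, dayOffset).Date()
    	return time.Date(y, m, d, 0, 0, 0, 0, now.Location()).Unix()
    }
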