Browse source code

Task logic changes; migrate spiders to other nodes

maxiaoshan 2 years ago
parent
commit
3f4767f882

+ 27 - 13
src/config.json

@@ -48,67 +48,80 @@
 		"7000": {
 			"server": "comm",
 			"model": 0,
-			"work": 0
+			"work": 0,
+			"type": 0
 		},
 		"7100": {
 			"server": "bid",
 			"model": 1,
-			"work": 0
+			"work": 0,
+			"type": 1
 		},
 		"7110": {
 			"server": "bid",
 			"model": 1,
-			"work": 0
+			"work": 0,
+			"type": 1
 		},
 		"7200": {
 			"server": "comm",
 			"model": 1,
-			"work": 1
+			"work": 1,
+			"type": 2
 		},
 		"7210": {
 			"server": "comm",
 			"model": 1,
-			"work": 1
+			"work": 1,
+			"type": 2
 		},
 		"7300": {
 			"server": "comm",
 			"model": 1,
-			"work": 1
+			"work": 1,
+			"type": 2
 		},
 		"7310": {
 			"server": "comm",
 			"model": 1,
-			"work": 1
+			"work": 1,
+			"type": 2
 		},
 		"7400": {
 			"server": "bid",
 			"model": 1,
-			"work": 0
+			"work": 0,
+			"type": 1
 		},
 		"7410": {
 			"server": "bid",
 			"model": 0,
-			"work": 0
+			"work": 0,
+			"type": 1
 		},
 		"7500": {
 			"server": "comm",
 			"model": 0,
-			"work": 1
+			"work": 1,
+			"type": 3
 		},
 		"7510": {
 			"server": "comm",
 			"model": 0,
-			"work": 1
+			"work": 1,
+			"type": 3
 		},
 		"7520": {
 			"server": "comm",
 			"model": 0,
-			"work": 1
+			"work": 1,
+			"type": 4
 		},
 		"7700": {
 			"server": "comm",
 			"model": 0,
-			"work": 1
+			"work": 1,
+			"type": 3
 		}
 	},
 	"resetdatastatecron": "0 0 2 ? * MON-FRI",
@@ -120,6 +133,7 @@
 	"filewarncron": "0 55 8 ? * *",
 	"movelistdatacron": "0 0 0 ? * *",
 	"spiderweeklyreportcron": "0 50 6 ? * SUN",
+	"luamovecron": "0 0 8 1 * *",
 	"closenum": 2,
 	"daynum": 6,
 	"mail": {

+ 78 - 0
src/luatask/othertask.go

@@ -0,0 +1,78 @@
+package luatask
+
+import (
+	qu "qfw/util"
+	"time"
+	"util"
+)
+
+// Create a task for each spider that has stayed in the pending state too long
+func CreateTaskByCodePendstate() {
+	defer qu.Catch()
+	today := time.Now()
+	query := map[string]interface{}{
+		"state":     5,
+		"platform":  "golua平台",
+		"pendstate": 1,
+		"pendtime": map[string]interface{}{
+			"$lt":  util.GetTime(-14), // create a task once a spider has been pending for more than 14 days
+			"$gte": 1661961600, // 2022-09-01 00:00 (+08:00)
+		},
+	}
+	fields := map[string]interface{}{
+		"code":         1,
+		"channel":      1,
+		"site":         1,
+		"modifyuser":   1,
+		"modifyuserid": 1,
+		"event":        1,
+		"pendtime":     1,
+	}
+	list, _ := util.MgoEB.Find("luaconfig", query, nil, fields, false, -1, -1)
+	for _, tmp := range *list {
+		code := qu.ObjToString(tmp["code"])
+		pendtime := qu.Int64All(tmp["pendtime"])
+		pendtimeStr := qu.FormatDateByInt64(&pendtime, qu.Date_Short_Layout)
+		tquery := map[string]interface{}{
+			"s_code": code,
+			"i_state": map[string]interface{}{
+				"$nin": []int{4, 6},
+			},
+		}
+		task, _ := util.MgoEB.FindOneByField("task", tquery, map[string]interface{}{"s_descript": 1, "i_state": 1})
+		q := map[string]interface{}{}
+		set := map[string]interface{}{}
+		if len(*task) > 0 { // an open task already exists
+			if state := qu.IntAll((*task)["i_state"]); state >= 3 { // pending-review or rejected task
+				continue
+			}
+			// In-progress tasks are handled separately because they originate from an auditor's review and carry an l_checktime field (used by the weekly report), so we update them in place; pending-confirm and pending-handle tasks are dispatched by the program and do not get l_checktime for now
+			descript := qu.ObjToString((*task)["s_descript"])
+			q["_id"] = (*task)["_id"]
+			set["i_state"] = 2
+			set["l_updatetime"] = time.Now().Unix()
+			set["i_pendstate"] = 1
+			set["s_descript"] = descript + qu.FormatDate(&today, qu.Date_Short_Layout) + "追加描述:------------------------------\n该爬虫已连续挂起超7天(" + pendtimeStr + ")\n"
+		} else { // no open task for this code yet
+			q = tquery // no matching task exists, so the upsert below inserts a new one
+			set["s_descript"] = "该爬虫已连续挂起超7天(" + pendtimeStr + ")\n"
+			set["s_platform"] = "golua平台"
+			set["s_channel"] = tmp["channel"]
+			set["i_event"] = tmp["event"]
+			set["s_type"] = "7"
+			set["i_times"] = 1
+			set["i_num"] = 0
+			set["l_comeintime"] = time.Now().Unix()
+			set["s_urgency"] = "1"
+			set["l_complete"] = util.CompleteTime("1")
+			set["s_modify"] = tmp["modifyuser"]
+			set["s_modifyid"] = tmp["modifyuserid"]
+			set["s_site"] = tmp["site"]
+			set["i_state"] = 2
+			set["i_pendstate"] = 1
+			set["s_code"] = code
+			set["s_source"] = "程序"
+			set["i_frequencyerrtimes"] = 0
+		}
+		util.MgoEB.Update("task", q, map[string]interface{}{"$set": set}, true, false)
+	}
+}
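
Note: the pendtime window above depends on util.GetTime(n), which is not part of this commit. A minimal sketch of its assumed behavior, consistent with how it is used for day-boundary queries throughout these files (an assumption, not the repo's actual implementation):

	package util

	import "time"

	// GetTime (assumed): Unix timestamp of local midnight, `day` days from
	// today; negative values look back.
	func GetTime(day int) int64 {
		t := time.Now().AddDate(0, 0, day)
		return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()).Unix()
	}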

+ 217 - 15
src/luatask/sitecount.go

@@ -16,19 +16,21 @@ import (
 )
 
 type SiteInfo struct {
-	Site              string `json:""`                  // site
-	Num               int    `json:"averagenum"`        // average number of documents the site publishes per day
-	Modifyuser        string `json:"modifyuser"`        // maintainer
-	State             string `json:"state"`             // site status
-	Domain            string `json:"domain"`            // domain
-	Stype             string `json:"stype"`             // site type
-	Platform          string `json:"platform"`          // owning platform
-	Coverage          string `json:"coverage"`          // coverage rate
-	ListAllNum        int    `json:"listallnum"`        // documents collected that day, href-deduped
-	ListSuccessNum    int    `json:"listsuccessnum"`    // documents collected successfully that day, href-deduped
-	PTimeSuccessNum   int    `json:"ptimesuccessnum"`   // documents published and collected successfully that day, href-deduped
-	PTimeSuccessDbNum int    `json:"ptimesuccessdbnum"` // documents published and collected successfully that day, counted from data_bak, href-deduped
-	Comeintime        int64  `json:"comeintime"`        // record insertion time
+	Site                  string `json:"site"`                  // site
+	Num                   int    `json:"averagenum"`            // average number of documents the site publishes per day
+	Modifyuser            string `json:"modifyuser"`            // maintainer
+	State                 string `json:"state"`                 // site status
+	Domain                string `json:"domain"`                // domain
+	Stype                 string `json:"stype"`                 // site type
+	Platform              string `json:"platform"`              // owning platform
+	Coverage              string `json:"coverage"`              // coverage rate
+	ListAllNum            int    `json:"listallnum"`            // documents collected that day, href-deduped
+	ListSuccessNum        int    `json:"listsuccessnum"`        // documents collected successfully that day, href-deduped
+	PTimeSuccessNum       int    `json:"ptimesuccessnum"`       // documents published and collected successfully that day, href-deduped
+	PTimeSuccessDbNum     int    `json:"ptimesuccessdbnum"`     // documents published and collected successfully that day, counted from data_bak, href-deduped
+	ThreeDaysAgoNum       int    `json:"threedaysagonum"`       // recount of the volume for the day three working days back (some sites publish late, so the same-day count is inaccurate)
+	BeforeThreeDaysAgoNum int    `json:"beforethreedaysagonum"` // the count originally recorded for that same day
+	Comeintime            int64  `json:"comeintime"`            // record insertion time
 }
 
 var SiteInfoModel = `{
@@ -106,8 +108,10 @@ func SendInfoToWxWork_SiteDataCount() {
 	GetAllSpidercodeNum(allSpiderMap)
 	// 6. build the excel summary
 	//GetSiteInfoExcel(siteInfoMap, siteInfoMap_Back, allSpiderMap)
-	GetSiteInfoExcel(allSpiderMap)
+	day := GetThreeDaysAgoNum(allSpiderMap)
+	GetSiteInfoExcel(allSpiderMap, day)
 }
+
 func GetAllSpidercodeNum(siteInfoMap map[string]*SiteInfo) {
 	defer qu.Catch()
 	logger.Info("统计采集量luacodeinfo开始...")
@@ -166,7 +170,199 @@ func GetAllSpidercodeNum(siteInfoMap map[string]*SiteInfo) {
 	logger.Debug("统计采集量luacodeinfo完成...")
 }
 
-func GetSiteInfoExcel(allSpiderInfo map[string]*SiteInfo) {
+func GetThreeDaysAgoNum(siteInfoMap map[string]*SiteInfo) (strStime string) {
+	defer qu.Catch()
+	// 1. find the date three working days back
+	baseDay := 3
+	for i := 1; i <= baseDay; i++ { // extend the window past any Saturdays and Sundays it contains
+		beforeDay := time.Now().AddDate(0, 0, -i)
+		if weekDay := beforeDay.Weekday().String(); weekDay == "Saturday" || weekDay == "Sunday" {
+			baseDay++
+		}
+	}
+	logger.Info("baseday:", baseDay)
+	stime := util.GetTime(-baseDay)                               // start timestamp (three working days back)
+	strStime = qu.FormatDateByInt64(&stime, qu.Date_Short_Layout) // start date
+	logger.Info("查询天:", stime, strStime)
+	// 2. tally the data volume
+	GetSpiderHighListDataNum(stime, strStime, siteInfoMap) //spider_highlistdata
+	GetSpiderListDataNum(stime, strStime, siteInfoMap)     //spider_listdata
+	GetPythonDataNum(stime, strStime, siteInfoMap)
+	GetNumByLastTime(stime, baseDay, siteInfoMap)
+	return
+}
+
+func GetSpiderHighListDataNum(stime int64, strStime string, siteInfoMap map[string]*SiteInfo) {
+	defer qu.Catch()
+	sess := util.MgoS.GetMgoConn()
+	defer util.MgoS.DestoryMongoConn(sess)
+	HrefRepeatMap := map[string]string{}
+	lock := &sync.Mutex{}
+	wg := &sync.WaitGroup{}
+	ch := make(chan bool, 5)
+	query := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$gte": stime,
+		},
+		"publishtime": map[string]interface{}{
+			"$regex": strStime,
+		},
+	}
+	fields := map[string]interface{}{
+		"href": 1,
+		"site": 1,
+	}
+	it := sess.DB(util.MgoS.DbName).C("spider_highlistdata").Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		wg.Add(1)
+		ch <- true
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			site := qu.ObjToString(tmp["site"])
+			lock.Lock()
+			if sInfo := siteInfoMap[site]; sInfo != nil { // one of the key sites being tracked
+				href := qu.ObjToString(tmp["href"])
+				if tmpSite := HrefRepeatMap[href]; tmpSite != site { // dedupe hrefs within the same site
+					sInfo.ThreeDaysAgoNum++
+					HrefRepeatMap[href] = site
+				}
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%1000 == 0 {
+			logger.Debug(n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	HrefRepeatMap = map[string]string{}
+	logger.Debug("三天前发布spider_highlistdata统计完毕...")
+}
+
+func GetSpiderListDataNum(stime int64, strStime string, siteInfoMap map[string]*SiteInfo) {
+	defer qu.Catch()
+	sess := util.MgoS.GetMgoConn()
+	defer util.MgoS.DestoryMongoConn(sess)
+	lock := &sync.Mutex{}
+	wg := &sync.WaitGroup{}
+	ch := make(chan bool, 5)
+	query := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$gte": stime,
+		},
+		"publishtime": map[string]interface{}{
+			"$regex": strStime,
+		},
+	}
+	fields := map[string]interface{}{
+		"site":  1,
+		"event": 1,
+	}
+	it := sess.DB(util.MgoS.DbName).C("spider_listdata").Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		wg.Add(1)
+		ch <- true
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			if qu.IntAll(tmp["event"]) == 7000 { // skip node 7000
+				return
+			}
+			site := qu.ObjToString(tmp["site"])
+			lock.Lock()
+			if sInfo := siteInfoMap[site]; sInfo != nil { // one of the key sites being tracked
+				sInfo.ThreeDaysAgoNum++
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%1000 == 0 {
+			logger.Debug(n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	logger.Debug("三天前发布spider_listdata统计完毕...")
+}
+
+func GetPythonDataNum(stime int64, strStime string, siteInfoMap map[string]*SiteInfo) {
+	defer qu.Catch()
+	sess := util.MgoPy.GetMgoConn()
+	defer util.MgoPy.DestoryMongoConn(sess)
+	lock := &sync.Mutex{}
+	wg := &sync.WaitGroup{}
+	ch := make(chan bool, 5)
+	query := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$gte": stime,
+		},
+		"publishtime": map[string]interface{}{
+			"$regex": strStime,
+		},
+	}
+	fields := map[string]interface{}{
+		"site": 1,
+	}
+	qu.Debug(query)
+	it := sess.DB(util.MgoPy.DbName).C("data_bak").Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		wg.Add(1)
+		ch <- true
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			site := qu.ObjToString(tmp["site"]) + "(python)"
+			lock.Lock()
+			if sInfo := siteInfoMap[site]; sInfo != nil { // one of the key sites being tracked
+				sInfo.ThreeDaysAgoNum++
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%1000 == 0 {
+			logger.Debug(n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	logger.Debug("三天前发布python统计完毕...")
+}
+
+func GetNumByLastTime(stime int64, baseDay int, siteInfoMap map[string]*SiteInfo) {
+	defer qu.Catch()
+	stimeWeekDay := time.Now().AddDate(0, 0, -baseDay).Weekday().String()
+	start := stime + 86400
+	end := stime + 86400*2
+	if stimeWeekDay == "Friday" { // Friday's volume is tallied the following Monday
+		start = stime + 86400*3
+		end = stime + 86400*4
+	}
+	query := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$gte": start,
+			"$lt":  end,
+		},
+	}
+	logger.Info("历史站点统计", query)
+	list, _ := util.MgoEB.Find("site_datacount", query, nil, map[string]interface{}{"site": 1, "ptimesuccessnum": 1}, false, -1, -1)
+	for _, l := range *list {
+		site := qu.ObjToString(l["site"])
+		pNum := qu.IntAll(l["ptimesuccessnum"])
+		if sInfo := siteInfoMap[site]; sInfo != nil {
+			sInfo.BeforeThreeDaysAgoNum = pNum
+		}
+	}
+}
+
+func GetSiteInfoExcel(allSpiderInfo map[string]*SiteInfo, day string) {
 	defer qu.Catch()
 	file, err := xlsx.OpenFile("res/sitecount.xlsx")
 	if err != nil {
@@ -181,6 +377,10 @@ func GetSiteInfoExcel(allSpiderInfo map[string]*SiteInfo) {
 		style.ApplyFont = true
 		font := *xlsx.NewFont(10, "Verdana")
 		style.Font = font
+		title1 := day + "新统计(publishtime)"
+		title2 := day + "历史统计(publishtime)"
+		sheet.Rows[0].Cells[6].SetValue(title1)
+		sheet.Rows[0].Cells[7].SetValue(title2)
 		row := sheet.AddRow()
 		row.AddCell().SetValue(site)
 		row.AddCell().SetValue(info.Num)
@@ -188,6 +388,8 @@ func GetSiteInfoExcel(allSpiderInfo map[string]*SiteInfo) {
 		row.AddCell().SetValue(info.ListSuccessNum)
 		row.AddCell().SetValue(info.PTimeSuccessNum)
 		row.AddCell().SetValue(info.PTimeSuccessDbNum)
+		row.AddCell().SetValue(info.ThreeDaysAgoNum)
+		row.AddCell().SetValue(info.BeforeThreeDaysAgoNum)
 		coverage := float64(info.PTimeSuccessNum) / float64(info.Num)
 		fill := &xlsx.Fill{
 			PatternType: "solid",
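
Aside on the look-back above: GetThreeDaysAgoNum widens its 3-day window by one day for every Saturday or Sunday it encounters. The same loop restated as a standalone helper (the name workingDaysAgo is ours, for illustration only):

	// workingDaysAgo returns how many calendar days back one must go to
	// cover `base` working days, stretching across weekends; mirrors the
	// loop at the top of GetThreeDaysAgoNum.
	func workingDaysAgo(base int) int {
		days := base
		for i := 1; i <= days; i++ {
			d := time.Now().AddDate(0, 0, -i)
			if wd := d.Weekday(); wd == time.Saturday || wd == time.Sunday {
				days++ // weekend days don't count as working days
			}
		}
		return days
	}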

+ 43 - 29
src/luatask/task.go

@@ -75,22 +75,23 @@ var LuaErrTypeInfoMap = map[string]ErrTypeInfo{
 
 //spider
 type Spider struct {
-	Site              string `json:"site"`              // site
-	Platform          string `json:"platform"`          // platform
-	Code              string `json:"spidercode"`        // spider code
-	Channel           string `json:"channel"`           // channel
-	AuditTime         int64  `json:"audittime"`         // latest audit time
-	ModifyUser        string `json:"modifyuser"`        // maintainer
-	ModifyId          string `json:"modifyid"`          // maintainer id
-	Event             int    `json:"event"`             // node
-	State             int    `json:"state"`             // state
-	PendState         int    `json:"pendstate"`         // pending state
-	Weight            int    `json:"weight"`            // spider weight
-	FrequencyErrTimes int    `json:"frequencyerrtimes"` // number of collection-frequency anomalies
-	MaxPage           int    `json:"maxpage"`           // maximum page collected
-	Model             int    `json:"model"`             // collection model (0: legacy; 1: new)
-	Working           int    `json:"working"`           // collection mode (0: high-performance; 1: queue)
-	ListIsFilter      bool   `json:"listisfilter"`      // whether lua list-page collection includes filtering
+	Site              string                 `json:"site"`              // site
+	Platform          string                 `json:"platform"`          // platform
+	Code              string                 `json:"spidercode"`        // spider code
+	Channel           string                 `json:"channel"`           // channel
+	AuditTime         int64                  `json:"audittime"`         // latest audit time
+	ModifyUser        string                 `json:"modifyuser"`        // maintainer
+	ModifyId          string                 `json:"modifyid"`          // maintainer id
+	Event             int                    `json:"event"`             // node
+	State             int                    `json:"state"`             // state
+	PendState         int                    `json:"pendstate"`         // pending state
+	Weight            int                    `json:"weight"`            // spider weight
+	FrequencyErrTimes int                    `json:"frequencyerrtimes"` // number of collection-frequency anomalies
+	MaxPage           int                    `json:"maxpage"`           // maximum page collected
+	Model             int                    `json:"model"`             // collection model (0: legacy; 1: new)
+	Working           int                    `json:"working"`           // collection mode (0: high-performance; 1: queue)
+	ListIsFilter      bool                   `json:"listisfilter"`      // whether lua list-page collection includes filtering
+	TaskTags          map[string]interface{} `json:"tasktags"`          // tags marking invalid task types for this spider
 	// download volumes based on comeintime, without dedup
 	DownloadAllNum     int `json:"downloadallnum"`     // total downloads
 	DownloadSuccessNum int `json:"downloadsuccessnum"` // successful downloads
@@ -255,6 +256,7 @@ func GetCodeBaseInfo() {
 		"listisfilter":      1,
 		"frequencyerrtimes": 1,
 		"code":              1,
+		"tasktags":          1,
 	}
 	count := util.MgoEB.Count("luaconfig", query)
 	logger.Debug("共加载线上爬虫个数:", count)
@@ -297,6 +299,7 @@ func GetCodeBaseInfo() {
 			sp.Model = util.CodeEventModel[sp.Event]
 			sp.Working = util.CodeEventWorking[sp.Event]
 			sp.Comeintime = time.Now().Unix()
+			sp.TaskTags, _ = tmp["tasktags"].(map[string]interface{})
 			lock.Lock()
 			CodeInfoMap[sp.Code] = sp
 			lock.Unlock()
@@ -1099,7 +1102,7 @@ func GetSpiderWarnErrData() {
 					infoMap := info.(map[string]interface{})
 					infoText := qu.ObjToString(infoMap["info"]) //错误信息
 					errCount := qu.IntAll(infoMap["count"])     //错误数量
-					if infoText == "Publishtime Is Too Late" {  //发布时间超前
+					/*if infoText == "Publishtime Is Too Late" {  //发布时间超前
 						query["info"] = infoText
 						stype = "publishtime"
 					} else if infoText == "Publishtime Is Less Than Zero" { //发布时间小于0
@@ -1111,6 +1114,10 @@ func GetSpiderWarnErrData() {
 					} else if infoText == "Field Value Not Contains Chinese" { // title/detail contains no Chinese
 						query["info"] = infoText
 						stype = "text"
+					} else*/
+					if infoText == "Publishtime Is Less Than Zero" { // publish time is less than zero
+						query["info"] = infoText
+						stype = "publishtime"
+					} else if infoText == "Field Value Contains Random Code" { // title/detail contains garbled characters
 						query["info"] = infoText
 						stype = "text"
@@ -1577,7 +1584,11 @@ func CreateTaskProcess() {
 				for stype, info := range LuaErrTypeInfoMap {
 					if err := spider.Error[stype]; err != nil {
 						taskStateOk := false
-						if stype == "download" {
+						if stype == "download" { // download-error handling
+							// skip this error type when the spider carries a download-error tag applied within the last 7 days
+							if tagTime, ok := spider.TaskTags[fmt.Sprint(TASK_DOWNLOADERR)].(int64); ok && tagTime >= util.GetTime(-7) {
+								continue
+							}
 							if spider.Model == 1 { // new model (7100, 7110, 7200, 7210, 7300, 7310, 7400): create tasks based on total errors and error ratio
 								moreThanLimit := false
 								// 1. error count; 2. error ratio
@@ -1628,12 +1639,12 @@ func CreateTaskProcess() {
 					if spider.Event == 7520 { // a 7520 spider takes long to finish one crawl cycle, so its heartbeat may still be from the previous day
 						limitDayNum = -1
 					}
-					if !spider.ListIsFilter || (spider.FindListHeart < util.GetTime(limitDayNum) && spider.ListIsFilter) { // list page has no filter code, or has filtering but no heartbeat
+					//if !spider.ListIsFilter || (spider.FindListHeart < util.GetTime(limitDayNum) && spider.ListIsFilter) { // list page has no filter code, or has filtering but no heartbeat
+					if spider.FindListHeart < util.GetTime(limitDayNum) { // latest heartbeat is considered abnormal
 						task.State = 1 // pending
 						task.ErrType = TASK_LISTERR
 						task.DescribeMap[TASK_LISTERR] = "列表页异常:\n 列表页共采集" + fmt.Sprint(spider.ListDownloadAllTimes) + "轮,其中有" + fmt.Sprint(spider.ListNoDataTimes) + "轮无数据\n"
 					}
-
 					// if !spider.ListIsFilter { // list page has no filter code
 					// 	task.State = 1 // pending
 					// 	task.ErrType = TASK_LISTERR
@@ -1644,16 +1655,19 @@ func CreateTaskProcess() {
 				}
 				// 6. collection-frequency anomaly	errtype:8
 				if spider.ListOhPercentTimes > 0 { // collection frequency is abnormal
-					//UpdateLuaInfo(spider) // on a frequency anomaly, update the spider's frequencyerrtimes, bump maxpage by 1, and re-publish
-					// only report a frequency anomaly when FrequencyErrTimes>3; otherwise other error types take priority (pending-confirm frequency anomalies are handled automatically, almost never manually)
-					if spider.FrequencyErrTimes > 3 { // more than 3 frequency anomalies: task is pending, otherwise pending-confirm
-						task.State = 1 // pending
-						task.ErrType = TASK_RATEERR
-					} else if len(task.DescribeMap) == 0 { // only a frequency anomaly and FrequencyErrTimes<=3
-						task.State = 0 // pending confirmation
-						task.ErrType = TASK_RATEERR
+					tagTime, ok := spider.TaskTags[fmt.Sprint(TASK_RATEERR)].(int64)
+					if !ok || tagTime < util.GetTime(-7) { // create a task only when there is no tag, or the last tag is older than 7 days
+						//UpdateLuaInfo(spider) // on a frequency anomaly, update the spider's frequencyerrtimes, bump maxpage by 1, and re-publish
+						// only report a frequency anomaly when FrequencyErrTimes>3; otherwise other error types take priority (pending-confirm frequency anomalies are handled automatically, almost never manually)
+						if spider.FrequencyErrTimes > 3 { // more than 3 frequency anomalies: task is pending, otherwise pending-confirm
+							task.State = 1 // pending
+							task.ErrType = TASK_RATEERR
+						} else if len(task.DescribeMap) == 0 { // only a frequency anomaly and FrequencyErrTimes<=3
+							task.State = 0 // pending confirmation
+							task.ErrType = TASK_RATEERR
+						}
+						task.DescribeMap[TASK_RATEERR] = "采集频率异常:\n 列表页共采集" + fmt.Sprint(spider.ListDownloadAllTimes) + "轮,其中有" + fmt.Sprint(spider.ListOhPercentTimes) + "轮数据全采\n"
 					}
-					task.DescribeMap[TASK_RATEERR] = "采集频率异常:\n 列表页共采集" + fmt.Sprint(spider.ListDownloadAllTimes) + "轮,其中有" + fmt.Sprint(spider.ListOhPercentTimes) + "轮数据全采\n"
 				}
 			} else if spider.Platform == "python" { // python errors
 				for stype, info := range PythonErrTypeInfoMap {
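
The two suppression checks above assume a tasktags map stored on the luaconfig document: keys are task error-type codes rendered as strings, values are the Unix time the tag was last applied. An illustrative fragment (values invented):

	// Hypothetical tasktags value as loaded from a luaconfig document.
	tasktags := map[string]interface{}{
		fmt.Sprint(TASK_DOWNLOADERR): int64(1662912000), // example tag timestamps
		fmt.Sprint(TASK_RATEERR):     int64(1662480000),
	}
	// A tag applied within the last 7 days suppresses that error type:
	if t, ok := tasktags[fmt.Sprint(TASK_RATEERR)].(int64); ok && t >= util.GetTime(-7) {
		// skip creating a rate-error task for this spider
	}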

+ 2 - 1
src/main.go

@@ -42,6 +42,7 @@ func main() {
 	c.Start()
 	//定时任务
 	c.AddFunc(util.RandomDataPushCron, timetask.GetSpiderWarnData) // data-maintenance platform: spider data-maintenance statistics
+	c.AddFunc(util.LuamoveCron, timetask.LuaMoveEvent)             // monthly tally of spiders that should migrate to another node
 	//
 	c.AddFunc(util.FileWarnCron, timetask.GetFileWarn) // alerts for abnormal attachment data
 	//
@@ -49,7 +50,7 @@ func main() {
 	c.AddFunc(util.QyworkRemindAuditorCron, timetask.SendInfoToWxWork_ToAuditor)       // daily WeCom alerts: warning info for auditors
 
 	c.AddFunc(util.MoveListDataCron, timetask.MoveListData) // migrate list-page data
-	// editor tasks
+	// spider tasks
 	c.AddFunc(util.ResetDataStateCron, luatask.ResetDataState) // reset data state
 	c.AddFunc(util.StartTaskCron, luatask.StartTask)           // start tasks
 	// weekly spider report statistics

+ 248 - 0
src/timetask/luamove.go

@@ -0,0 +1,248 @@
+package timetask
+
+import (
+	"encoding/json"
+	"github.com/donnie4w/go-logger/logger"
+	qu "qfw/util"
+	"sort"
+	"sync"
+	"time"
+	"util"
+)
+
+type Spider struct {
+	Code         string `json:"code"`
+	Site         string `json:"site"`
+	Channel      string `json:"channel"`
+	FromEvent    int    `json:"fromevent"`    // current node
+	ToEvent      int    `json:"toevent"`      // target node
+	DataNum      int    `json:"datanum"`      // collected volume (30-day total)
+	PtimeDataNum int    `json:"ptimedatanum"` // volume counted by publish time
+	//Average      int    `json:"average"`      // average
+	IsMove     bool  `json:"ismove"` // whether the spider should move to another node
+	State      int   `json:"state"`
+	Comeintime int64 `json:"comeintime"`
+}
+
+var CodeMap map[string]*Spider
+
+type EventNum struct {
+	Event int // node
+	Num   int // number of spiders on the node
+}
+
+// spider counts per node
+var EventMapType1 = map[int]int{} //7100、7110、7400、7410
+var EventMapType2 = map[int]int{} //7200、7210、7300、7310
+var EventMapType3 = map[int]int{} //7500、7510、7700
+
+var EventArrType1 []*EventNum //7100、7110、7400、7410
+var EventArrType2 []*EventNum //7200、7210、7300、7310
+var EventArrType3 []*EventNum //7500、7510、7700
+
+func LuaMoveEvent() {
+	defer qu.Catch()
+	CodeMap = map[string]*Spider{}
+	GetLuaInfo() // load spider info
+	GetDataNum() // tally each spider's 30-day collected volume
+	GetMoveLua() // decide which spiders need to move to another node
+}
+
+func GetLuaInfo() {
+	defer qu.Catch()
+	sess := util.MgoEB.GetMgoConn()
+	defer util.MgoEB.DestoryMongoConn(sess)
+	lock := &sync.Mutex{}
+	wg := &sync.WaitGroup{}
+	ch := make(chan bool, 5)
+	query := map[string]interface{}{
+		"platform": "golua平台",
+		"state": map[string]interface{}{
+			"$in": []int{0, 1, 2, 5}, // to-do, pending review, rejected, online
+		},
+		"event": map[string]interface{}{
+			"$ne": 7000,
+		},
+	}
+	fields := map[string]interface{}{
+		"event":   1,
+		"code":    1,
+		"site":    1,
+		"channel": 1,
+	}
+	count := util.MgoEB.Count("luaconfig", query)
+	logger.Debug("共加载线上爬虫个数:", count)
+	it := sess.DB(util.MgoEB.DbName).C("luaconfig").Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			code := qu.ObjToString(tmp["code"])
+			site := qu.ObjToString(tmp["site"])
+			channel := qu.ObjToString(tmp["channel"])
+			event := qu.IntAll(tmp["event"])
+			lock.Lock()
+			if event != 7410 && event != 7700 {
+				if util.CodeEventType[event] == 1 {
+					EventMapType1[event]++
+				} else if util.CodeEventType[event] == 2 {
+					EventMapType2[event]++
+				} else if util.CodeEventType[event] == 3 {
+					EventMapType3[event]++
+				}
+			}
+			CodeMap[code] = &Spider{
+				Code:       code,
+				Site:       site,
+				Channel:    channel,
+				FromEvent:  event,
+				Comeintime: time.Now().Unix(),
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%1000 == 0 {
+			logger.Debug(n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	// sort so the least-populated node in each group comes first
+	for event, num := range EventMapType1 {
+		EventArrType1 = append(EventArrType1, &EventNum{
+			Event: event,
+			Num:   num,
+		})
+	}
+	sort.Slice(EventArrType1, func(i, j int) bool {
		return EventArrType1[i].Num < EventArrType1[j].Num // ascending
+	})
+	for event, num := range EventMapType2 {
+		EventArrType2 = append(EventArrType2, &EventNum{
+			Event: event,
+			Num:   num,
+		})
+	}
+	sort.Slice(EventArrType2, func(i, j int) bool {
		return EventArrType2[i].Num < EventArrType2[j].Num // ascending
+	})
+	for event, num := range EventMapType3 {
+		EventArrType3 = append(EventArrType3, &EventNum{
+			Event: event,
+			Num:   num,
+		})
+	}
+	sort.Slice(EventArrType3, func(i, j int) bool {
		return EventArrType3[i].Num < EventArrType3[j].Num // ascending
+	})
+	logger.Debug("爬虫基本信息准备完成...", EventArrType1[0].Event, EventArrType2[0].Event, EventArrType3[0].Event)
+}
+
+func GetDataNum() {
+	defer qu.Catch()
+	sess := util.MgoEB.GetMgoConn()
+	defer util.MgoEB.DestoryMongoConn(sess)
+	lock := &sync.Mutex{}
+	wg := &sync.WaitGroup{}
+	ch := make(chan bool, 5)
+	query := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$gte": util.GetTime(-30),
+			"$lt":  util.GetTime(0),
+		},
+	}
+	fields := map[string]interface{}{
+		"spidercode":     1,
+		"ptimeallnum":    1, // daily volume counted by publish time
+		"downloadallnum": 1, // daily collected volume
+	}
+	it := sess.DB(util.MgoEB.DbName).C("luacodeinfo").Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		wg.Add(1)
+		ch <- true
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			spidercode := qu.ObjToString(tmp["spidercode"])
+			ptimeallnum := qu.IntAll(tmp["ptimeallnum"])
+			downloadallnum := qu.IntAll(tmp["downloadallnum"])
+			lock.Lock()
+			if sp := CodeMap[spidercode]; sp != nil {
+				sp.DataNum += downloadallnum
+				sp.PtimeDataNum += ptimeallnum
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%1000 == 0 {
+			logger.Debug(n)
+		}
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	logger.Debug("爬虫一个月数据量统计完毕...")
+}
+
+func GetMoveLua() {
+	defer qu.Catch()
+	/*
+		1. Volumes are 30-day totals counted by comeintime.
+		2. Migration rules (thresholds as implemented below):
+			to high-frequency mode (7100, 7110, 7400, 7410): 200+ docs in 30 days
+			to low-frequency queue mode (7200, 7210, 7300, 7310): 10-200 docs in 30 days
+			to very-low-frequency mode (7500, 7510, 7700): 1-10 docs in 30 days
+			special node (7520): 0 docs
+	*/
+	save := []map[string]interface{}{}
+	for _, sp := range CodeMap {
+		if sp.DataNum >= 200 && sp.FromEvent != 7410 && util.CodeEventType[sp.FromEvent] != 1 { // 200+ docs and not already on a high-frequency node
+			sp.IsMove = true
+			if sp.FromEvent == 7700 { // 7700 and 7410 are special-cased
+				sp.ToEvent = 7410
+			} else {
+				sp.ToEvent = EventArrType1[0].Event
+			}
+		} else if sp.DataNum >= 10 && sp.DataNum < 200 && sp.FromEvent != 7700 && sp.FromEvent != 7410 && util.CodeEventType[sp.FromEvent] != 2 {
+			sp.IsMove = true
+			sp.ToEvent = EventArrType2[0].Event
+		} else if sp.DataNum > 0 && sp.DataNum < 10 && sp.FromEvent != 7700 && util.CodeEventType[sp.FromEvent] != 3 {
+			sp.IsMove = true
+			if sp.FromEvent == 7410 { // 7700 and 7410 are special-cased
+				sp.ToEvent = 7700
+			} else {
+				sp.ToEvent = EventArrType3[0].Event
+			}
+		} else if sp.DataNum == 0 && sp.FromEvent != 7700 && util.CodeEventType[sp.FromEvent] != 4 {
+			sp.IsMove = true
+			if sp.FromEvent == 7410 { // 7700 and 7410 are special-cased
+				sp.ToEvent = 7700
+			} else {
+				sp.ToEvent = 7520
+			}
+		}
+		// persist the per-spider stats
+		byteText, err := json.Marshal(sp)
+		if err != nil {
+			logger.Debug("Json Marshal Error", sp.Code)
+			continue
+		}
+		tmp := map[string]interface{}{}
+		if json.Unmarshal(byteText, &tmp) == nil {
+			save = append(save, tmp)
+			if len(save) >= 1000 {
+				util.MgoEB.SaveBulk("luamovevent", save...)
+				save = []map[string]interface{}{}
+			}
+		}
+	}
+	if len(save) > 0 {
+		util.MgoEB.SaveBulk("luamovevent", save...)
+		save = []map[string]interface{}{}
+	}
+}
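
A worked example of the rules above (numbers invented): a spider currently on node 7500 (frequency type 3) that collected 250 documents in 30 days qualifies as high-frequency, so it is flagged to move to the least-populated type-1 node:

	sp := &Spider{Code: "demo_spider_code", FromEvent: 7500, DataNum: 250}
	if sp.DataNum >= 200 && sp.FromEvent != 7410 && util.CodeEventType[sp.FromEvent] != 1 {
		sp.IsMove = true
		sp.ToEvent = EventArrType1[0].Event // least-populated of 7100/7110/7400 (7410 is excluded from the tallies)
	}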

+ 107 - 45
src/timetask/random.go

@@ -3,6 +3,7 @@ package timetask
 import (
 	qu "qfw/util"
 	"strings"
+	"sync"
 	"time"
 	"util"
 )
@@ -14,7 +15,7 @@ type WarnInfo struct {
 	Site     interface{}
 	Channel  interface{}
 	Title    interface{}
-	Info     interface{}
+	Infos    map[string]bool
 	Code     interface{}
 	Href     interface{}
 }
@@ -22,6 +23,8 @@ type WarnInfo struct {
 func GetSpiderWarnData() {
 	defer qu.Catch()
 	qu.Debug("准备spider_warn_err数据")
+	sess := util.MgoS.GetMgoConn()
+	defer util.MgoS.DestoryMongoConn(sess)
 	stime := util.GetTime(-1)
 	etime := util.GetTime(0)
 	if time.Now().Weekday().String() == "Monday" {
@@ -36,57 +39,116 @@ func GetSpiderWarnData() {
 			"$in": []string{"Html Contains Temp Language", "Field Value Contains Random Code", "Publishtime Is Too Early", "Publishtime Is Too Late", "Field Value Not Contains Chinese"},
 		},
 	}
-	tmp := map[string]*WarnInfo{}
-	list, _ := util.MgoS.Find("spider_warn", query, nil, nil, false, -1, -1)
-	qu.Debug("query:", query, len(*list))
-	for _, l := range *list {
-		href := qu.ObjToString(l["href"])
-		level := qu.IntAll(l["level"])
-		field := qu.ObjToString(l["field"])
-		if field == "publishtime" { //特殊处理publishtime字段的level(保存服务中publishtime异常数据入bidding库,level不能为2)
-			level = 1
-		}
-		if warnInfo := tmp[href]; warnInfo == nil {
-			warnInfo = &WarnInfo{
-				Fields:   map[string]bool{field: true},
-				MaxLevel: level,
-				Data:     l["data"],
-				Site:     l["site"],
-				Channel:  l["channel"],
-				Title:    l["title"],
-				Info:     l["info"],
-				Code:     l["code"],
-				Href:     href,
+	ch := make(chan bool, 2)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	result := map[string]*WarnInfo{}
+	it := sess.DB(util.MgoS.DbName).C("spider_warn").Find(&query).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			href := qu.ObjToString(tmp["href"])
+			level := qu.IntAll(tmp["level"])
+			field := qu.ObjToString(tmp["field"])
+			info := qu.ObjToString(tmp["info"])
+			title := qu.ObjToString(tmp["title"])
+			if info == "Field Value Not Contains Chinese" && RepeatData(title) > 0 { // data check: if the database already holds a record with this title, don't push it again
+				return
+			}
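+			// the goroutine fan-out here mainly parallelizes these RepeatData lookups; the shared-map updates below are serialized by lock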
+			if field == "publishtime" { //特殊处理publishtime字段的level(保存服务中publishtime异常数据入bidding库,level不能为2)
+				level = 1
 			}
-			tmp[href] = warnInfo
-		} else {
-			warnInfo.Fields[field] = true
-			if warnInfo.MaxLevel < level {
-				warnInfo.MaxLevel = level
+			lock.Lock()
+			if warnInfo := result[href]; warnInfo == nil {
+				warnInfo = &WarnInfo{
+					Fields:   map[string]bool{field: true},
+					MaxLevel: level,
+					Data:     tmp["data"],
+					Site:     tmp["site"],
+					Channel:  tmp["channel"],
+					Title:    title,
+					Infos:    map[string]bool{info: true},
+					Code:     tmp["code"],
+					Href:     href,
+				}
+				result[href] = warnInfo
+			} else {
+				warnInfo.Fields[field] = true
+				warnInfo.Infos[info] = true
+				if warnInfo.MaxLevel < level {
+					warnInfo.MaxLevel = level
+				}
 			}
+			lock.Unlock()
+		}(tmp)
+		if n%1000 == 0 {
+			qu.Debug("current:", n)
 		}
+		tmp = map[string]interface{}{}
 	}
-	for _, wi := range tmp {
-		fields := []string{}
-		for f, _ := range wi.Fields {
-			fields = append(fields, f)
-		}
-		util.MgoS.Save("spider_warn_err", map[string]interface{}{
-			"field":      strings.Join(fields, ","),
-			"level":      wi.MaxLevel,
-			"site":       wi.Site,
-			"channel":    wi.Channel,
-			"title":      wi.Title,
-			"comeintime": time.Now().Unix(),
-			"info":       wi.Info,
-			"code":       wi.Code,
-			"href":       wi.Href,
-			"data":       wi.Data,
-			"ok":         false,
-		})
+	wg.Wait()
+	saveArr := []map[string]interface{}{}
+	for _, wi := range result {
+		ch <- true
+		wg.Add(1)
+		go func(w *WarnInfo) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			fields := []string{}
+			for f := range w.Fields {
+				fields = append(fields, f)
+			}
+			infos := []string{}
+			for t := range w.Infos {
+				infos = append(infos, t)
+			}
+			lock.Lock()
+			saveArr = append(saveArr, map[string]interface{}{
+				"field":      strings.Join(fields, ","),
+				"level":      w.MaxLevel,
+				"site":       w.Site,
+				"channel":    w.Channel,
+				"title":      w.Title,
+				"comeintime": time.Now().Unix(),
+				"info":       strings.Join(infos, ","),
+				"code":       w.Code,
+				"href":       w.Href,
+				"data":       w.Data,
+				"ok":         false,
+			})
+			if len(saveArr) > 500 {
+				util.MgoS.SaveBulk("spider_warn_err", saveArr...)
+				saveArr = []map[string]interface{}{}
+			}
+			lock.Unlock()
+		}(wi)
+	}
+	wg.Wait()
+	if len(saveArr) > 0 {
+		util.MgoS.SaveBulk("spider_warn_err", saveArr...)
+		saveArr = []map[string]interface{}{}
 	}
 }
 
+func RepeatData(title string) int {
+	return util.MgoB.Count("bidding",
+		map[string]interface{}{
+			"title": title,
+			"comeintime": map[string]interface{}{
+				"$gte": util.GetTime(-3),
+				"$lte": time.Now().Unix(),
+			},
+		})
+}
+
 /*
 	Push data containing garbled characters on a daily schedule
 */

+ 8 - 5
src/timetask/report.go

@@ -433,12 +433,13 @@ func GetCodeAndSiteInfo() (int, int) {
 	lock := &sync.Mutex{}
 	query := map[string]interface{}{
 		"state": map[string]interface{}{
-			"$nin": []int{4, 10}, // load spiders that are neither discarded nor deleted
+			"$nin": []int{10}, // load every non-deleted spider; discarded (state 4) ones are now included
 		},
 	}
 	field := map[string]interface{}{
-		"code": 1,
-		"site": 1,
+		"code":  1,
+		"site":  1,
+		"state": 1,
 	}
 	it := sess.DB(util.MgoEB.DbName).C("luaconfig").Find(&query).Select(&field).Iter()
 	n := 0
@@ -450,11 +451,13 @@ func GetCodeAndSiteInfo() (int, int) {
 				<-ch
 				wg.Done()
 			}()
-			code := qu.ObjToString(tmp["code"])
 			site := qu.ObjToString(tmp["site"])
+			code := qu.ObjToString(tmp["code"])
 			lock.Lock()
-			codeMap[code] = true
 			siteMap[site] = true
+			if state := qu.IntAll(tmp["state"]); state != 4 { // discarded spiders are not counted in the spider total
+				codeMap[code] = true
+			}
 			lock.Unlock()
 		}(tmp)
 		if n%1000 == 0 {

+ 5 - 0
src/util/config.go

@@ -30,10 +30,12 @@ var (
 	FileWarnCron               string        // daily stats on abnormal attachment data
 	MoveListDataCron           string        // migrate spider_highlistdata and spider_listdata data
 	SpiderWeeklyReportCron     string        // weekly report statistics
+	LuamoveCron                string        // on the 1st of each month, tally the spiders that should migrate to another node
 	CloseNum                   int           // close tasks after this many days
 	DayNum                     int           // number of days of data to update
 	CodeEventModel             map[int]int   // collection model per node (0: legacy; 1: new)
 	CodeEventWorking           map[int]int   // collection mode per node (0: high-performance; 1: queue)
+	CodeEventType              map[int]int   // collection-frequency group per node
 	GMail                      *gm.GmailAuth // mail settings
 	To                         string        // mail recipient
 
@@ -104,6 +106,7 @@ func InitOther() {
 	FileWarnCron = qu.ObjToString(Config["filewarncron"])
 	MoveListDataCron = qu.ObjToString(Config["movelistdatacron"])
 	SpiderWeeklyReportCron = qu.ObjToString(Config["spiderweeklyreportcron"])
+	LuamoveCron = qu.ObjToString(Config["luamovecron"])
 
 	CloseNum = qu.IntAll(Config["closenum"])
 	DayNum = qu.IntAll(Config["daynum"])
@@ -115,12 +118,14 @@ func InitOther() {
 	UploadEvents = map[int]string{}
 	CodeEventModel = map[int]int{}
 	CodeEventWorking = map[int]int{}
+	CodeEventType = map[int]int{}
 	for event, info := range eventsinfo {
 		eventTmp := qu.IntAll(event)
 		infoMap := info.(map[string]interface{})
 		UploadEvents[eventTmp] = qu.ObjToString(infoMap["server"])
 		CodeEventModel[eventTmp] = qu.IntAll(infoMap["model"])
 		CodeEventWorking[eventTmp] = qu.IntAll(infoMap["work"])
+		CodeEventType[eventTmp] = qu.IntAll(infoMap["type"])
 	}
 	//qu.Debug(UploadEvents, CodeEventModel, CodeEventWorking)
 	//mail