瀏覽代碼

新增附件异常任务

maxiaoshan 2 年之前
父節點
當前提交
754f889b89
共有 5 個文件被更改,包括 135 次插入50 次删除
  1. 5 5
      src/luatask/sitecount.go
  2. 116 25
      src/luatask/task.go
  3. 13 11
      src/timetask/report.go
  4. 1 1
      src/timetask/wxworkwarn.go
  5. 0 8
      src/util/config.go

+ 5 - 5
src/luatask/sitecount.go

@@ -77,7 +77,7 @@ func SendInfoToWxWork_SiteDataCount() {
 	//siteInfoMap := map[string]*SiteInfo{}
 	//siteInfoMap_Back := map[string]*SiteInfo{}
 	allSpiderMap := map[string]*SiteInfo{}
-	list, _ := util.MgoE.Find("site_baseinfo", nil, nil, nil, false, -1, -1)
+	list, _ := util.MgoEB.Find("site_baseinfo", nil, nil, nil, false, -1, -1)
 	for _, l := range *list {
 		site := qu.ObjToString(l["site"])
 		vByte, _ := json.Marshal(l)
@@ -115,8 +115,8 @@ func SendInfoToWxWork_SiteDataCount() {
 func GetAllSpidercodeNum(siteInfoMap map[string]*SiteInfo) {
 	defer qu.Catch()
 	logger.Info("统计采集量luacodeinfo开始...")
-	sess := util.MgoE.GetMgoConn()
-	defer util.MgoE.DestoryMongoConn(sess)
+	sess := util.MgoEB.GetMgoConn()
+	defer util.MgoEB.DestoryMongoConn(sess)
 	query := map[string]interface{}{
 		"comeintime": map[string]interface{}{
 			"$gte": util.GetTime(0),
@@ -133,7 +133,7 @@ func GetAllSpidercodeNum(siteInfoMap map[string]*SiteInfo) {
 	lock := &sync.Mutex{}
 	wg := &sync.WaitGroup{}
 	ch := make(chan bool, 5)
-	it := sess.DB(util.MgoE.DbName).C("luacodeinfo").Find(&query).Select(&fields).Iter()
+	it := sess.DB(util.MgoEB.DbName).C("luacodeinfo").Find(&query).Select(&fields).Iter()
 	n := 0
 	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
 		wg.Add(1)
@@ -431,7 +431,7 @@ func GetSiteInfoExcel(allSpiderInfo map[string]*SiteInfo, day string) {
 			logger.Info("json marshal:", err)
 		}
 	}
-	util.MgoE.SaveBulk("site_datacount", arr...)
+	util.MgoEB.SaveBulk("site_datacount", arr...)
 	arr = []map[string]interface{}{}
 	//file.Save("res/tmp.xlsx")
 	SendSiteInfoToWxWork(file)

+ 116 - 25
src/luatask/task.go

@@ -14,8 +14,9 @@ import (
 	"github.com/donnie4w/go-logger/logger"
 )
 
-//采集频率异常、列表页异常、404异常、下载异常、运行异常、时间异常、数据异常
-const TASK_RATEERR, TASK_LISTERR, TASK_404ERR, TASK_DOWNLOADERR, TASK_RUNERR, TASK_TIMEERR, TASK_DATAERR = 8, 7, 6, 5, 4, 3, 2
+//The former 404 error type has been replaced by the attachment error type TASK_ANNEXERR
+//Task error types, in order: collection-frequency, list-page, attachment, download, run, time, data
+const TASK_RATEERR, TASK_LISTERR, TASK_ANNEXERR, TASK_DOWNLOADERR, TASK_RUNERR, TASK_TIMEERR, TASK_DATAERR = 8, 7, 6, 5, 4, 3, 2
 
 //失败占比
 const FailedPercentLimit = 0.20
@@ -37,10 +38,10 @@ var StateFeedBackErr = map[int]string{
 }
 
 var PythonErrTypeInfoMap = map[string]ErrTypeInfo{
-	"download": ErrTypeInfo{
-		ErrType: TASK_404ERR,
-		Remark:  "下载异常",
-	},
+	//"download": ErrTypeInfo{
+	//	ErrType: TASK_404ERR,
+	//	Remark:  "下载异常",
+	//},
 	"server": ErrTypeInfo{
 		ErrType: TASK_DOWNLOADERR,
 		Remark:  "服务异常",
@@ -55,6 +56,10 @@ var PythonErrTypeInfoMap = map[string]ErrTypeInfo{
 	},
 }
 var LuaErrTypeInfoMap = map[string]ErrTypeInfo{
+	"annex": {
+		ErrType: TASK_ANNEXERR,
+		Remark:  "附件异常",
+	},
 	"download": ErrTypeInfo{
 		ErrType: TASK_DOWNLOADERR,
 		Remark:  "下载异常",
@@ -180,6 +185,7 @@ func StartTask() {
 	logger.Debug(StartTime, EndTime, Publishtime)
 	GetCodeBaseInfo()              //初始化爬虫基本信息
 	GetBiddingCount()              //统计bidding表爬虫采集量
+	GetBiddingFileData()           //统计bidding_file附件大小为3.7 KB的信息
 	GetCodeHeart()                 //初始化爬虫心跳信息
 	GetSpiderHighListDownloadNum() //统计spider_highlistdata爬虫列表页下载量、下载失败量、未下载量
 	GetSpiderListDownloadNum()     //统计spider_listdata爬虫列表页下载量、下载失败量、未下载量
@@ -1120,15 +1126,15 @@ func GetSpiderWarnErrData() {
 				if flag { //列入下载异常任务类型
 					oneErrInfo := &ErrRemark{
 						Href:   href,
-						Remark: "Download Failed",
+						Remark: "File Err",
 					}
 					lock.Lock()
 					if spider := CodeInfoMap[code]; spider != nil {
-						if errMap := spider.Error["download"]; errMap != nil {
+						if errMap := spider.Error["annex"]; errMap != nil {
 							errMap.Num += 1 //叠加某种异常stype的数量
 							errMap.Err = append(errMap.Err, oneErrInfo)
 						} else {
-							spider.Error["download"] = &ErrorInfo{
+							spider.Error["annex"] = &ErrorInfo{
 								Num: 1,
 								Err: []*ErrRemark{
 									oneErrInfo,
@@ -1149,6 +1155,83 @@ func GetSpiderWarnErrData() {
 	logger.Debug("错误信息数据统计完成...")
 }
 
+// GetBiddingFileData scans the bidding_file collection for documents whose
+// comeintime lies in [StartTime, EndTime) and records an "annex" (attachment)
+// error on the matching spider in CodeInfoMap for every document that has at
+// least one attachment whose size is reported as exactly "3.7 KB".
+// NOTE(review): "3.7 KB" is presumably the size of a placeholder/error file
+// produced by a failed attachment download; confirm with the data owners.
+func GetBiddingFileData() {
+	logger.Debug("附件信息数据统计...")
+	defer qu.Catch()
+	sess := util.MgoB.GetMgoConn()
+	defer util.MgoB.DestoryMongoConn(sess)
+	lock := &sync.Mutex{}     // guards the updates to CodeInfoMap entries below
+	wg := &sync.WaitGroup{}   // waits for all per-document goroutines
+	ch := make(chan bool, 5)  // caps the number of concurrent goroutines at 5
+	query := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$gte": StartTime,
+			"$lt":  EndTime,
+		},
+	}
+	// Only the fields that are read below are selected.
+	fields := map[string]interface{}{
+		"spidercode":  1,
+		"projectinfo": 1,
+		"href":        1,
+		"biddingid":   1,
+	}
+	count := util.MgoB.Count("bidding_file", query)
+	logger.Debug("bidding_file数据量:", count)
+	it := sess.DB(util.MgoB.DbName).C("bidding_file").Find(&query).Select(&fields).Iter()
+	n := 0
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		wg.Add(1)
+		ch <- true
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			// A dedicated flag is used on purpose: the previous code reused the
+			// name "ok", which was shadowed by the type-assertion results in the
+			// if-initializers, so the outer flag was never cleared and no annex
+			// error was ever recorded.
+			badAttachment := false
+			if projectinfo, isMap := tmp["projectinfo"].(map[string]interface{}); isMap {
+				if attachments, isMap := projectinfo["attachments"].(map[string]interface{}); isMap {
+					for _, attachment := range attachments {
+						// A failed assertion yields a nil map; reading from it is safe.
+						result, _ := attachment.(map[string]interface{})
+						if size := qu.ObjToString(result["size"]); size == "3.7 KB" {
+							badAttachment = true
+							break
+						}
+					}
+				}
+			}
+			if !badAttachment {
+				return
+			}
+			code := qu.ObjToString(tmp["spidercode"])
+			href := qu.ObjToString(tmp["href"])
+			oneErrInfo := &ErrRemark{
+				Href:   href,
+				Remark: "File Err",
+			}
+			lock.Lock()
+			if spider := CodeInfoMap[code]; spider != nil {
+				if errMap := spider.Error["annex"]; errMap != nil { // attachment error already present
+					errMap.Num += 1 // accumulate the count for this error type
+					errMap.Err = append(errMap.Err, oneErrInfo)
+				} else {
+					spider.Error["annex"] = &ErrorInfo{
+						Num: 1,
+						Err: []*ErrRemark{
+							oneErrInfo,
+						},
+					}
+				}
+			}
+			lock.Unlock()
+		}(tmp)
+		if n%1000 == 0 {
+			logger.Debug(n) // progress log every 1000 documents
+		}
+		// Fresh map for the next document so the running goroutine keeps its own copy.
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	logger.Debug("附件信息数据统计完成...")
+}
+
+
 //汇总lua错误信息数据
 func GetSpiderWarnErrData_back() {
 	defer qu.Catch()
@@ -1731,6 +1814,8 @@ func CreateTaskProcess() {
 							} else if spider.Model == 0 && spider.Working == 0 { //老模式,高性能模式(7410)不建下载异常任务
 								continue
 							}
+						} else if stype == "annex" { //附件异常直接建待处理任务
+							task.State = 1 //待处理
 						}
 						//取最大的错误异常类型
 						if task.ErrType < info.ErrType {
@@ -1845,11 +1930,11 @@ func CreateTaskProcess() {
 
 			lock.Lock()
 			if len(arr) > 500 {
-				util.MgoE.SaveBulk("luacodeinfo", arr...)
+				util.MgoEB.SaveBulk("luacodeinfo", arr...)
 				arr = []map[string]interface{}{}
 			}
 			if len(upsertBulk) > 500 {
-				util.MgoE.UpSertBulk("task", upsertBulk...)
+				util.MgoEB.UpSertBulk("task", upsertBulk...)
 				upsertBulk = [][]map[string]interface{}{}
 			}
 			lock.Unlock()
@@ -1858,11 +1943,11 @@ func CreateTaskProcess() {
 	wg.Wait()
 	lock.Lock()
 	if len(arr) > 0 {
-		util.MgoE.SaveBulk("luacodeinfo", arr...)
+		util.MgoEB.SaveBulk("luacodeinfo", arr...)
 		arr = []map[string]interface{}{}
 	}
 	if len(upsertBulk) > 0 {
-		util.MgoE.UpSertBulk("task", upsertBulk...)
+		util.MgoEB.UpSertBulk("task", upsertBulk...)
 		upsertBulk = [][]map[string]interface{}{}
 	}
 	lock.Unlock()
@@ -1896,11 +1981,11 @@ func CreateTask(t *Task, sp *Spider, upsertBulk *[][]map[string]interface{}, loc
 			logger.Debug("更新挂起状态爬虫:", sp.Code)
 		}
 	}
-	diff := time.Now().Unix() - sp.AuditTime
-	if sp.State == 5 && diff <= 86400 { //已上架爬虫且爬虫最新一次提交审核时间小于24小时,不建任务
-		logger.Debug("该爬虫近期维护无需新建任务:", sp.Code)
-		return
-	}
+	//diff := time.Now().Unix() - sp.AuditTime
+	//if sp.State == 5 && diff <= 86400 { //已上架爬虫且爬虫最新一次提交审核时间小于24小时,不建任务
+	//	logger.Debug("该爬虫近期维护无需新建任务:", sp.Code)
+	//	return
+	//}
 	descript_new := "" //新任务的异常描述
 	for _, text := range t.DescribeMap {
 		descript_new += text
@@ -1918,12 +2003,12 @@ func CreateTask(t *Task, sp *Spider, upsertBulk *[][]map[string]interface{}, loc
 		"i_times":    1,
 		"s_urgency":  1,
 	}
-	list, _ := util.MgoE.Find("task", query, nil, fields, false, -1, -1)
+	list, _ := util.MgoEB.Find("task", query, nil, fields, false, -1, -1)
 	update := []map[string]interface{}{}
 	if list != nil && len(*list) > 0 { //已有任务
 		if len(*list) > 1 {
 			logger.Error("Code:", sp.Code, "任务异常")
-			util.MgoE.Save("luacreatetaskerr", map[string]interface{}{
+			util.MgoEB.Save("luacreatetaskerr", map[string]interface{}{
 				"code":       sp.Code,
 				"comeintime": time.Now().Unix(),
 				"tasknum":    len(*list),
@@ -1982,6 +2067,12 @@ func CreateTask(t *Task, sp *Spider, upsertBulk *[][]map[string]interface{}, loc
 		//if t.State == 1 { //待处理times=1
 		//	times = 1
 		//}
+		diff := time.Now().Unix() - sp.AuditTime
+		//已上架爬虫,审核时间小于24小时,此次任务不为下载异常类型,不建任务
+		if t.ErrType != 5 && sp.State == 5 && diff <= 86400 {
+			logger.Debug("该爬虫近期维护无需新建任务:", sp.Code)
+			return
+		}
 		saveMap := map[string]interface{}{
 			"s_modify":     sp.ModifyUser,
 			"s_modifyid":   sp.ModifyId,
@@ -2024,7 +2115,7 @@ func UpdateLuaInfo(sp *Spider) {
 	logger.Debug("Code:", sp.Code, "	", sp.FrequencyErrTimes)
 	b := util.MgoEB.Update("luaconfig", map[string]interface{}{"code": sp.Code}, map[string]interface{}{"$set": set}, false, false)
 	if b && sp.FrequencyErrTimes <= 3 { //FrequencyErrTimes>3时会建采集频率异常的待处理任务,不再上下架
-		//爬虫下架、上
+		//爬虫下架、上
 		qu.Debug("爬虫上下架 code:", sp.Code)
 		CodeLock.Lock()
 		ok, err := util.UpdateSpiderByCodeState(sp.Code, "6", sp.Event) //下架
@@ -2217,7 +2308,7 @@ func CloseTask() {
 			"i_state": 6,
 		},
 	}
-	util.MgoE.Update("task", query, set, false, true)
+	util.MgoEB.Update("task", query, set, false, true)
 	logger.Debug("---清理未更新任务完毕---")
 }
 
@@ -2264,7 +2355,7 @@ func SaveCodeInfo() {
 			}
 			lock.Lock()
 			if len(arr) > 500 {
-				util.MgoE.SaveBulk("luacodeinfo_back", arr...)
+				util.MgoEB.SaveBulk("luacodeinfo_back", arr...)
 				arr = []map[string]interface{}{}
 			}
 			lock.Unlock()
@@ -2272,7 +2363,7 @@ func SaveCodeInfo() {
 	}
 	wg.Wait()
 	if len(arr) > 0 {
-		util.MgoE.SaveBulk("luacodeinfo_back", arr...)
+		util.MgoEB.SaveBulk("luacodeinfo_back", arr...)
 		arr = []map[string]interface{}{}
 	}
 	logger.Debug("爬虫基本信息生成完成...")
@@ -2287,7 +2378,7 @@ func SaveUserCreateTaskNum() {
 		for s, n := range sn {
 			save[s] = n
 		}
-		util.MgoE.Save("luausertask", save)
+		util.MgoEB.Save("luausertask", save)
 	}
 	UserTaskNum = map[string]map[string]int{}
 }

+ 13 - 11
src/timetask/report.go

@@ -46,7 +46,7 @@ func SpiderWeeklyReport() {
 			"$lt":  eTime,
 		},
 	}
-	ThisWeekAddTaskNum := util.MgoE.Count("task", query)
+	ThisWeekAddTaskNum := util.MgoEB.Count("task", query)
 	qu.Debug("本周新建任务数量:", ThisWeekAddTaskNum)
 	//新建任务同比增减
 	AddTaskIncDecRatio := float64(0)
@@ -67,7 +67,7 @@ func SpiderWeeklyReport() {
 			"$gte": 2, //任务状态:处理中、待审核、审核通过、未通过、关闭
 		},
 	}
-	ThisWeekCheckTaskNum := util.MgoE.Count("task", query)
+	ThisWeekCheckTaskNum := util.MgoEB.Count("task", query)
 	qu.Debug("本周核实任务数量:", ThisWeekCheckTaskNum)
 	//核实任务同比增减
 	CheckTaskIncDecRatio := float64(0)
@@ -83,7 +83,7 @@ func SpiderWeeklyReport() {
 		//},
 		"i_state": 1, //待处理
 	}
-	NeedToCheckTaskAllNum := util.MgoE.Count("task", query)
+	NeedToCheckTaskAllNum := util.MgoEB.Count("task", query)
 	qu.Debug("待核实任务总数:", NeedToCheckTaskAllNum)
 
 	//2、lua新增爬虫
@@ -115,7 +115,7 @@ func SpiderWeeklyReport() {
 			"$lt":  eTime,
 		},
 	}
-	ThisWeekFinishLuaNum := util.MgoE.Count("lua_logs_auditor_new", query)
+	ThisWeekFinishLuaNum := util.MgoEB.Count("lua_logs_auditor_new", query)
 	qu.Debug("lua本周已完成新建爬虫数量:", ThisWeekFinishLuaNum)
 	//lua完成新建爬虫同比增减
 	FinishLuaIncDecRatio := float64(0)
@@ -149,7 +149,7 @@ func SpiderWeeklyReport() {
 			"$lte": 5,
 		},
 	}
-	ThisWeekAddEffectTaskNum := util.MgoE.Count("task", query)
+	ThisWeekAddEffectTaskNum := util.MgoEB.Count("task", query)
 	qu.Debug("本周新增待维护任务数量:", ThisWeekAddEffectTaskNum)
 	//新建待维护任务同比增减
 	AddEffectTaskIncDecRatio := float64(0)
@@ -168,7 +168,7 @@ func SpiderWeeklyReport() {
 		},
 		"i_state": 4, //审核通过
 	}
-	ThisWeekFinishEffectTaskNum := util.MgoE.Count("task", query)
+	ThisWeekFinishEffectTaskNum := util.MgoEB.Count("task", query)
 	qu.Debug("本周完成待维护任务数量:", ThisWeekFinishEffectTaskNum)
 	//完成待维护任务同比增减
 	FinishEffectTaskIncDecRatio := float64(0)
@@ -183,7 +183,7 @@ func SpiderWeeklyReport() {
 			"$in": []int{2, 3, 5}, //处理中、待审核、未通过
 		},
 	}
-	NeedToFinishEffectTaskAllNum := util.MgoE.Count("task", query)
+	NeedToFinishEffectTaskAllNum := util.MgoEB.Count("task", query)
 	qu.Debug("待维护任务总数:", NeedToFinishEffectTaskAllNum)
 
 	//4、python爬虫
@@ -217,7 +217,7 @@ func SpiderWeeklyReport() {
 		"platform": "python",
 		"state":    11,
 	}
-	ThisWeekFinishPythonNum := util.MgoE.Count("luaconfig", query)
+	ThisWeekFinishPythonNum := util.MgoEB.Count("luaconfig", query)
 	qu.Debug("python本周已完成爬虫数量:", ThisWeekFinishPythonNum)
 	//已完成爬虫数量同比增减
 	FinishPythonIncDecRatio := float64(0)
@@ -237,7 +237,9 @@ func SpiderWeeklyReport() {
 			},
 			map[string]interface{}{
 				"platform": "golua平台",
-				"state":    9, //lua平台无法处理(转python)的爬虫算到python待完成的爬虫里
+				"state": map[string]interface{}{
+					"$in": []int{8, 9}, //需登录、转python状态的爬虫,记录在python待完成爬虫数量中
+				},
 			},
 		},
 	}
@@ -477,7 +479,7 @@ func GetThisWeekCompeteMgoNum(sTime, eTime int64) (ThisWeekCompeteMgoNum int) {
 	defer qu.Catch()
 	sess := util.MgoB.GetMgoConn()
 	defer util.MgoB.DestoryMongoConn(sess)
-	ch := make(chan bool, 2)
+	ch := make(chan bool, 5)
 	wg := &sync.WaitGroup{}
 	lock := &sync.Mutex{}
 	query := map[string]interface{}{
@@ -561,7 +563,7 @@ func CountNeedToFinishNewLuaAllNum(query map[string]interface{}) (result int) {
 				wg.Done()
 			}()
 			code := qu.ObjToString(tmp["code"])
-			count := util.MgoE.Count("lua_logs_auditor", map[string]interface{}{"code": code, "types": "审核"})
+			count := util.MgoEB.Count("lua_logs_auditor", map[string]interface{}{"code": code, "types": "审核"})
 			if count == 0 { //无审核记录表示新爬虫
 				lock.Lock()
 				result++

+ 1 - 1
src/timetask/wxworkwarn.go

@@ -126,7 +126,7 @@ func SendLuaInfo() {
 		textInfo := &LuaUserTextInfo{}
 		textInfo.Username = userInfo.Username
 		//1、未通过任务信息
-		list_task, _ := util.MgoE.Find("task",
+		list_task, _ := util.MgoEB.Find("task",
 			map[string]interface{}{"s_modify": user, "i_state": 5},
 			map[string]interface{}{"l_complete": 1},
 			map[string]interface{}{"l_complete": 1},

+ 0 - 8
src/util/config.go

@@ -10,7 +10,6 @@ import (
 
 var (
 	Config                     map[string]interface{}
-	MgoE                       *mgo.MongodbSim //editor
 	MgoEB                      *mgo.MongodbSim //editor
 	MgoS                       *mgo.MongodbSim //spider
 	MgoPy                      *mgo.MongodbSim //py_spider
@@ -57,13 +56,6 @@ func InitMgo() {
 		Size:        qu.IntAll(pyspider["size"]),
 	}
 	MgoPy.InitPool()
-	editor := Config["editor"].(map[string]interface{})
-	MgoE = &mgo.MongodbSim{
-		MongodbAddr: qu.ObjToString(editor["addr"]),
-		DbName:      qu.ObjToString(editor["db"]),
-		Size:        qu.IntAll(editor["size"]),
-	}
-	MgoE.InitPool()
 	bidding := Config["bidding"].(map[string]interface{})
 	MgoB = &mgo.MongodbSim{
 		MongodbAddr: qu.ObjToString(bidding["addr"]),