浏览代码

爬虫认领新增过期时间字段

mxs 1 天之前
父节点
当前提交
30b761a3a1
共有 8 个文件被更改,包括 77 次插入和 61 次删除
  1. + 4 - 0
      src/front/claim.go
  2. + 49 - 40
      src/front/front.go
  3. + 3 - 3
      src/front/luamove.go
  4. + 6 - 6
      src/front/spider.go
  5. + 1 - 1
      src/luaerrdata/errdata.go
  6. + 1 - 1
      src/luaerrdata/errlua.go
  7. + 10 - 8
      src/spider/service.go
  8. + 3 - 2
      src/timetask/timetask.go

+ 4 - 0
src/front/claim.go

@@ -26,6 +26,7 @@ var (
 		"claimtime":       1,
 		"claimtype":       1,
 		"recovertime":     1,
+		"expiretime":      1,
 		//"grade":           1,
 	}
 )
@@ -164,6 +165,7 @@ func (f *Front) ReturnCode() {
 				"comeintime":       now,
 				"claimtime":        l["claimtime"],
 				"recovertime":      l["recovertime"],
+				"expiretime":       l["expiretime"],
 				"returntime":       now,
 				"important":        l["spiderimportant"],
 				"returnreason":     returnreason,
@@ -204,6 +206,7 @@ func UpdateCodeAndSaveLog(lua []map[string]interface{}, f *Front) {
 				"claimtime":       now,
 				"state":           Sp_state_0, //可视化平台退回爬虫状态为:无法标注12
 				"recovertime":     recovertime,
+				"expiretime":      recovertime,
 			}},
 		}
 		//认领爬虫时,将任务一并带走
@@ -231,6 +234,7 @@ func UpdateCodeAndSaveLog(lua []map[string]interface{}, f *Front) {
 			"comeintime":       now,
 			"claimtime":        now,
 			"recovertime":      recovertime,
+			"expiretime":       recovertime,
 			"returntime":       int64(0),
 			"important":        spiderimportant,
 			"returnreason":     "",

+ 49 - 40
src/front/front.go

@@ -1080,6 +1080,7 @@ func (f *Front) Assign() {
 				"claimtype":       CLAIMTYPECLAIMED,
 				"claimtime":       now,
 				"recovertime":     recovertime,
+				"expiretime":      recovertime,
 				"createuseremail": (*user)["s_email"],
 				"createuser":      name,
 				"createuserid":    userid,
@@ -1087,7 +1088,7 @@ func (f *Front) Assign() {
 				"modifyuserid":    userid,
 				//"platform":        platform,
 			}
-			if claimtype == CLAIMTYPECLAIMED { //已认领的爬虫增加回收日志
+			if claimtype >= CLAIMTYPECLAIMED { //已认领的爬虫增加回收日志
 				//回收日志
 				recovelog := map[string]interface{}{
 					"site":             l["site"],
@@ -1119,6 +1120,7 @@ func (f *Front) Assign() {
 				"comeintime":       now,
 				"claimtime":        now,
 				"recovertime":      recovertime,
+				"expiretime":       recovertime,
 				"returntime":       int64(0),
 				"important":        spiderimportant,
 				"returnreason":     "",
@@ -1176,11 +1178,11 @@ func (f *Front) SpiderUpdatePlatform() {
 			b = u.MgoS.Update("spider_heart", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"del": true}}, false, true)
 			pf := qu.ObjToString(l["platform"])
 			if pf == "golua平台" || pf == "chrome" { //爬虫所属golua平台
-				b, err = spider.UpdateSpiderByCodeState(code, "6", qu.IntAll(l["event"])) //下架
+				b, err = spider.UpdateSpiderByCodeState(code, "6", qu.IntAll(l["event"]), false) //下架
 				if b && err == nil {
 					//历史节点下架为了避免线上运行爬虫待完成时改为7000采集历史,但是又转到其他平台,导致原线上运行节点爬虫并未下线,心跳异常
 					if incrementevent := l["incrementevent"]; incrementevent != nil {
-						b, err = spider.UpdateSpiderByCodeState(code, "6", qu.IntAll(incrementevent))
+						b, err = spider.UpdateSpiderByCodeState(code, "6", qu.IntAll(incrementevent), false)
 					}
 				}
 			}
@@ -1199,26 +1201,29 @@ func (f *Front) SpiderUpdatePlatform() {
 		set["claimtype"] = CLAIMTYPEUNCLAIMED
 		set["claimtime"] = int64(0)
 		set["recovertime"] = int64(0)
-		//换平台爬虫回收,保存日志
-		recovelog := map[string]interface{}{
-			"site":             l["site"],
-			"code":             l["code"],
-			"channel":          l["channel"],
-			"modifyuser":       l["modifyuser"],
-			"priority":         l["priority"],
-			"stype":            "回收",
-			"comeintime":       time.Now().Unix(),
-			"claimtime":        l["claimtime"],
-			"recovertime":      l["recovertime"],
-			"returntime":       int64(0),
-			"important":        l["spiderimportant"],
-			"returnreason":     "转平台",
-			"claimrecovertype": 0,
-			"source":           "爬虫转平台回收",
+		if qu.IntAll(l["claimtype"]) > CLAIMTYPEUNCLAIMED {
+			//换平台爬虫回收,保存日志
+			recovelog := map[string]interface{}{
+				"site":             l["site"],
+				"code":             l["code"],
+				"channel":          l["channel"],
+				"modifyuser":       l["modifyuser"],
+				"priority":         l["priority"],
+				"stype":            "回收",
+				"comeintime":       time.Now().Unix(),
+				"claimtime":        l["claimtime"],
+				"recovertime":      l["recovertime"],
+				"expiretime":       l["expiretime"],
+				"returntime":       int64(0),
+				"important":        l["spiderimportant"],
+				"returnreason":     "转平台",
+				"claimrecovertype": 0,
+				"source":           "爬虫转平台回收",
+			}
+			save = append(save, recovelog)
 		}
-		save = append(save, recovelog)
-		update = append(update, map[string]interface{}{"$set": set})
 		arr = append(arr, update)
+		update = append(update, map[string]interface{}{"$set": set})
 	}
 	u.MgoEB.UpdateBulk("luaconfig", arr...)
 	if len(save) > 0 {
@@ -1421,11 +1426,11 @@ func (f *Front) UpdateESP() {
 			var err error
 			pf := qu.ObjToString((*one)["platform"])
 			if pf == "golua平台" || pf == "chrome" { //爬虫原平台所属golua平台,进行下架
-				b, err = spider.UpdateSpiderByCodeState(code, "6", qu.IntAll((*one)["event"])) //下架
+				b, err = spider.UpdateSpiderByCodeState(code, "6", qu.IntAll((*one)["event"]), false) //下架
 				if b && err == nil {
 					//历史节点下架为了避免线上运行爬虫待完成时改为7000采集历史,但是又转到其他平台,导致原线上运行节点爬虫并未下线,心跳异常
 					if incrementevent := (*one)["incrementevent"]; incrementevent != nil {
-						b, err = spider.UpdateSpiderByCodeState(code, "6", qu.IntAll(incrementevent))
+						b, err = spider.UpdateSpiderByCodeState(code, "6", qu.IntAll(incrementevent), false)
 					}
 				}
 			}
@@ -1442,24 +1447,28 @@ func (f *Front) UpdateESP() {
 		set["claimtype"] = CLAIMTYPEUNCLAIMED
 		set["claimtime"] = int64(0)
 		set["recovertime"] = int64(0)
-		//换平台爬虫回收,保存日志
-		recovelog := map[string]interface{}{
-			"site":             (*one)["site"],
-			"code":             (*one)["code"],
-			"channel":          (*one)["channel"],
-			"modifyuser":       (*one)["modifyuser"],
-			"priority":         (*one)["priority"],
-			"stype":            "回收",
-			"comeintime":       time.Now().Unix(),
-			"claimtime":        (*one)["claimtime"],
-			"recovertime":      (*one)["recovertime"],
-			"returntime":       int64(0),
-			"important":        (*one)["spiderimportant"],
-			"returnreason":     "转平台",
-			"claimrecovertype": 0,
-			"source":           "爬虫转平台回收",
+		//已被认领的爬虫回收
+		if qu.IntAll((*one)["claimtype"]) > CLAIMTYPEUNCLAIMED {
+			//换平台爬虫回收,保存日志
+			recovelog := map[string]interface{}{
+				"site":             (*one)["site"],
+				"code":             (*one)["code"],
+				"channel":          (*one)["channel"],
+				"modifyuser":       (*one)["modifyuser"],
+				"priority":         (*one)["priority"],
+				"stype":            "回收",
+				"comeintime":       time.Now().Unix(),
+				"claimtime":        (*one)["claimtime"],
+				"recovertime":      (*one)["recovertime"],
+				"expiretime":       (*one)["expiretime"],
+				"returntime":       int64(0),
+				"important":        (*one)["spiderimportant"],
+				"returnreason":     "转平台",
+				"claimrecovertype": 0,
+				"source":           "爬虫转平台回收",
+			}
+			u.MgoEB.Save("lua_logs_claim", recovelog)
 		}
-		u.MgoEB.Save("lua_logs_claim", recovelog)
 	} else if w == "priority" { //调整优先级
 		priority := qu.IntAll(val)
 		if priority < 0 {

+ 3 - 3
src/front/luamove.go

@@ -163,12 +163,12 @@ func SpiderMoveLua(codes []string, events []string) bool {
 			if state == 5 { //查询该爬虫是否是已上架状态,是则更新节点后上架,否则只更新
 				upresult := true
 				var err interface{}
-				upresult, err = spider.UpdateSpiderByCodeState(code, "6", event) //脚本下架
-				if upresult && err == nil {                                      //下架成功,更新节点
+				upresult, err = spider.UpdateSpiderByCodeState(code, "6", event, false) //脚本下架
+				if upresult && err == nil {                                             //下架成功,更新节点
 					re := qu.IntAll(resultEvent)
 					u.MgoEB.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"event": re, "incrementevent": re}}, false, false)
 					//上架
-					upresult, err = spider.UpdateSpiderByCodeState(code, "5", re) //脚本上架
+					upresult, err = spider.UpdateSpiderByCodeState(code, "5", re, false) //脚本上架
 					if !upresult || err != nil {
 						qu.Debug("爬虫节点转移", code, "上架失败")
 						msg = append(msg, "爬虫节点转移"+code+"上架失败")

+ 6 - 6
src/front/spider.go

@@ -1404,7 +1404,7 @@ func AssortOrDisablesCode(codes []string, state int, stype, reason string) (msg
 		if (*lua)["downevent"] != nil { //爬虫开发修改爬虫节点,审核人员分类爬虫时,原来爬虫所在节点下架
 			event = qu.IntAll((*lua)["downevent"])
 		}
-		upresult, err := spider.UpdateSpiderByCodeState(code, "6", event)
+		upresult, err := spider.UpdateSpiderByCodeState(code, "6", event, false)
 		qu.Debug("更新爬虫状态:", stype, code, upresult, err)
 		if upresult && err == nil {
 			//更新爬虫
@@ -1594,21 +1594,21 @@ func UpStateAndUpSpider(code, id, reason, username string, state int) (bool, err
 			// } else {
 			// 	upresult = true
 			// }
-			upresult, err = spider.UpdateSpiderByCodeState(code, "6", event) //下架
+			upresult, err = spider.UpdateSpiderByCodeState(code, "6", event, false) //下架
 			qu.Debug("下架:", upresult, code)
 		case u.Sp_state_5: //上架(爬虫端在更新上架的时候为了更新内存中字段,采用先下架上架)
 			if downevent := qu.IntAll((*one)["downevent"]); downevent != 0 { //爬虫开发修改爬虫节点,审核人员上架爬虫时,原来爬虫所在节点下架
-				upresult, err = spider.UpdateSpiderByCodeState(code, "6", downevent)
+				upresult, err = spider.UpdateSpiderByCodeState(code, "6", downevent, false)
 				qu.Debug(code, "下架历史节点:", downevent)
 				if upresult && err == nil {
 					unset = map[string]interface{}{"downevent": ""}
 				}
 			} else {
-				upresult, err = spider.UpdateSpiderByCodeState(code, "6", event)
+				upresult, err = spider.UpdateSpiderByCodeState(code, "6", event, false)
 			}
 			qu.Debug("下架:", upresult, code, event)
 			if upresult && err == nil {
-				upresult, err = spider.UpdateSpiderByCodeState(code, fmt.Sprint(state), event)
+				upresult, err = spider.UpdateSpiderByCodeState(code, fmt.Sprint(state), event, true)
 				qu.Debug("上架:", upresult, code, event)
 			}
 		case u.Sp_state_3: //审核通过
@@ -1843,7 +1843,7 @@ func (f *Front) ChangeEvent() {
 		oldevent := qu.IntAll((*info)["event"])
 		if qu.IntAll((*info)["state"]) == u.Sp_state_5 {
 			//源节点下架
-			_, err := spider.UpdateSpiderByCodeState(code, fmt.Sprint(u.Sp_state_6), oldevent)
+			_, err := spider.UpdateSpiderByCodeState(code, fmt.Sprint(u.Sp_state_6), oldevent, false)
 			set := map[string]interface{}{
 				"$set": map[string]interface{}{
 					"event": qu.IntAll(event),

+ 1 - 1
src/luaerrdata/errdata.go

@@ -449,7 +449,7 @@ func (ed *ErrorData) UpdateOnlineLua() {
 		return
 	}
 	event := qu.IntAll((*lua)["event"])
-	b, err := spider.UpdateSpiderByCodeState(code, "-1", event)
+	b, err := spider.UpdateSpiderByCodeState(code, "-1", event, false)
 	if b && err == nil {
 		//mgdb.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"restate": 4, "updatetime": time.Now().Unix()}}, false, false)
 		u.MgoEB.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"restate": 4, "updatetime": time.Now().Unix()}}, false, false)

+ 1 - 1
src/luaerrdata/errlua.go

@@ -57,7 +57,7 @@ func (el *ErrorLua) ReUpSpider() {
 		event := qu.IntAll((*lua)["event"])
 		if qu.IntAll((*lua)["state"]) == 5 { //是上架状态
 			//重新上架
-			b, err := spider.UpdateSpiderByCodeState(code, "5", event)
+			b, err := spider.UpdateSpiderByCodeState(code, "5", event, false)
 			if b && err == nil {
 				success = true
 				go u.MgoS.Update("spider_loadfail",

+ 10 - 8
src/spider/service.go

@@ -400,16 +400,16 @@ func CreateFile(code, script string) (string, error) {
 }
 
 // 上传脚本
-func UpdateSpiderByCodeState(code, state string, event int) (b bool, err error) {
+func UpdateSpiderByCodeState(code, state string, event int, delay bool) (b bool, err error) {
 	istate := qu.IntAll(state)
 	defer func() {
-		if b && err == nil && istate == 6 { //下架更新
-			UpdateOnlineCode(code, istate)
+		if b && err == nil {
+			UpdateOnlineCode(code, istate, delay)
 		}
 	}()
-	if istate == 5 {
-		go UpdateOnlineCode(code, istate) //上架更新
-	}
+	//if istate == 5 {
+	//	go UpdateOnlineCode(code, istate) //上架更新
+	//}
 	msgid := mu.UUID(8)
 	data := map[string]interface{}{}
 	data["code"] = code
@@ -433,7 +433,7 @@ func UpdateSpiderByCodeState(code, state string, event int) (b bool, err error)
 	}
 }
 
-func UpdateOnlineCode(code string, state int) bool {
+func UpdateOnlineCode(code string, state int, delay bool) bool {
 	qu.Debug("-----------更新线上爬虫", code)
 	upsert := false
 	query := map[string]interface{}{"code": code}
@@ -441,7 +441,9 @@ func UpdateOnlineCode(code string, state int) bool {
 	if state == 6 { //下架
 		set = map[string]interface{}{"state": state}
 	} else if state == 5 { //上架
-		time.Sleep(30 * time.Second) //更新太多了,防止这个luaconfig_online更新动作在luaconfig之前
+		if delay {
+			time.Sleep(30 * time.Second) //更新太多了,防止这个luaconfig_online更新动作在luaconfig之前
+		}
 		lua, _ := u.MgoEB.FindOne("luaconfig", map[string]interface{}{"code": code})
 		if len(*lua) > 0 {
 			(*lua)["state"] = state

+ 3 - 2
src/timetask/timetask.go

@@ -69,6 +69,7 @@ func RecoverCodes() {
 				"comeintime":       now,
 				"claimtime":        l["claimtime"],
 				"recovertime":      l["recovertime"],
+				"expiretime":       l["expiretime"],
 				"returntime":       int64(0),
 				"important":        l["spiderimportant"],
 				"returnreason":     "",
@@ -271,14 +272,14 @@ func SpiderMoveEvent() {
 					"$set": set,
 				}
 				if downevent := qu.IntAll((*lua)["downevent"]); downevent != 0 {
-					spider.UpdateSpiderByCodeState(code, "6", downevent)
+					spider.UpdateSpiderByCodeState(code, "6", downevent, false)
 					qu.Debug(code, "下架历史节点:", downevent)
 					upset["$unset"] = map[string]interface{}{"downevent": ""}
 				}
 				upOk := util.MgoEB.Update("luaconfig", map[string]interface{}{"code": code}, upset, false, false)
 				qu.Debug(code, "脚本更新成功:", upOk)
 				if upOk {
-					upresult, err = spider.UpdateSpiderByCodeState(code, "5", newevent) //脚本上架
+					upresult, err = spider.UpdateSpiderByCodeState(code, "5", newevent, false) //脚本上架
 					qu.Debug(code, "脚本上架", upresult, err)
 				}
 			}