Przeglądaj źródła

新增7000历史转增量节点定时任务及告警邮件调整

maxiaoshan 2 lat temu
rodzic
commit
21e917d490
3 zmienionych plików z 164 dodań i 50 usunięć
  1. 2 4
      src/front/luamove.go
  2. 4 9
      src/main.go
  3. 158 37
      src/timetask/timetask.go

+ 2 - 4
src/front/luamove.go

@@ -1,7 +1,6 @@
 package front
 
 import (
-	"encoding/json"
 	"fmt"
 	"io/ioutil"
 	"net/http"
@@ -200,8 +199,7 @@ func SendMail(text string) {
 	}
 }
 
-//
-func SpiderMoveEvent(data string) {
+/*func SpiderMoveEvent(data string) {
 	//解析爬虫代码
 	data = util.Se.DecodeString(data)
 	infos := []interface{}{}
@@ -299,4 +297,4 @@ func GetEvent(code string, lua map[string]interface{}) int {
 		}
 	}
 	return result
-}
+}*/

+ 4 - 9
src/main.go

@@ -103,15 +103,10 @@ func main() {
 		if types == "mtask" {
 			front.SpiderModifyTask(data)
 		}
-		if types == "code" {
-			front.SpiderMoveEvent(data)
-		}
+		//if types == "code" { //节点转移(由于6011端口总挂,调整为定时任务)
+		//	front.SpiderMoveEvent(data)
+		//}
 	})
-	go func() {
-		err := http.ListenAndServe(":6011", nil)
-		if err != nil {
-			qu.Debug("6011 Error:", err)
-		}
-	}()
+	go http.ListenAndServe(":6011", nil)
 	xweb.Run(":" + util.Config.Webport)
 }

+ 158 - 37
src/timetask/timetask.go

@@ -5,8 +5,10 @@ import (
 	"io/ioutil"
 	"net/http"
 	qu "qfw/util"
+	"spider"
 	sp "spiderutil"
 	"strings"
+	"sync"
 	"time"
 	"util"
 
@@ -20,43 +22,10 @@ func TimeTask() {
 	c := cron.New()
 	c.Start()
 	c.AddFunc("0 20 9 ? * MON-FRI", CheckCreateTask)
-	c.AddFunc("0 0 */1 ? * *", CheckLuaMove)
-	c.AddFunc("0 30 23 * * *", UpdateSiteInfo) //定时更新站点信息
-	c.AddFunc("0 0 23 * * *", UpdateCodeHeart) //定时更新爬虫心跳信息
-}
-
-//监测爬虫由历史转增量时未成功的
-func CheckLuaMove() {
-	defer qu.Catch()
-	qu.Debug("开始检测爬虫节点移动...")
-	query := map[string]interface{}{
-		"comeintime": map[string]interface{}{
-			"$gte": time.Now().Add(-(time.Hour * 1)).Unix(),
-			"$lte": time.Now().Unix(),
-		},
-		"ok": false,
-	}
-	qu.Debug("query:", query)
-	list, _ := util.MgoEB.Find("luamovelog", query, nil, nil, false, -1, -1)
-	text := ""
-	if len(*list) > 0 {
-		for _, l := range *list {
-			stype := qu.ObjToString(l["type"])
-			code := qu.ObjToString(l["code"])
-			text += code + ":" + stype + ";"
-		}
-	}
-	if text != "" {
-		for i := 1; i <= 3; i++ {
-			res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", sp.Config.JkMail["api"], sp.Config.JkMail["to"], "lua-move-fail", text))
-			if err == nil {
-				res.Body.Close()
-				read, err := ioutil.ReadAll(res.Body)
-				qu.Debug("邮件发送:", string(read), err)
-				break
-			}
-		}
-	}
+	c.AddFunc("0 30 23 * * *", UpdateSiteInfo)   //定时更新站点信息
+	c.AddFunc("0 0 23 * * *", UpdateCodeHeart)   //定时更新爬虫心跳信息
+	c.AddFunc("0 0 */1 ? * *", CheckLuaMove)     //7000节点转增量爬虫失败告警
+	c.AddFunc("0 */10 * * * *", SpiderMoveEvent) //7000节点转增量爬虫
 }
 
 //检测创建任务失败的爬虫
@@ -148,3 +117,155 @@ func UpdateCodeHeart() {
 	}
 	qu.Debug("定时更新爬虫心跳信息完成...")
 }
+
+//监测爬虫由历史转增量时未成功的
+func CheckLuaMove() {
+	defer qu.Catch()
+	qu.Debug("开始检测爬虫节点移动...")
+	query := map[string]interface{}{
+		"comeintime": map[string]interface{}{
+			"$lte": time.Now().Add(-(time.Hour * 1)).Unix(),
+			//"$lte": time.Now().Unix(),
+		},
+		"ok": false,
+	}
+	qu.Debug("query:", query)
+	list, _ := util.MgoEB.Find("luamovelog", query, nil, nil, false, -1, -1)
+	if len(*list) > 0 {
+		codes := []string{}
+		for _, l := range *list {
+			code := qu.ObjToString(l["code"])
+			codes = append(codes, code)
+		}
+		for i := 1; i <= 3; i++ {
+			res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", sp.Config.JkMail["api"], sp.Config.JkMail["to"], "lua-move-fail", strings.Join(codes, ";")))
+			if err == nil {
+				res.Body.Close()
+				read, err := ioutil.ReadAll(res.Body)
+				qu.Debug("邮件发送:", string(read), err)
+				break
+			}
+		}
+	}
+}
+
+func SpiderMoveEvent() {
+	defer qu.Catch()
+	sess := util.MgoEB.GetMgoConn()
+	defer util.MgoEB.DestoryMongoConn(sess)
+	ch := make(chan bool, 2)
+	wg := &sync.WaitGroup{}
+	lock := &sync.Mutex{}
+	query := map[string]interface{}{
+		"ok": false,
+	}
+	count := util.MgoEB.Count("luamovelog", query)
+	if count == 0 {
+		return
+	}
+	it := sess.DB(util.MgoEB.DbName).C("luamovelog").Find(&query).Iter()
+	n := 0
+	arr := [][]map[string]interface{}{}
+	for tmp := make(map[string]interface{}); it.Next(tmp); n++ {
+		ch <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-ch
+				wg.Done()
+			}()
+			code := qu.ObjToString(tmp["code"])
+			//更新爬虫节点信息,上架
+			lua, _ := util.MgoEB.FindOne("luaconfig", map[string]interface{}{"code": code})
+			spidertype := qu.ObjToString((*lua)["spidertype"])
+			event := qu.IntAll((*lua)["event"])
+			var upresult bool
+			var err error
+			set := map[string]interface{}{}
+			qu.Debug("lua move:", code, event)
+			if spidertype == "history" {
+				newevent := GetEvent(code, (*lua))
+				qu.Debug("new event:", newevent)
+				set["event"] = newevent
+				set["spidertype"] = "increment"
+				//type_content, _ := (*lua)["type_content"].(int)
+				//iscopycontent, _ := (*lua)["iscopycontent"].(bool)
+				//str_content := qu.ObjToString((*lua)["str_content"])
+				//str_recontent := qu.ObjToString((*lua)["str_recontent"])
+				//if type_content == 1 && iscopycontent && str_recontent != "" { //三级页是专家模式且有复制三级页代码
+				//	set["iscopycontent"] = false
+				//	set["str_content"] = str_recontent
+				//	set["str_recontent"] = str_content
+				//}
+				if util.MgoEB.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": set}, false, false) {
+					upresult, err = spider.UpdateSpiderByCodeState(code, "5", newevent) //脚本上架
+				}
+			}
+			ok := false
+			if upresult && err == nil { //上架成功
+				ok = true
+				qu.Debug("Code:", code, "历史迁移到增量节点成功")
+			} else { //上架失败
+				util.MgoEB.Update("luaconfig", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"event": event, "state": 6}}, false, false)
+				qu.Debug("Code:", code, "历史迁移到增量节点失败")
+			}
+			if ok {
+				update := []map[string]interface{}{}
+				update = append(update, map[string]interface{}{"_id": tmp["_id"]})
+				update = append(update, map[string]interface{}{"$set": map[string]interface{}{"ok": ok, "updatetime": time.Now().Unix()}})
+				lock.Lock()
+				arr = append(arr, update)
+				lock.Unlock()
+			}
+		}(tmp)
+		tmp = map[string]interface{}{}
+	}
+	wg.Wait()
+	lock.Lock()
+	if len(arr) > 0 {
+		util.MgoEB.UpdateBulk("luamovelog", arr...)
+		arr = [][]map[string]interface{}{}
+	}
+	lock.Unlock()
+}
+
+func GetEvent(code string, lua map[string]interface{}) int {
+	defer qu.Catch()
+	//1、历史节点
+	if lua["historyevent"] != nil {
+		return qu.IntAll(lua["historyevent"])
+	}
+	//2、根据站点找节点
+	query := map[string]interface{}{
+		"code": map[string]interface{}{
+			"$ne": code,
+		},
+		"site":  lua["site"],
+		"state": 5,
+	}
+	tmp, _ := util.MgoEB.FindOne("luaconfig", query)
+	if tmp != nil && len(*tmp) > 0 {
+		return qu.IntAll((*tmp)["event"])
+	}
+	//3、7700
+	//spidermovevent := qu.ObjToString(lua["spidermovevent"])
+	//if spidermovevent == "7700" {
+	//	return 7700
+	//}
+	//4、根据数量分配节点
+	//num := 0
+	//result := 7700
+	//for k, t := range sp.Config.Uploadevents {
+	//	if qu.ObjToString(t) == spidermovevent { //bid、comm
+	//		event := qu.IntAll(k)
+	//		//count := mgdb.Count("luaconfig", map[string]interface{}{"state": 5, "event": event})
+	//		count := u.MgoEB.Count("luaconfig", map[string]interface{}{"state": 5, "event": event})
+	//		if num == 0 || count < num {
+	//			result = event
+	//			num = count
+	//		}
+	//	}
+	//}
+	//5、指定节点
+	return 7200
+}