소스 검색

新增心跳、修改爬虫补漏逻辑、高性能采三级页重载爬虫

maxiaoshan 3 년 전
부모
커밋
548bdb8149
4개의 변경된 파일, 130개의 추가 그리고 9개의 삭제
  1. 5 1
      src/main.go
  2. 28 1
      src/spider/handler.go
  3. 57 4
      src/spider/spider.go
  4. 40 3
      src/spider/store.go

+ 5 - 1
src/main.go

@@ -105,6 +105,8 @@ func main() {
 	go spider.ReloadSpiderFile()
 	//爬虫信息提交编辑器
 	go spider.SpiderInfoSend()
+	//处理心跳信息
+	go spider.SaveHeartInfo()
 	//内存信息
 	go heapprint()
 	//查列表页信息采集三级页
@@ -112,7 +114,9 @@ func main() {
 	//定时任务(现在此任务由编辑器建任务时完成)
 	//go spider.TimeTask()
 	//批量保存错误数据
-	go spider.UpdataErrDataMgo()
+	go spider.UpdateErrDataMgo()
+	//批量保存心跳信息
+	go spider.UpdateHeartInfo()
 	logger.Debug(Config.Webport)
 	xweb.Run(":" + Config.Webport)
 }

+ 28 - 1
src/spider/handler.go

@@ -23,6 +23,8 @@ import (
 	. "gopkg.in/mgo.v2/bson"
 )
 
+var SpiderHeart sync.Map = sync.Map{} // spider heartbeats; UpdateHeart stores *Heart values keyed by spider code
+
 var Allspiders sync.Map = sync.Map{}
 var Allspiders2 sync.Map = sync.Map{}
 var LoopListPath sync.Map = sync.Map{}
@@ -177,9 +179,10 @@ func QueueUpScriptList() {
 						})
 					}
 					if isHistoricalMend { //下载历史的爬虫执行一次后删除
-						logger.Debug("Delete History Code:", code)
 						DelLen++
 						LoopListPath.Delete(key)
+						b := mgu.Update("luaconfig", "editor", "editor", map[string]interface{}{"code": code}, map[string]interface{}{"$set": map[string]interface{}{"state": 6}}, false, false)
+						logger.Debug("Delete History Code:", code, b)
 					}
 				}
 				listLen++
@@ -1159,6 +1162,30 @@ func SpiderInfoSend() {
 	util.TimeAfterFunc(5*time.Minute, SpiderInfoSend, TimeChan)
 }
 
+// SaveHeartInfo waits 30 seconds, queues one update per SpiderHeart entry onto UpdataHeartCache, then re-schedules itself to run again after 20 minutes.
+func SaveHeartInfo() {
+	time.Sleep(30 * time.Second) // this delay is applied on every run, including the re-scheduled ones
+	SpiderHeart.Range(func(key, value interface{}) bool {
+		code := key.(string) // unchecked assertion: panics if a key is not a string
+		heart, ok := value.(*Heart)
+		if ok { // entries whose value is not a *Heart are silently skipped
+			update := []map[string]interface{}{} // two elements: [0] selector by code, [1] the $set document
+			update = append(update, map[string]interface{}{"code": code})
+			update = append(update, map[string]interface{}{"$set": map[string]interface{}{
+				"list":          heart.ListHeart,
+				"detail":        heart.DetailHeart,
+				"detailexecute": heart.DetailExecuteHeart,
+				"modifyuser":    heart.ModifyUser,
+				"event":         util.Config.Uploadevent,
+				"updatetime":    time.Now().Unix(), // time this snapshot was taken, in Unix seconds
+			}})
+			UpdataHeartCache <- update // blocks while the channel buffer is full
+		}
+		return true // keep iterating over every entry
+	})
+	time.AfterFunc(20*time.Minute, SaveHeartInfo) // the next round runs in its own goroutine
+}
+
 //信息提交编辑器
 func SpiderCodeSendToEditor(code string) {
 	defer qu.Catch()

+ 57 - 4
src/spider/spider.go

@@ -14,6 +14,7 @@ import (
 	mu "mfw/util"
 	mgo "mongodb"
 	qu "qfw/util"
+	mgu "qfw/util/mongodbutil"
 	"sync"
 
 	//"sync"
@@ -30,6 +31,13 @@ import (
 	"github.com/yuin/gopher-lua"
 )
 
+type Heart struct { // Heart holds the heartbeat timestamps recorded for one spider plus its maintainer; UpdateHeart fills the timestamps with time.Now().Unix()
+	DetailHeart        int64  // heartbeat of the spider's detail (third-level) page execution
+	DetailExecuteHeart int64  // heartbeat of the detail page having collected data
+	ListHeart          int64  // heartbeat of the spider's list page execution
+	ModifyUser         string // spider maintainer
+}
+
 //爬虫()
 type Spider struct {
 	Script
@@ -65,12 +73,46 @@ type Spider struct {
 	IsMustDownload   bool //是否强制下载
 }
 
-var UpdataMgoCache = make(chan []map[string]interface{}, 1000) //更新要重下数据的状态
+var UpdataMgoCache = make(chan []map[string]interface{}, 1000)   // updates the state of data that has to be re-downloaded
+var UpdataHeartCache = make(chan []map[string]interface{}, 1000) // updates of spider heartbeat information
 var SP = make(chan bool, 5)
+var SPH = make(chan bool, 5)
 var Mgo *mgo.MongodbSim
 var TimeChan = make(chan bool, 1)
 var Reg = regexp.MustCompile(`(http|https)://([\w]+\.)+[\w]+(/?)`)
 
+// UpdateHeart stamps the current Unix time on the heartbeat of kind t ("list", "detail" or "detailexcute") for spider code; user is only stored when the entry is first created, and any other t is ignored.
+func UpdateHeart(code, user, t string) {
+	if htmp, ok := SpiderHeart.Load(code); ok {
+		if heart, ok := htmp.(*Heart); ok { // NOTE(review): fields are written here without a lock while SaveHeartInfo reads them — confirm whether synchronization is needed
+			if t == "list" {
+				heart.ListHeart = time.Now().Unix()
+			} else if t == "detail" {
+				heart.DetailHeart = time.Now().Unix()
+			} else if t == "detailexcute" { // spelled "detailexcute" (sic); the callers visible in this diff pass the same spelling
+				heart.DetailExecuteHeart = time.Now().Unix()
+			}
+		}
+	} else { // no entry yet: create one with only the matching timestamp set (NOTE(review): Load followed by Store is not atomic — confirm concurrent first calls are acceptable)
+		if t == "list" {
+			SpiderHeart.Store(code, &Heart{
+				ListHeart:  time.Now().Unix(),
+				ModifyUser: user,
+			})
+		} else if t == "detail" {
+			SpiderHeart.Store(code, &Heart{
+				DetailHeart: time.Now().Unix(),
+				ModifyUser:  user,
+			})
+		} else if t == "detailexcute" {
+			SpiderHeart.Store(code, &Heart{
+				DetailExecuteHeart: time.Now().Unix(),
+				ModifyUser:         user,
+			})
+		}
+	}
+}
+
 //任务
 func (s *Spider) StartJob() {
 	s.Stop = false
@@ -117,7 +159,8 @@ func (s *Spider) ExecJob(reload bool) {
 	if err != nil {
 		logger.Error(s.Code, err)
 	}
-	err = s.DownListPageItem() //下载列表
+	UpdateHeart(s.Code, s.MUserName, "list") //记录所有节点列表页心跳
+	err = s.DownListPageItem()               //下载列表
 	if err != nil {
 		logger.Error(s.Code, err)
 	}
@@ -144,10 +187,12 @@ func (s *Spider) ExecJob(reload bool) {
 					return
 				}
 			*/
-			if s.IsMustDownload { //历史数据下载,只跑一轮
-				fmt.Println("Delete History Code:", s.Code)
+			//if s.IsMustDownload { //历史数据下载,只跑一轮
+			if s.IsHistoricalMend && util.Config.IsHistoryEvent { //历史节点7000,高性能模式,历史补漏只下载一轮
 				s.Stop = true
 				s.L.Close()
+				b := mgu.Update("luaconfig", "editor", "editor", map[string]interface{}{"code": s.Code}, map[string]interface{}{"$set": map[string]interface{}{"state": 6}}, false, false)
+				logger.Info("Delete History Code:", s.Code, b)
 			} else {
 				if !s.Stop { //未下架定时执行
 					util.TimeAfterFunc(time.Duration(s.SpiderRunRate)*time.Minute, func() {
@@ -420,6 +465,7 @@ func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
 	} else {
 		SaveListPageData(paramdata) //保存7000、7500、7700节点列表页采集的信息
 	}
+	UpdateHeart(s.Code, s.MUserName, "detail") //记录modal=0老模式采集三级页心跳
 	//下载、解析、入库
 	data, err = s.DownloadDetailPage(paramdata, data)
 	if err != nil || data == nil {
@@ -445,6 +491,7 @@ func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
 	if t1 > time.Now().Unix() { //防止发布时间超前
 		data["publishtime"] = time.Now().Unix()
 	}
+	UpdateHeart(s.Code, s.MUserName, "detailexcute") //记录modal=0老模式采集到数据心跳
 	delete(data, "exit")
 	delete(data, "checkpublishtime")
 	data["comeintime"] = time.Now().Unix()
@@ -599,6 +646,7 @@ func (s *Spider) DownloadHighDetail() {
 				"comeintime": 0,
 				"event":      0,
 			}
+			UpdateHeart(s.Code, s.MUserName, "detail") //记录modal=1采集三级页心跳
 			list, _ := Mgo.Find("spider_highlistdata", q, o, f, false, 0, 100)
 			if list != nil && len(*list) > 0 {
 				for _, tmp := range *list {
@@ -626,6 +674,7 @@ func (s *Spider) DownloadHighDetail() {
 					}
 					//下载、解析、入库
 					data, err = s.DownloadDetailPage(tmp, data)
+					UpdateHeart(s.Code, s.MUserName, "detailexcute") //记录modal=1下载数据心跳
 					if err != nil || data == nil {
 						success = false
 						times++
@@ -680,6 +729,8 @@ func (s *Spider) DownloadHighDetail() {
 					set := map[string]interface{}{"$set": map[string]interface{}{"state": 1}} //下载成功state置为1
 					Mgo.Update("spider_highlistdata", query, set, false, false)
 				}
+				//重载spider
+				s.LoadScript(s.Code, s.ScriptFile, true)
 			} else { //没有数据
 				time.Sleep(2 * time.Minute)
 			}
@@ -715,6 +766,7 @@ func (s *Spider) DownloadListDetail() {
 		"comeintime": 0,
 		"event":      0,
 	}
+	UpdateHeart(s.Code, s.MUserName, "detail") //记录modal=1采集三级页心跳
 	list, _ := Mgo.Find("spider_highlistdata", q, o, f, false, 0, 100)
 	if list != nil && len(*list) > 0 {
 		for _, tmp := range *list {
@@ -742,6 +794,7 @@ func (s *Spider) DownloadListDetail() {
 			}
 			//下载、解析、入库
 			data, err = s.DownloadDetailPage(tmp, data)
+			UpdateHeart(s.Code, s.MUserName, "detailexcute") //记录modal=1下载数据心跳
 			if err != nil || data == nil {
 				success = false
 				times++

+ 40 - 3
src/spider/store.go

@@ -436,8 +436,8 @@ func UpdateHighListDataByCode(code string) {
 }
 
 //批量更新错误数据
-func UpdataErrDataMgo() {
-	fmt.Println("Save...")
+func UpdateErrDataMgo() {
+	fmt.Println("Update Error Data...")
 	arru := make([][]map[string]interface{}, 50)
 	indexu := 0
 	for {
@@ -465,9 +465,46 @@ func UpdataErrDataMgo() {
 					}()
 					Mgo.UpSertBulk("regatherdata", arru...)
 				}(arru[:indexu])
-				arru = make([][]map[string]interface{}, 200)
+				arru = make([][]map[string]interface{}, 50)
 				indexu = 0
 			}
 		}
 	}
 }
+
+// UpdateHeartInfo loops forever, batching items received from UpdataHeartCache and writing them to "spider_heart" via Mgo.UpSertBulk; a batch is flushed when it holds 200 items or when no item arrives for 10 seconds.
+func UpdateHeartInfo() {
+	fmt.Println("Update Heart Info...")
+	heartarr := make([][]map[string]interface{}, 200) // fixed-size batch buffer, filled by index
+	indexh := 0 // number of items currently held in heartarr
+	for {
+		select {
+		case v := <-UpdataHeartCache:
+			heartarr[indexh] = v
+			indexh++
+			if indexh == 200 { // buffer full: flush the whole batch
+				SPH <- true // take a slot; blocks once the channel's buffer of in-flight flushes is full
+				go func(heartarr [][]map[string]interface{}) {
+					defer func() {
+						<-SPH // release the slot when the write returns
+					}()
+					Mgo.UpSertBulk("spider_heart", heartarr...)
+				}(heartarr)
+				heartarr = make([][]map[string]interface{}, 200) // fresh buffer; the old one now belongs to the goroutine
+				indexh = 0
+			}
+		case <-time.After(10 * time.Second): // nothing received for 10 seconds
+			if indexh > 0 { // flush only the filled prefix of the buffer
+				SPH <- true
+				go func(heartarr [][]map[string]interface{}) {
+					defer func() {
+						<-SPH
+					}()
+					Mgo.UpSertBulk("spider_heart", heartarr...)
+				}(heartarr[:indexh])
+				heartarr = make([][]map[string]interface{}, 200)
+				indexh = 0
+			}
+		}
+	}
+}