Delayed-collection sites: reload the configuration on a schedule

mxs, 3 weeks ago
Commit 35f58ba1bc
3 changed files with 31 additions and 13 deletions
  1. src/config.json (+0, -1)
  2. src/spider/spider.go (+14, -4)
  3. src/spider/util.go (+17, -8)

src/config.json (+0, -1)

@@ -26,7 +26,6 @@
     "threadbasenum": 50,
     "threadupperlimit": 10,
     "serveraddress": "127.0.0.1:8030",
-    "jsserveraddress":  "127.0.0.1:8031",
     "tesseractadd": "http://test.qmx.top:1688",
     "testdir": "res/test/spider_test.lua",
     "redisservers": "list=192.168.3.166:1579",

src/spider/spider.go (+14, -4)

@@ -100,7 +100,8 @@ var (
 	RestrictAccessReg = regexp.MustCompile(`访问被拒绝`) //matches "access denied" in responses
 	AllThreadNum      int64
 	ListAllThreadNum  int64
-	DelaySiteMap      map[string]*DelaySite                       //set of delayed-collection sites
+	DelaySiteMap      map[string]*DelaySite //set of delayed-collection sites
+	DelaySiteLock     *sync.Mutex           //guards concurrent access to DelaySiteMap
 	UpdataHeartCache  = make(chan []map[string]interface{}, 1000) //spider heartbeat update queue
 	SPH               = make(chan bool, 5)
 
@@ -1196,7 +1197,10 @@ func (s *Spider) HistoricalMendDownloadDetailItem(p interface{}) {
 	//2. Non-7000 (history-node) historical backfill: detail pages are fetched right after the list, and the spider is taken offline once done (no such spider exists at present)
 	id := ""
 	isEsRepeat := false
-	if delaySite := DelaySiteMap[s.Name]; delaySite != nil && delaySite.Compete {
+	DelaySiteLock.Lock()
+	delaySite := DelaySiteMap[s.Name]
+	DelaySiteLock.Unlock()
+	if delaySite != nil && delaySite.Compete {
 		title := qu.ObjToString(paramdata["title"])
 		eTime := time.Now().Unix()
 		sTime := eTime - int64(7*86400)
@@ -1316,7 +1320,10 @@ func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
 			UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail", false) //record the detail-page heartbeat for the legacy modal=0 collection mode
 		}
 		isEsRepeat := false
-		if delaySite := DelaySiteMap[s.Name]; delaySite != nil && delaySite.Compete {
+		DelaySiteLock.Lock()
+		delaySite := DelaySiteMap[s.Name]
+		DelaySiteLock.Unlock()
+		if delaySite != nil && delaySite.Compete {
 			title := qu.ObjToString(paramdata["title"])
 			eTime := time.Now().Unix()
 			sTime := eTime - int64(7*86400)
@@ -1547,7 +1554,10 @@ func (s *Spider) DownloadDetail(reload bool, isHistory bool) {
 	o := map[string]interface{}{"_id": -1}
 	if !isHistory { //non-historical download: add a comeintime time filter
 		comeintimeQuery := map[string]interface{}{"$gte": GetTime(-util.Config.DayNum)} //only collect data from the past week, so items that never download successfully do not pile up
-		if delaySite := DelaySiteMap[s.Name]; delaySite != nil {
+		DelaySiteLock.Lock()
+		delaySite := DelaySiteMap[s.Name]
+		DelaySiteLock.Unlock()
+		if delaySite != nil {
 			isEsRepeat = delaySite.Compete
 		if delaySite.DelayTime <= util.Config.DayNum*24 { //check whether this spider belongs to a delayed-collection site whose data is collected delayDay hours late (nodes 7410, 7500 and 7700 collect sequentially and cannot be delayed)
 				//comeintimeQuery["$lte"] = GetTime(-delayDay + 1)
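
The same Lock/lookup/Unlock sequence is now repeated at all three call sites. A small accessor could factor it out; a minimal sketch (getDelaySite is a hypothetical helper, not part of this commit):

func getDelaySite(name string) *DelaySite {
	DelaySiteLock.Lock()
	defer DelaySiteLock.Unlock()
	return DelaySiteMap[name]
}

Since the map is read far more often than it is rewritten (once every 12 hours), a sync.RWMutex with RLock at the read sites would serve equally well and would not serialize concurrent readers.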

src/spider/util.go (+17, -8)

@@ -1,8 +1,10 @@
 package spider
 
 import (
+	"github.com/donnie4w/go-logger/logger"
 	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
 	qu "qfw/util"
+	"sync"
 	"time"
 )
 
@@ -15,15 +17,22 @@ var ErrFid = "a6879f0a8570256aa21fb978e6dabb50429a30dfacff697cf0b898abbc5c262e"
 func InitOther() {
 	defer qu.Catch()
 	DelaySiteMap = map[string]*DelaySite{}
-	list, _ := MgoS.Find("spider_compete", nil, nil, nil, false, -1, -1)
-	for _, l := range *list {
-		site := qu.ObjToString(l["site"])
-		delayTime := qu.IntAll(l["delaytime"])
-		compete, _ := l["compete"].(bool)
-		DelaySiteMap[site] = &DelaySite{
-			DelayTime: delayTime,
-			Compete:   compete,
+	DelaySiteLock = &sync.Mutex{}
+	for {
+		list, _ := MgoS.Find("spider_compete", nil, nil, nil, false, -1, -1)
+		for _, l := range *list {
+			site := qu.ObjToString(l["site"])
+			delayTime := qu.IntAll(l["delaytime"])
+			compete, _ := l["compete"].(bool)
+			DelaySiteLock.Lock()
+			DelaySiteMap[site] = &DelaySite{
+				DelayTime: delayTime,
+				Compete:   compete,
+			}
+			DelaySiteLock.Unlock()
 		}
+		logger.Info("重载延时采集配置...") //log: "reloading delayed-collection config..."
+		time.Sleep(12 * time.Hour)
 	}
 }
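
Note that InitOther now ends in an endless for loop that re-reads spider_compete every 12 hours, so it has to run in its own goroutine; a minimal usage sketch, assuming it was previously called synchronously at startup:

go InitOther() // refresh DelaySiteMap from spider_compete every 12 hours

Two caveats: DelaySiteLock is created inside that goroutine, so nothing may touch DelaySiteMap before the first iteration has run; and sites deleted from spider_compete linger in DelaySiteMap, because the loop only overwrites keys and never rebuilds the map.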