
High-performance mode: concurrent list-page collection

maxiaoshan committed 1 year ago
parent commit 8d62040d07
3 files changed with 200 additions and 14 deletions
  1. src/config.json (+5, -4)
  2. src/main.go (+3, -4)
  3. src/spider/spider.go (+192, -6)

+ 5 - 4
src/config.json

@@ -15,13 +15,13 @@
     "msgserveraddrfile": "spdata.jianyu360.com:802",
     "msgserveraddrchromedp": "spdata.jianyu360.com:807",
 	"isdelay":false,
-    "working": 1,
+    "working": 0,
     "chansize": 4,
     "detailchansize": 20,
     "uploadevent": 7100,
     "logLevel": 1,
     "daynum": 6,
-    "modal": 0,
+    "modal": 1,
     "ishistoryevent": false,
     "threadbasenum": 50,
     "threadupperlimit": 10,
@@ -44,11 +44,12 @@
     "pageturninfo": {
         "repeatpagetimeslimit_w0": 3,
         "repeatpagetimeslimit_w1": 3,
-        "turnpagemaxlimit_w0": 4,
+        "turnpagemaxlimit_w0": 5,
         "turnpagemaxlimit_w1": 50,
         "nextpagemaxlimit_w0": 5,
         "nextpagemaxlimit_w1": 50,
-        "listparalleltasklimit": 3
+        "listparalleltasklimit": 3,
+        "listthreadsnum": 2
     },
     "fileServer": "http://test.qmx.top:9333",
     "jsvmurl": "http://127.0.0.1:8080/jsvm",

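The new `listthreadsnum` key sets how many list pages are fetched in parallel; the spider code falls back to the sequential path when it is 1 or less. A minimal sketch of how the `pageturninfo` block could map onto the PageTurnInfo struct — the field names are inferred from their usages in src/spider/spider.go, while the json tags and the demo are assumptions:

package main

import (
	"encoding/json"
	"fmt"
)

// Assumed mirror of the "pageturninfo" config block; only ListThreadsNum is new.
type PageTurnInfo struct {
	RepeatPageTimesLimitW0 int `json:"repeatpagetimeslimit_w0"` // consecutive all-duplicate pages tolerated, working=0
	RepeatPageTimesLimitW1 int `json:"repeatpagetimeslimit_w1"` // same limit for queue mode, working=1
	TurnPageMaxLimitW0     int `json:"turnpagemaxlimit_w0"`     // page cap, working=0
	TurnPageMaxLimitW1     int `json:"turnpagemaxlimit_w1"`     // page cap, working=1
	NextPageMaxLimitW0     int `json:"nextpagemaxlimit_w0"`     // child-task page budget, working=0
	NextPageMaxLimitW1     int `json:"nextpagemaxlimit_w1"`     // child-task page budget, working=1
	ListParallelTaskLimit  int `json:"listparalleltasklimit"`
	ListThreadsNum         int `json:"listthreadsnum"` // new: concurrent list-page workers
}

func main() {
	raw := `{"turnpagemaxlimit_w0":5,"listparalleltasklimit":3,"listthreadsnum":2}`
	var p PageTurnInfo
	if err := json.Unmarshal([]byte(raw), &p); err != nil {
		panic(err)
	}
	fmt.Println(p.ListThreadsNum) // prints 2
}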
+ 3 - 4
src/main.go

@@ -4,7 +4,6 @@ import (
 	codegrpc "analysiscode/client"
 	_ "filter"
 	"fmt"
-	gojs "gorunjs/client"
 	"io/ioutil"
 	"os"
 	"spider"
@@ -35,7 +34,7 @@ func init() {
 	//captcha recognition client
 	codegrpc.InitCodeGrpcClient()
 	//Go-side JS execution service
-	gojs.InitGoRunJsClient()
+	//gojs.InitGoRunJsClient()
 	InitRedisClient(Config.Redisservers)           //initialize Redis
 	InitBloomRedisClient(Config.BloomRedisservers) //initialize Bloom Redis
 	//initialize ES
@@ -126,9 +125,9 @@ func main() {
 	//query list-page records and collect the detail (third-level) pages
 	go spider.DetailData()
 	//process heartbeat info
-	go spider.SaveHeartInfo()
+	//go spider.SaveHeartInfo()
 	//batch-save heartbeat info
-	go spider.UpdateHeartInfo()
+	//go spider.UpdateHeartInfo()
 	//event-7000 history node: download detail pages
 	go spider.HistoryEventDownloadDetail()
 

+ 192 - 6
src/spider/spider.go

@@ -12,6 +12,7 @@ import (
 	mgo "mongodb"
 	qu "qfw/util"
 	es "qfw/util/elastic.v7"
+	"sort"
 	"strconv"
 	"strings"
 	"sync"
@@ -149,11 +150,16 @@ func (s *Spider) ExecJob(reload bool) {
 	//}
 	//determine whether to use high concurrency to download the detail (third-level) pages
 	var err interface{}
-	if util.Config.Working == 0 && util.Config.Modal == 1 && !util.Config.IsHistoryEvent {
-		err = s.DownListPageItemByThreads() //download the list pages
+	if util.Config.PageTurnInfo.ListThreadsNum > 1 {
+		err = s.DownListPageItemByThreads() //download the list pages concurrently
 	} else {
 		err = s.DownListPageItem() //download the list pages
 	}
+	//if util.Config.Working == 0 && util.Config.Modal == 1 && !util.Config.IsHistoryEvent {
+	//	err = s.DownListPageItemByThreads() //download the list pages
+	//} else {
+	//	err = s.DownListPageItem() //download the list pages
+	//}
 	if err != nil {
 		logger.Error(s.Code, err)
 	}
@@ -342,7 +348,7 @@ func (s *Spider) DownListPageItem() (errs interface{}) {
 				//		StoreBlak(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, list)
 				//	}
 				//}
-			} else { //if the current list page came back empty due to network problems, re-request it
+			} else { //if the current list page came back empty due to network problems, re-request it (its rows may also have all been filtered out)
 				if downtimes < 2 {
 					downtimes++
 					start--
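The retry branch above is worth calling out: an empty page is re-requested at most twice, because emptiness can mean a transient network failure or, per the updated comment, a page whose rows were all filtered out. A self-contained sketch of the pattern, with fetch as a stand-in for the real list download:

package main

import "fmt"

// fetch is a placeholder for the real list-page download.
func fetch(page int) []string {
	if page == 2 {
		return nil // simulate a flaky (or fully filtered) page
	}
	return []string{fmt.Sprintf("row-from-page-%d", page)}
}

func main() {
	const max = 3
	downtimes := 0
	for start := 1; start <= max; start++ {
		rows := fetch(start)
		if len(rows) == 0 { // empty page: maybe a network blip, maybe all rows filtered
			if downtimes < 2 {
				downtimes++
				start-- // undo the increment and re-request the same page
				continue
			}
		}
		downtimes = 0 // reset the budget once a page is accepted
		fmt.Println(start, rows)
	}
}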
@@ -455,6 +461,187 @@ func (s *Spider) DownListPageItem() (errs interface{}) {
 
 //Concurrent list download
 func (s *Spider) DownListPageItemByThreads() (errs interface{}) {
+	defer qu.Catch()
+	start, max := s.GetIntVar("spiderStartPage"), s.GetIntVar("spiderMaxPage") //start page, max page
+	s.MaxPage = max                                                            //record the spider's configured max page
+	repeatAllNum := int64(0)                                                   //total duplicates in this collection round
+	downloadAllNum := int64(0)                                                 //total items collected this round
+	if util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history" {     //event-7000 node: choose the max page by spider type
+		max = s.GetIntVar("spiderHistoryMaxPage") //history-collecting spiders use the spiderHistoryMaxPage setting
+	}
+	repeatPageTimesLimit := 0 //upper limit on consecutive all-duplicate pages
+	isRunRepeatList := false  //whether to run the consecutive list-page dedup logic
+	//decide whether to apply the consecutive page-turn check and adjust the max page
+	if !util.Config.IsHistoryEvent && util.Config.Modal == 1 && max > 1 && max < 101 { //all high-performance, non-history nodes dedup consecutive pages while collecting list pages
+		isRunRepeatList = true
+		repeatPageTimesLimit = util.Config.PageTurnInfo.RepeatPageTimesLimitW0
+		max = util.Config.PageTurnInfo.TurnPageMaxLimitW0 //high-performance mode: cap the max page at TurnPageMaxLimitW0
+		if util.Config.Working == 1 {                     //queue mode
+			repeatPageTimesLimit = util.Config.PageTurnInfo.RepeatPageTimesLimitW1 //consecutive dedup page limit, 3
+			max = util.Config.PageTurnInfo.TurnPageMaxLimitW1                      //queue-mode max page, 50
+		}
+	}
+	//child-task check
+	//if s.ContinueDownListChildTask {
+	//	start = util.Config.PageTurnInfo.TurnPageMaxLimitW0 + 1
+	//	max = util.Config.PageTurnInfo.TurnPageMaxLimitW0 + util.Config.PageTurnInfo.NextPageMaxLimitW0
+	//	if util.Config.Working == 1 { //queue mode
+	//		start = util.Config.PageTurnInfo.TurnPageMaxLimitW1 + 1
+	//		max = util.Config.PageTurnInfo.TurnPageMaxLimitW1 + util.Config.PageTurnInfo.NextPageMaxLimitW1
+	//	}
+	//}
+	//create the concurrent Spider objects
+	spChan := make(chan *Spider, 1)
+	if isRunRepeatList && util.Config.PageTurnInfo.ListThreadsNum > 1 { //continuous page-turn mode: size spChan to the concurrency level
+		spChan = make(chan *Spider, util.Config.PageTurnInfo.ListThreadsNum)
+		spChan <- s                                                                                //put s into the channel
+		NewSpiderByScript(util.Config.PageTurnInfo.ListThreadsNum-1, s.Code, s.ScriptFile, spChan) //create the additional Spider objects
+	} else {
+		spChan <- s //put s into the channel
+	}
+	endPage := 0     //end page
+	repeatTimes := 0 //consecutive all-duplicate page counter
+	for ; start <= max && !s.Stop; start += util.Config.PageTurnInfo.ListThreadsNum {
+		if !s.Stop && !s.ContinueDownListChildTask { //if the spider was taken offline while downloading detail pages, stop saving heartbeat info
+			UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "list", start == 1) //record the list-page heartbeat for all nodes
+		}
+		listWg := &sync.WaitGroup{}
+		pageMap := map[int]bool{} //records whether every row on a page was already collected
+		pageArr := []int{}
+		lock := &sync.Mutex{}
+		//download the list pages concurrently
+		for listThreadNum := 0; listThreadNum < util.Config.PageTurnInfo.ListThreadsNum; listThreadNum++ {
+			pagenum := start + listThreadNum //actual page number being collected
+			if pagenum > max {               //each round starts ListThreadsNum goroutines, so page numbers may exceed the max limit
+				break
+			}
+			spTmp := <-spChan //take a Spider object out of the channel
+			listWg.Add(1)
+			atomic.AddInt64(&ListAllThreadNum, 1)
+			endPage = pagenum + 1
+			go func(sp *Spider, pagenum int) {
+				defer func() {
+					spChan <- sp //return the Spider object to the channel when done
+					listWg.Done()
+					atomic.AddInt64(&ListAllThreadNum, -1)
+				}()
+				//download one page of data
+				downnum, repeatnum := sp.DownListOnePage(pagenum)
+				//aggregate the download counts
+				atomic.AddInt64(&downloadAllNum, int64(downnum))
+				atomic.AddInt64(&repeatAllNum, int64(repeatnum))
+				lock.Lock()
+				pageMap[pagenum] = downnum == repeatnum //whether page pagenum yielded only duplicates
+				pageArr = append(pageArr, pagenum)
+				lock.Unlock()
+				if downnum > 0 {
+					//with concurrent list collection the Spider objects differ, so stats can only be merged after the fact
+					if pagenum == 1 { //copy the page-one content hash from sp to s
+						s.PageOneTextHash = sp.PageOneTextHash
+					} else if pagenum == 2 { //copy the page-two content hash from sp to s
+						s.PageTwoTextHash = sp.PageTwoTextHash
+					}
+				}
+			}(spTmp, pagenum)
+		}
+		listWg.Wait()
+		if isRunRepeatList {
+			sort.Ints(pageArr) //sort the page numbers ascending; the goroutines finish out of order
+			for _, page := range pageArr {
+				if pageMap[page] { //this page yielded only duplicates
+					repeatTimes++
+				} else {
+					repeatTimes = 0
+				}
+			}
+			if repeatTimes >= repeatPageTimesLimit { //consecutive duplicate-page limit reached; stop collecting
+				break
+			}
+		}
+	}
+	//close(spChan) //close the channel and release its resources
+	logger.Info(s.Code, "list-page collection summary for this round:", s.ContinueDownListChildTask, downloadAllNum, repeatAllNum, endPage, s.Stop)
+	if !util.Config.IsHistoryEvent && !s.Stop { //non-history nodes record the download rate
+		nowTime := time.Now()
+		sDate := qu.FormatDate(&nowTime, qu.Date_Short_Layout)
+		set := map[string]interface{}{
+			"site":       s.Name,
+			"channel":    s.Channel,
+			"spidercode": s.Code,
+			"updatetime": nowTime.Unix(),
+			"event":      util.Config.Uploadevent,
+			"modifyuser": s.MUserName,
+			"maxpage":    s.MaxPage,
+			"runrate":    s.SpiderRunRate,
+			"endpage":    endPage,
+			"date":       sDate,
+		}
+		inc := map[string]interface{}{
+			"alltimes": 1,
+		}
+		//record whether page turning succeeded
+		if s.MaxPage > 1 { //when the max page is 1, list-page anomalies reflect the spider's health instead
+			if s.PageOneTextHash != "" {
+				if s.PageTwoTextHash != "" {
+					if s.PageOneTextHash != s.PageTwoTextHash {
+						inc["page_success"] = 1
+					} else {
+						inc["page_fail"] = 1
+					}
+				} else {
+					inc["page_fail"] = 1
+				}
+			} else if s.PageTwoTextHash != "" {
+				inc["page_onefail"] = 1
+			}
+		}
+		if downloadAllNum > 0 {
+			rate := float64(downloadAllNum-repeatAllNum) / float64(downloadAllNum)
+			rate, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", rate), 64)
+			if rate == 1.0 {
+				if downloadAllNum == 1 { //filtering left only a single new row across the list pages
+					inc["oh_percent_onenum"] = 1
+				} else {
+					inc["oh_percent"] = 1
+				}
+				//} else if rate >= 0.9 {
+				//	inc["nt_percent"] = 1
+				//} else if rate >= 0.8 {
+				//	inc["et_percent"] = 1
+				//} else {
+				//	inc["other_percent"] = 1
+			}
+			if isRunRepeatList && endPage > max { //consecutive page turning exceeded the upper limit
+				if !s.ContinueDownListChildTask {
+					go ContinueDownListPageItem(s) //start a child task to continue collecting
+				} else {
+					inc["uplimit"] = 1
+				}
+			}
+		} else {
+			inc["zero"] = 1
+		}
+		query := map[string]interface{}{
+			"date":       sDate,
+			"spidercode": s.Code,
+		}
+		coll := "spider_downloadrate"
+		if s.ContinueDownListChildTask {
+			coll = "spider_downloadrate_child"
+		}
+		MgoS.Update(coll, query, map[string]interface{}{
+			"$set": set,
+			"$inc": inc,
+		}, true, false)
+	}
+	//reset state
+	s.PageOneTextHash = ""
+	s.PageTwoTextHash = ""
+	return errs
+}
+
+// Concurrent list download (previous implementation, kept as a backup)
+func (s *Spider) DownListPageItemByThreadsBack() (errs interface{}) {
 	defer qu.Catch()
 	start, max := s.GetIntVar("spiderStartPage"), s.GetIntVar("spiderMaxPage") //start page, max page
 	s.MaxPage = max                                                            //record the spider's configured max page
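The heart of the new DownListPageItemByThreads is a channel used as an object pool: ListThreadsNum Spider objects circulate through spChan, each goroutine checks one out, downloads a single page, and returns it, with a WaitGroup fencing each round of pages. A stripped-down, runnable sketch of just that pattern — spider and fetchPage are placeholders, not the repository's API:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type spider struct{ id int }

// fetchPage stands in for Spider.DownListOnePage: it returns how many rows
// the page had and how many of them were duplicates.
func fetchPage(sp *spider, page int) (downs, repeats int) {
	return 10, 10 // pretend every row was a duplicate
}

func main() {
	const threads, maxPage = 2, 5
	var downloadAll, repeatAll int64

	// Channel as an object pool: exactly `threads` spiders circulate.
	pool := make(chan *spider, threads)
	for i := 0; i < threads; i++ {
		pool <- &spider{id: i}
	}

	for start := 1; start <= maxPage; start += threads {
		var wg sync.WaitGroup
		for t := 0; t < threads; t++ {
			page := start + t
			if page > maxPage { // the last round may overshoot maxPage
				break
			}
			sp := <-pool // check a spider out of the pool
			wg.Add(1)
			go func(sp *spider, page int) {
				defer func() { pool <- sp; wg.Done() }() // return it when done
				d, r := fetchPage(sp, page)
				atomic.AddInt64(&downloadAll, int64(d))
				atomic.AddInt64(&repeatAll, int64(r))
			}(sp, page)
		}
		wg.Wait() // one round of pages finishes before the next starts
	}
	fmt.Println(downloadAll, repeatAll)
}

Because every round waits for all of its pages, the per-page bookkeeping (pageMap and pageArr in the real code) can be reconciled once per round rather than under contention.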
@@ -665,7 +852,7 @@ func (s *Spider) DownListOnePage(pagenum int) (downnum, repeatnum int) {
 				//		StoreBlak(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, list)
 				//	}
 				//}
-			} else { //if the current list page came back empty due to network problems, re-request it
+			} else { //if the current list page came back empty due to network problems, re-request it (its rows may also have all been filtered out)
 				downtimes++
 				continue
 			}
@@ -1033,11 +1220,10 @@ func GetListDataDownloadDetail() {
 func (s *Spider) DownloadHighDetail(reload bool) {
 	defer qu.Catch()
 	for {
-		logger.Info("Running Code:", s.Code, "Stop:", s.Stop)
+		logger.Info("Detail Running Code:", s.Code, "	Stop:", s.Stop)
 		if !s.Stop { //爬虫是运行状态
 			s.DownloadDetail(reload, false)
 		} else {
-			logger.Info("Running Code:", s.Code, "Stop:", s.Stop)
 			break
 		}
 	}
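One subtlety in the new list loop: because the page goroutines finish in arbitrary order, the consecutive-duplicate cutoff has to be replayed in ascending page order after each round, which is why the code sorts pageArr before counting. A small sketch of that termination rule, where shouldStop is a hypothetical helper rather than a function in the diff:

package main

import (
	"fmt"
	"sort"
)

// shouldStop replays one round of results in page order and reports whether
// the run of consecutive all-duplicate pages reached the configured limit.
// repeatTimes carries the running count across rounds, as in the diff.
func shouldStop(pageMap map[int]bool, pageArr []int, repeatTimes *int, limit int) bool {
	sort.Ints(pageArr) // workers finish out of order; judge pages low to high
	for _, page := range pageArr {
		if pageMap[page] { // every row on this page was a duplicate
			*repeatTimes++
		} else {
			*repeatTimes = 0 // a page with new rows resets the run
		}
	}
	return *repeatTimes >= limit
}

func main() {
	repeats := 0
	// Round one: pages 1-2 had new rows, page 3 was all duplicates.
	fmt.Println(shouldStop(map[int]bool{1: false, 2: false, 3: true}, []int{3, 1, 2}, &repeats, 3)) // false
	// Round two: pages 4-6 were all duplicates -> run of 4 >= 3, so stop.
	fmt.Println(shouldStop(map[int]bool{4: true, 5: true, 6: true}, []int{5, 6, 4}, &repeats, 3)) // true
}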