Add list and detail page collection delays

mxs 5 months ago
parent
commit
028792aec2
4 changed files with 65 additions and 18 deletions
  1. src/config.json (+4 −0)
  2. src/spider/download.go (+17 −6)
  3. src/spider/script.go (+4 −8)
  4. src/spider/spider.go (+40 −4)

+ 4 - 0
src/config.json

@@ -41,6 +41,10 @@
         "ossAccessKeySecret":"Bk98FsbPYXcJe72n1bG3Ssf73acuNh",
         "ossBucketName":"jy-editor"
     },
+    "delaytime": {
+        "listdelaytime": 1000,
+        "detaildelaytime": 1000
+    },
     "pageturninfo": {
         "repeatpagetimeslimit": 3,
         "turnpagemaxlimit": 100,

+ 17 - 6
src/spider/download.go

@@ -12,6 +12,7 @@ import (
 	"math/rand"
 	mu "mfw/util"
 	"net/http"
+	qu "qfw/util"
 	"regexp"
 	lu "spiderutil"
 	"time"
@@ -20,12 +21,8 @@ import (
 	"github.com/surfer/agent"
 )
 
-var regImgStr = "\\.(JPG|jpg|jpeg|JPEG|GIF|gif|PNG|png|BMP|bmp|doc|docx|pdf|xls|xlsx)$"
-var regImg *regexp.Regexp
-
-func init() {
-	regImg, _ = regexp.Compile(regImgStr)
-}
+var base64FileType = regexp.MustCompile(`.*?data:(image|document)/(png|jpg|jpeg|doc|docx|pdf|xlsx|xls);base64,`)
+var regImg = regexp.MustCompile(`(?i)\.(pdf|doc|docx|xls|xlsx|ppt|pptx|jpg|jpeg|png|gif|bmp|zip|rar|7z|gz|csv|swf)$`)
 
 // Download a page: send the message and wait for the download
 func Download(retLen *int64, downloaderid, url, method string, head map[string]interface{}, encoding string, useproxy, ishttps bool, code string, timeout int64) string {
@@ -406,3 +403,17 @@ func isAvailableFile(code string) bool {
 	}
 	return b
 }
+
+// DownloadIsBase64 reports whether the download is a base64-encoded attachment upload
+func DownloadIsBase64(url string) (bool, string, string) {
+	defer qu.Catch()
+	/*
+		Base64 attachment types:
+		1. Images: data:image/png;base64, data:image/jpg;base64, data:image/jpeg;base64
+		2. Documents: data:document/doc;base64, data:document/docx;base64, data:document/pdf;base64, data:document/xlsx;base64, data:document/xls;base64
+	*/
+	if result := base64FileType.FindStringSubmatch(url); len(result) == 3 {
+		return true, result[2], base64FileType.ReplaceAllString(url, "")
+	}
+	return false, "", ""
+}
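
A quick usage sketch of the new helper, reusing the sample URL from the script.go hunk below:

	// ok == true, ftype == "png", and payload is the raw base64 body with
	// everything up to and including ";base64," stripped by ReplaceAllString.
	ok, ftype, payload := DownloadIsBase64("http://www.mmjyjt.com/data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAqAAAAOwCAYAAAD")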

+ 4 - 8
src/spider/script.go

@@ -318,8 +318,7 @@ func (s *Script) LoadScript(site, channel, user *string, code, script_file strin
 		fileName := S.ToString(-6)
 		ishttps := strings.Contains(url, "https")
 		//base64 detection
-		base64UrlReg := regexp.MustCompile("data:image")
-		indexArr := base64UrlReg.FindStringIndex(url)
+		isBase64, fileType, base64Url := DownloadIsBase64(url)
 		name, size, ftype, fid := "", "", "", ""
 		tmpUrl := ""
 		var ret []byte
@@ -332,13 +331,10 @@ func (s *Script) LoadScript(site, channel, user *string, code, script_file strin
 		}
 
 		//base64 url
-		if len(indexArr) == 2 { //base64 http://www.mmjyjt.com/data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAqAAAAOwCAYAAAD
+		if isBase64 { //base64 http://www.mmjyjt.com/data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAqAAAAOwCAYAAAD
 			//extract the base64 payload
-			start := indexArr[0]
-			url = url[start:]
-			fileName = "文件下载.jpg"
-			index := strings.Index(url, ",")
-			dec := base64.NewDecoder(base64.StdEncoding, strings.NewReader(url[index+1:]))
+			fileName = "文件下载." + fileType
+			dec := base64.NewDecoder(base64.StdEncoding, strings.NewReader(base64Url))
 			ret, err = io.ReadAll(dec)
 			if err == nil && len(ret) > 0 {
 				url, name, size, ftype, fid = util.UploadFile(s.SCode, fileName, "", ret)
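
The decode path now streams the stripped payload through base64.NewDecoder instead of slicing at the comma first. A standalone sketch of that pattern (hypothetical payload, runnable on its own):

	package main

	import (
		"encoding/base64"
		"fmt"
		"io"
		"strings"
	)

	func main() {
		payload := "aGVsbG8=" // stands in for the stripped attachment body
		dec := base64.NewDecoder(base64.StdEncoding, strings.NewReader(payload))
		ret, err := io.ReadAll(dec)
		fmt.Println(string(ret), err) // hello <nil>
	}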

+ 40 - 4
src/spider/spider.go

@@ -317,7 +317,12 @@ func (s *Spider) DownListPageItem() (errs interface{}) {
 			repeatPageTimes++ //increment the count
 		}
 		downtimes = 0 //current page downloaded without error; reset the retry count
-		util.TimeSleepFunc(100*time.Millisecond, TimeSleepChan)
+		//delay before each list request
+		if listDelayTime := util.Config.DelayTime.ListDelayTime; listDelayTime > 0 {
+			time.Sleep(time.Duration(listDelayTime) * time.Millisecond)
+		} else {
+			util.TimeSleepFunc(100*time.Millisecond, TimeSleepChan)
+		}
 	}
 	logger.Info(s.Code, "本轮列表页采集详情:", s.ContinueDownListChildTask, downloadAllNum, repeatAllNum, start, s.Stop)
 	if !util.Config.IsHistoryEvent && !s.Stop { //non-history node: record the download success rate
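
The configured-delay-or-default branch above reappears in SupplementDownListPageItem and, without the fallback, in DownListOnePage below. A hypothetical helper that would capture the repeated pattern (not part of this commit):

	// listDelay sleeps for the configured per-request delay, falling back
	// to the original interruptible 100ms sleep when none is configured.
	func listDelay() {
		if d := util.Config.DelayTime.ListDelayTime; d > 0 {
			time.Sleep(time.Duration(d) * time.Millisecond)
		} else {
			util.TimeSleepFunc(100*time.Millisecond, TimeSleepChan)
		}
	}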
@@ -1066,7 +1071,12 @@ func (s *Spider) SupplementDownListPageItem() (errs interface{}) {
 		downtimes = 0
 		errtimes = 0
 		errPageNum = 0
-		util.TimeSleepFunc(100*time.Millisecond, TimeSleepChan)
+		//delay before each list request
+		if listDelayTime := util.Config.DelayTime.ListDelayTime; listDelayTime > 0 {
+			time.Sleep(time.Duration(listDelayTime) * time.Millisecond)
+		} else {
+			util.TimeSleepFunc(100*time.Millisecond, TimeSleepChan)
+		}
 	}
 	logger.Info(s.Code, "本轮列表页采集详情:", downloadAllNum, repeatAllNum, saveAllNum, finish)
 	//supplementary collection info
@@ -1085,6 +1095,9 @@ func (s *Spider) DownListOnePage(pagenum int) (downnum, repeatnum int) {
 	defer qu.Catch()
 	downtimes := 0
 	for downtimes < 3 { //retry up to 3 times on error
+		if listDelayTime := util.Config.DelayTime.ListDelayTime; listDelayTime > 0 {
+			time.Sleep(time.Duration(listDelayTime) * time.Millisecond) //delay before each list request
+		}
 		if err := s.L.CallByParam(lua.P{
 			Fn:      s.L.GetGlobal("downloadAndParseListPage"),
 			NRet:    1,
@@ -1421,6 +1434,10 @@ func (s *Spider) DownloadDetailByNames(p interface{}) {
 // Download and parse the detail page
 func (s *Spider) DownloadDetailPage(param map[string]interface{}, data map[string]interface{}) (map[string]interface{}, interface{}) {
 	defer qu.Catch()
+	//delay before each detail page request
+	if detailDelayTime := util.Config.DelayTime.DetailDelayTime; detailDelayTime > 0 {
+		time.Sleep(time.Duration(detailDelayTime) * time.Millisecond)
+	}
 	s.LastHeartbeat = time.Now().Unix()
 	util.TimeSleepFunc((time.Duration(s.SleepBase+util.GetRandMath(s.SleepRand)))*time.Millisecond, TimeSleepChan)
 	tab := s.L.NewTable()
@@ -1542,7 +1559,7 @@ func (s *Spider) DownloadDetail(reload bool, isHistory bool) {
 		coll = "spider_historydata"
 		o["_id"] = 1 //历史数据正序
 	}
-	f := map[string]interface{}{
+	f := map[string]interface{}{ //fields not needed in the result
 		"state":      0,
 		"comeintime": 0,
 		"event":      0,
@@ -1555,6 +1572,17 @@ func (s *Spider) DownloadDetail(reload bool, isHistory bool) {
 		s.Stop = true
 		return
 	}
+	//working-hours check
+	isWorkTime := true //whether this is working time (publication volume is high)
+	if countNum == 0 { //no pending incremental data; during off-hours, download the data that previously failed
+		if nowHour := time.Now().Hour(); nowHour < 6 && nowHour >= 0 { //off-hours
+			isWorkTime = false
+			q["state"] = map[string]interface{}{ //data not yet downloaded successfully
+				"$ne": 1,
+			}
+			countNum = MgoS.Count(coll, q) //count entries not downloaded within the last util.Config.DayNum days
+		}
+	}
 	//logger.Info("Thread Info:	Code:", s.SCode, "	count:", countNum)
 	if countNum > 0 {
 		threadNum := countNum / util.Config.ThreadBaseNum //thread count
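
Off-hours above means the 00:00–05:59 window; time.Time.Hour() already returns 0–23, so the nowHour >= 0 guard is always true. A compact restatement as a hypothetical helper:

	// isOffHours reports whether t falls in the 00:00–05:59 retry window;
	// only the upper bound matters since Hour() never returns a negative.
	func isOffHours(t time.Time) bool {
		return t.Hour() < 6
	}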
@@ -1562,6 +1590,9 @@ func (s *Spider) DownloadDetail(reload bool, isHistory bool) {
 			threadNum = util.Config.ThreadUpperLimit
 		}
 		logger.Info("Thread Info:	Code:", s.SCode, "	count:", countNum, "	thread num:", threadNum)
+		if !isWorkTime { //off-hours: load data ordered by download-failure count
+			o = map[string]interface{}{"times": 1}
+		}
 		list, _ := MgoS.Find(coll, q, o, f, false, 0, 200)
 		if list != nil && len(*list) > 0 {
 			spChan := make(chan *Spider, threadNum+1) //initialize the spider channel (+1 is the base thread count)
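
For a sense of the sizing above: threadNum grows with the pending count and is capped. With illustrative values (ThreadBaseNum and ThreadUpperLimit are not shown in this diff):

	countNum := 600
	threadNum := countNum / 50 // ThreadBaseNum = 50 gives 12
	if threadNum > 10 {
		threadNum = 10 // capped by ThreadUpperLimit = 10
	}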
@@ -1600,6 +1631,11 @@ func (s *Spider) DownloadDetail(reload bool, isHistory bool) {
 					if s.Stop || sp == nil { //stop downloading when the spider is taken offline or sp is nil
 						return
 					}
+					if !isWorkTime {
+					if nowHour := time.Now().Hour(); nowHour >= 6 && nowHour < 24 { //guard against an oversized failure list running past the off-hours window
+							return
+						}
+					}
 					_id := tmp["_id"]
 					query := map[string]interface{}{"_id": _id}
 					href := qu.ObjToString(tmp["href"])
@@ -1637,7 +1673,7 @@ func (s *Spider) DownloadDetail(reload bool, isHistory bool) {
 					if !isHistory && !sp.Stop && sp.IsMainThread { //if the spider was taken offline while downloading the detail page, stop saving heartbeat info
 						UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute", false) //record the modal=1 download-data heartbeat
 					}
-					if err != nil || data == nil {
+					if err != nil || len(data) == 0 {
 						success = false
 						times++
 						if err != nil {
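
The final hunk tightens the failure check: len(data) == 0 is true for a nil map and for an empty map alike, so an empty-but-non-nil result now also counts as a failed download. A minimal demonstration:

	package main

	import "fmt"

	func main() {
		var m map[string]interface{}
		fmt.Println(m == nil, len(m) == 0) // true true   (old and new checks agree)
		m = map[string]interface{}{}
		fmt.Println(m == nil, len(m) == 0) // false true  (only the new check catches this)
	}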