
Add heartbeat and es collection logic for competitor-like site crawlers

maxiaoshan, 3 years ago
parent
commit
fc7c0bfd29
5 changed files with 156 additions and 37 deletions
  1. src/config.json (+72 −9)
  2. src/main.go (+10 −3)
  3. src/spider/script.go (+7 −4)
  4. src/spider/spider.go (+61 −20)
  5. src/spider/store.go (+6 −1)

src/config.json (+72 −9)

@@ -9,7 +9,7 @@
     "msgserveraddr": "spdata.jianyu360.com:801",
     "msgserveraddrfile": "spdata.jianyu360.com:802",
 	"isdelay":false,
-    "working": 0,
+    "working": 1,
     "chansize": 4,
     "detailchansize": 20,
     "uploadevent": 7100,
@@ -19,22 +19,85 @@
     "ishistoryevent": false,
     "tesseractadd": "http://test.qmx.top:1688",
     "testdir": "res/test/spider_test.lua",
-    "redisservers": "title_repeat_judgement=192.168.3.207:1679,title_repeat_fulljudgement=192.168.3.207:1679,title_repeat_listpagehref=192.168.3.207:2679",
+    "redisservers": "title_repeat_judgement=192.168.3.207:1679,title_repeat_fulljudgement=192.168.3.207:1679,title_repeat_listpagehref=192.168.3.207:1679",
      "word":{
     	"keyword":"(抽签|中标|招标|成交|合同|中标候选人|资格预审|拟建|邀请|询价|比选|议价|竞价|磋商|采购|招投标|答疑|变更公告|更正公告|竞争性谈判|竞谈|意见征询|澄清|单一来源|流标|废标|验收公告|中止|终止|违规|处罚|征集公告|开标结果|评审结果|监理|招租|租赁|评判结果|项目|遴选|补遗|竞标|征求意见)",
     	"notkeyword":"(招聘|拍卖|出租|出让|使用权|资产)"
     },
-    "delaysite": {
-        "中国政府采购网": 7
-    },
     "oss":{
-    	"ossEndpoint":"oss-cn-beijing-internal.aliyuncs.com",
-		"ossAccessKeyId":"LTAI4G5x9aoZx8dDamQ7vfZi",  
-		"ossAccessKeySecret":"Bk98FsbPYXcJe72n1bG3Ssf73acuNh",
-		"ossBucketName":"jy-datafile"
+        "ossEndpoint":"oss-cn-beijing-internal.aliyuncs.com",
+        "ossAccessKeyId":"LTAI4G5x9aoZx8dDamQ7vfZi",
+        "ossAccessKeySecret":"Bk98FsbPYXcJe72n1bG3Ssf73acuNh",
+        "ossBucketName":"jy-datafile"
     },
     "fileServer": "http://test.qmx.top:9333",
     "jsvmurl": "http://127.0.0.1:8080/jsvm",
+    "es": {
+        "addr": "http://192.168.3.206:9800",
+        "pool": 15,
+        "index": "bidding",
+        "type": "bidding"
+    },
+    "delaysite": {
+        "晨砻采购网": 12,
+        "光纤在线": 12,
+        "轨道招标网": 12,
+        "国际石油网": 12,
+        "国际燃气网": 12,
+        "中国工程机械商贸网": 12,
+        "国际风力发电网": 12,
+        "泰茂医疗器械信息": 12,
+        "北极星输配电网": 12,
+        "智慧交通网": 12,
+        "招采进宝官网": 12,
+        "阳光易招公共资源交易平台": 12,
+        "阳光招采电子招标投标交易平台": 12,
+        "智慧电子采购网": 12,
+        "易交在线电子招标投标交易平台": 12,
+        "通信人才网": 12,
+        "必联电子招标投标平台": 12,
+        "网优招投标网": 12,
+        "中国国际招标网": 12,
+        "易企询": 12,
+        "e交易": 12,
+        "招标通电子招投标交易平台": 12,
+        "国信创新招标投标交易平台": 12,
+        "诚E招电子采购交易平台": 12,
+        "阳光慧采在线交易平台": 12,
+        "招采云电子招投标交易平台": 12,
+        "赢标电子招标采购交易平台": 12,
+        "汕头建筑信息网": 12,
+        "招必得": 12,
+        "中国医疗器械信息网": 12,
+        "易联器械网": 12,
+        "行采家": 12,
+        "国际电力网": 12,
+        "福易采电子交易平台": 12,
+        "湖南招标网": 12,
+        "远瞩采购云平台": 12,
+        "招采进宝通用专区": 12,
+        "云竞网": 12,
+        "瞬速网上竞价平台": 12,
+        "医信通": 12,
+        "锐竞科研采购平台": 12,
+        "医药网": 12,
+        "医疗器械市场调研平台": 12,
+        "采购与招标网(元博网)": 12,
+        "解放号软件产业互联网平台": 12,
+        "交易365招标采购平台": 12,
+        "中国大宗物资网": 12,
+        "云买卖电子综合交易平台": 12,
+        "化工云商网": 12,
+        "一联易招交易服务平台": 12,
+        "中国公共采购网": 12,
+        "大宗交易网": 12,
+        "招采猫电子交易平台": 12,
+        "易能智招电子交易平台": 12,
+        "爱建云招标采购电子交易平台": 12,
+        "物业行业电子招标采购平台": 12,
+        "阳光如水招投标电子交易平台": 12,
+        "黑龙江易采招标投标平台": 12
+    },
     "luadisablelib": {
         "baselib": {
             "print": false

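The new es block and the much larger delaysite map are the configuration side of this change. Below is a minimal sketch of the Go-side fields they presumably map to; only Es, DelaySites, and DayNum usage is visible in this diff, so the json tags and exact types are assumptions.

    // Sketch of the spiderutil config fields this diff relies on. Es is read
    // as a generic map in main.go (qu.ObjToString(Config.Es["index"])) and
    // DelaySites is indexed by site name in spider.go; tags are assumed.
    type Config struct {
        Es         map[string]interface{} `json:"es"`        // addr, pool, index, type
        DelaySites map[string]int         `json:"delaysite"` // site name -> delay value
        DayNum     int                    // collection window, in days
        // ...remaining fields unchanged...
    }
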
src/main.go (+10 −3)

@@ -9,14 +9,13 @@ import (
 
 	mgo "mongodb"
 	qu "qfw/util"
+	es "qfw/util/elastic"
 	mgu "qfw/util/mongodbutil"
 	"regexp"
-
-	//"qfw/util/redis"
 	"runtime"
 	. "spiderutil"
 	"time"
-
+	//"qfw/util/redis"
 	"github.com/donnie4w/go-logger/logger"
 	"github.com/go-xweb/xweb"
 	"github.com/yuin/gopher-lua"
@@ -45,6 +44,14 @@ func init() {
 	//	} else { //redis cluster
 	//		InitRedisCluster(Config.Redishosts, 20, 100, false)
 	//	}
+	//initialize es
+	spider.EsIndex = qu.ObjToString(Config.Es["index"])
+	spider.EsType = qu.ObjToString(Config.Es["type"])
+	spider.Es = &es.Elastic{
+		S_esurl: qu.ObjToString(Config.Es["addr"]),
+		I_size:  qu.IntAll(Config.Es["pool"]),
+	}
+	spider.Es.InitElasticSize()
 	//start the message service
 	spider.InitMsgClient(Config.Msgserveraddr, Config.Msgname)
 	spider.InitMsgClientFile(Config.MsgserveraddrFile, Config.Msgname+"file")

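init() now builds a shared ES client from the new config block. A rough sketch of the qfw/util/elastic surface this commit exercises (S_esurl, I_size, InitElasticSize, Count, all visible in the diff); this is not the library's documented API, just what the diff shows.

    // Rough equivalent of what init() wires up, with the config.json values.
    func initEs() *es.Elastic {
        esClient := &es.Elastic{
            S_esurl: "http://192.168.3.206:9800", // Config.Es["addr"]
            I_size:  15,                          // Config.Es["pool"], connection pool size
        }
        esClient.InitElasticSize()
        return esClient
    }

    // Duplicates are later detected by comparing a hit count with zero, e.g.
    // Es.Count("bidding", "bidding", esQuery) > 0
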
src/spider/script.go (+7 −4)

@@ -233,6 +233,9 @@ func (s *Script) LoadScript(code, script_file string, newstate bool) string {
 		content := S.ToString(-1)
 		ret := s.L.NewTable()
 		util.FindListHtml(gpath, content, ret)
+		if ret.Len() > 0 {
+			UpdateHeart("", "", code, "", "findlist") //heartbeat: record the number of items actually collected from the list page
+		}
 		S.Push(ret)
 		return 1
 	}))
@@ -808,10 +811,10 @@ func getChildrenLen(sq *gq.Selection) (ret int) {
 }
 
 //
-func (s *Script) Reload() {
-	s.L.Close()
-	s.LoadScript(s.SCode, s.ScriptFile, false)
-}
+//func (s *Script) Reload() {
+//	s.L.Close()
+//	s.LoadScript(s.SCode, s.ScriptFile, false)
+//}
 
 //unicode decoding
 func transUnic(str string) string {

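The new findlist heartbeat fires only when FindListHtml extracts at least one row, whereas the existing list heartbeat fires whenever the list script runs. Comparing the two can flag spiders whose list pages execute but never yield data; a hypothetical check along those lines (listLooksBroken is not in the source):

    // A list page that keeps executing (fresh ListHeart) while extracting
    // nothing (stale FindListHeart) usually means the page layout changed
    // and the spider's selectors no longer match.
    func listLooksBroken(h *Heart, staleAfter int64) bool {
        now := time.Now().Unix()
        return now-h.ListHeart <= staleAfter && now-h.FindListHeart > staleAfter
    }
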
src/spider/spider.go (+61 −20)

@@ -19,19 +19,21 @@ import (
 
 	//mgu "qfw/util/mongodbutil"
 	//"qfw/util/redis"
+
+	"github.com/donnie4w/go-logger/logger"
+	"github.com/yuin/gopher-lua"
+	es "qfw/util/elastic"
 	"regexp"
 	util "spiderutil"
 	"strings"
 	"sync/atomic"
 	"time"
-
-	"github.com/donnie4w/go-logger/logger"
-	"github.com/yuin/gopher-lua"
 )
 
 type Heart struct {
 	DetailHeart        int64  //heartbeat: detail-page script executed
 	DetailExecuteHeart int64  //heartbeat: detail page actually yielded data
+	FindListHeart      int64  //heartbeat: findListHtml executed
 	ListHeart          int64  //heartbeat: list-page script executed
 	ModifyUser         string //spider maintainer
 	Site               string //site
@@ -75,6 +77,9 @@ type Spider struct {
 	IsCompete        bool //distinguishes new from old spiders
 }
 
+var Es *es.Elastic
+var EsIndex string
+var EsType string
 var UpdataMgoCache = make(chan []map[string]interface{}, 1000)   //update the state of data to be re-downloaded
 var UpdataHeartCache = make(chan []map[string]interface{}, 1000) //update spider heartbeat info
 var SP = make(chan bool, 5)
@@ -89,6 +94,8 @@ func UpdateHeart(site, channel, code, user, t string) {
 		if heart, ok := htmp.(*Heart); ok {
 			if t == "list" {
 				heart.ListHeart = time.Now().Unix()
+			} else if t == "findlist" {
+				heart.FindListHeart = time.Now().Unix()
 			} else if t == "detail" {
 				heart.DetailHeart = time.Now().Unix()
 			} else if t == "detailexcute" {
@@ -103,6 +110,8 @@ func UpdateHeart(site, channel, code, user, t string) {
 		}
 		if t == "list" {
 			heart.ListHeart = time.Now().Unix()
+		} else if t == "findlist" {
+			heart.FindListHeart = time.Now().Unix()
 		} else if t == "detail" {
 			heart.DetailHeart = time.Now().Unix()
 		} else if t == "detailexcute" {
@@ -423,7 +432,7 @@ func (s *Spider) HistoricalMendDownloadDetailItem(p interface{}) {
 	db := HexToBigIntMod(href)
 	hashHref := HexText(href)
 	id := ""
-	SaveListPageData(paramdata, &id)                                          //store the collection record
+	SaveListPageData(paramdata, &id, false)                                   //store the collection record
 	isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref) //check the full-volume redis
 	//log.Println("full href:", href, " isExist:", isExist)
 	logger.Debug("full href:", href, " isExist:", isExist)
@@ -497,7 +506,6 @@ func (s *Spider) HistoricalMendDownloadDetailItem(p interface{}) {
 func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
 	defer mu.Catch()
 	var err interface{}
-	//TODO download the detail page and parse it with LUA; if configured to skip detail-page download, stop here and store directly
 	data := map[string]interface{}{}
 	paramdata := p.(map[string]interface{})
 	for k, v := range paramdata {
@@ -538,7 +546,21 @@ func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
 			*num++ //already collected
 			return
 		}
-		SaveListPageData(paramdata, &id) //save the list-page data collected by nodes 7000/7410/7500/7700
+		isEsRepeat := false
+		if delayDay := util.Config.DelaySites[s.Name]; delayDay > 0 { //for competitor-like sites, dedup the title against the last 7 days in es (sequential collection cannot be delayed, only deduped)
+			title := qu.ObjToString(paramdata["title"])
+			eTime := fmt.Sprint(GetTime(0))
+			sTime := fmt.Sprint(GetTime(-7))
+			esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + sTime + `","lte": "` + eTime + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
+			if Es.Count(EsIndex, EsType, esQuery) > 0 { //the title already exists in es: do not collect again, update the list-table state
+				isEsRepeat = true
+			}
+		}
+		SaveListPageData(paramdata, &id, isEsRepeat) //save the list-page data collected by nodes 7000/7410/7500/7700
+		if isEsRepeat {                              //record the deduped competitor-like title in redis
+			util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
+			return
+		}
 	}
 	//download, parse, store
 	data, err = s.DownloadDetailPage(paramdata, data)
@@ -721,9 +743,14 @@ func (s *Spider) DownloadHighDetail() {
 	for {
 		logger.Info("Running Code:", s.Code, "Stop:", s.Stop)
 		if !s.Stop { //spider is running
-			comeintimeQuery := map[string]interface{}{"$gte": GetTime(-util.Config.DayNum)}                 //collect data from the past week so items that never download do not pile up
-			if delayDay := util.Config.DelaySites[s.Name]; delayDay > 0 && delayDay <= util.Config.DayNum { //check whether this spider belongs to a delayed-collection site
-				comeintimeQuery["$lte"] = GetTime(-delayDay + 1)
+			comeintimeQuery := map[string]interface{}{"$gte": GetTime(-util.Config.DayNum)} //collect data from the past week so items that never download do not pile up
+			isEsRepeat := false                                                             //whether to dedup against es
+			if delayDay := util.Config.DelaySites[s.Name]; delayDay > 0 {
+				isEsRepeat = true
+				if delayDay <= util.Config.DayNum*24 { //delayed-collection site: postpone data by delayDay hours (7410/7500/7700 collect sequentially and cannot be delayed)
+					//comeintimeQuery["$lte"] = GetTime(-delayDay + 1)
+					comeintimeQuery["$lte"] = time.Now().Unix() - int64(3600*delayDay)
+				}
 			}
 			q := map[string]interface{}{
 				"spidercode": s.Code,
@@ -744,13 +771,17 @@ func (s *Spider) DownloadHighDetail() {
 				for _, tmp := range *list {
 					_id := tmp["_id"]
 					query := map[string]interface{}{"_id": _id}
-					competehref := qu.ObjToString(tmp["competehref"])
-					if competehref != "" { //check whether Jianyu has already collected this third-party site's data
+					if isEsRepeat { //dedup the title against es
 						title := qu.ObjToString(tmp["title"])
-						one, _ := Mgo.FindOne("data_bak", map[string]interface{}{"title": title})
-						if one != nil && len(*one) > 0 { //already collected by Jianyu, discard this record
+						href := qu.ObjToString(tmp["href"])
+						eTime := fmt.Sprint(GetTime(0))
+						sTime := fmt.Sprint(GetTime(-7))
+						esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + sTime + `","lte": "` + eTime + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
+						count := Es.Count(EsIndex, EsType, esQuery)
+						if count > 0 { //the title already exists in es: do not collect again, update the list-table state
 							set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true}} //already exists: set state to 1
 							Mgo.Update("spider_highlistdata", query, set, false, false)
+							util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
 							continue
 						}
 					}
@@ -849,12 +880,19 @@ func (s *Spider) DownloadListDetail() {
 		s.L.Close()
 		CC2 <- s.L
 	}()
+	comeintimeQuery := map[string]interface{}{"$gte": GetTime(-util.Config.DayNum)} //collect data from the past week so items that never download do not pile up
+	isEsRepeat := false                                                             //whether to dedup against es
+	if delayDay := util.Config.DelaySites[s.Name]; delayDay > 0 {
+		isEsRepeat = true
+		if delayDay <= util.Config.DayNum { //delayed-collection site: postpone data by delayDay days (7410/7500/7700 collect sequentially and cannot be delayed)
+			//comeintimeQuery["$lte"] = GetTime(-delayDay + 1)
+			comeintimeQuery["$lte"] = time.Now().Unix() - int64(86400*delayDay)
+		}
+	}
 	q := map[string]interface{}{
 		"spidercode": s.Code,
 		"state":      0, //0:入库状态;-1:采集失败;1:成功
-		"comeintime": map[string]interface{}{ //采集一周内的数据,防止有数据一直采不下来,造成积累
-			"$gte": GetTime(-util.Config.DayNum),
-		},
+		"comeintime": comeintimeQuery,
 	}
 	o := map[string]interface{}{"_id": -1}
 	f := map[string]interface{}{
@@ -870,13 +908,16 @@ func (s *Spider) DownloadListDetail() {
 		for _, tmp := range *list {
 			_id := tmp["_id"]
 			query := map[string]interface{}{"_id": _id}
-			competehref := qu.ObjToString(tmp["competehref"])
-			if competehref != "" { //check whether Jianyu has already collected this third-party site's data
+			if isEsRepeat { //dedup the title against es
 				title := qu.ObjToString(tmp["title"])
-				one, _ := Mgo.FindOne("data_bak", map[string]interface{}{"title": title})
-				if one != nil && len(*one) > 0 { //already collected by Jianyu, discard this record
+				href := qu.ObjToString(tmp["href"])
+				eTime := fmt.Sprint(GetTime(0))
+				sTime := fmt.Sprint(GetTime(-7))
+				esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + sTime + `","lte": "` + eTime + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
+				if Es.Count(EsIndex, EsType, esQuery) > 0 { //the title already exists in es: do not collect again, update the list-table state
 					set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true}} //already exists: set state to 1
 					Mgo.Update("spider_highlistdata", query, set, false, false)
+					util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
 					continue
 				}
 			}

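The same title-dedup query string is assembled three times in this file (DownloadDetailItem, DownloadHighDetail, DownloadListDetail). Note also that DownloadHighDetail interprets delayDay as hours (3600*delayDay, checked against DayNum*24) while DownloadListDetail interprets it as days (86400*delayDay). A sketch of factoring the query out; esTitleDupQuery is hypothetical, not in the source:

    // Builds the ES 1.x "filtered" query used above: comeintime within the
    // last 7 days AND an exact phrase match on title. A title containing a
    // double quote would break this hand-built JSON, so real input may need
    // escaping before interpolation.
    func esTitleDupQuery(title string) string {
        eTime := fmt.Sprint(GetTime(0))  // now
        sTime := fmt.Sprint(GetTime(-7)) // seven days ago
        return `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` +
            sTime + `","lte": "` + eTime + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` +
            title + `","type": "phrase","fields": ["title"]}}]}}}}}`
    }

    // usage at each call site: Es.Count(EsIndex, EsType, esTitleDupQuery(title)) > 0
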
src/spider/store.go (+6 −1)

@@ -256,10 +256,15 @@ func SaveHighListPageData(tmp map[string]interface{}, href string, num *int) {
 }
 
 //save the list-page data collected by 7000/7500/7700
-func SaveListPageData(tmp map[string]interface{}, id *string) {
+func SaveListPageData(tmp map[string]interface{}, id *string, isEsRepeat bool) {
 	tmp["event"] = lu.Config.Uploadevent
 	tmp["comeintime"] = time.Now().Unix()
 	tmp["state"] = 0
+	tmp["exist"] = false
+	if isEsRepeat { //competitor-like data deduped by es: update the state
+		tmp["state"] = 1
+		tmp["exist"] = true
+	}
 	*id = Mgo.Save("spider_listdata", tmp)
 }
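
A minimal usage sketch of the new SaveListPageData signature, mirroring the call in DownloadDetailItem: a record that fails the ES title check is saved already marked consumed, so the detail downloader never picks it up.

    id := ""
    // isEsRepeat=true saves the row as state=1/exist=true (treated as done);
    // isEsRepeat=false saves it as state=0/exist=false, pending detail download.
    SaveListPageData(paramdata, &id, isEsRepeat)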