maxiaoshan пре 3 година
родитељ
комит
9420557a7d
5 измењених фајлова са 44 додато и 178 уклоњено
  1. 6 3
      src/config.json
  2. 3 6
      src/main.go
  3. 10 25
      src/spider/msgservice.go
  4. 19 131
      src/spider/spider.go
  5. 6 13
      src/spider/store.go

+ 6 - 3
src/config.json

@@ -6,7 +6,7 @@
     "editor_dbsize": 2,
     "editoraddr": "http://127.0.0.1:6011/spider/infos",
     "msgname": "爬虫采集平台7100",
-    "msgserveraddr": "spdata.jianyu360.com:801",
+    "msgserveraddr": "spdata.jianyu360.com:803",
     "msgserveraddrfile": "spdata.jianyu360.com:802",
 	"isdelay":false,
     "working": 0,
@@ -22,8 +22,11 @@
     "serveraddress": "127.0.0.1:8030",
     "tesseractadd": "http://test.qmx.top:1688",
     "testdir": "res/test/spider_test.lua",
-    "redisservers": "title_repeat_judgement=192.168.3.207:2679,title_repeat_fulljudgement=192.168.3.207:2679,title_repeat_listpagehref=192.168.3.207:1679",
-     "word":{
+    "redisclusteraddrs": [
+        "192.168.3.207:2179",
+        "192.168.3.166:2379"
+    ],
+    "word":{
     	"keyword":"(抽签|中标|招标|成交|合同|中标候选人|资格预审|拟建|邀请|询价|比选|议价|竞价|磋商|采购|招投标|答疑|变更公告|更正公告|竞争性谈判|竞谈|意见征询|澄清|单一来源|流标|废标|验收公告|中止|终止|违规|处罚|征集公告|开标结果|评审结果|监理|招租|租赁|评判结果|项目|遴选|补遗|竞标|征求意见)",
     	"notkeyword":"(招聘|拍卖|出租|出让|使用权|资产)"
     },

+ 3 - 6
src/main.go

@@ -41,12 +41,9 @@ func init() {
 	spider.InitOther() //加载其他信息
 	//验证码识别client
 	//codegrpc.InitCodeGrpcClient()
-	InitRedis(Config.Redisservers) //初始化Redis
-	//	if Config.Redistype == "0" {
-	//		redis.InitRedis(Config.Redisservers)
-	//	} else { //redis集群
-	//		InitRedisCluster(Config.Redishosts, 20, 100, false)
-	//	}
+	//InitRedis(Config.Redisservers) //初始化Redis
+	//redis集群
+	InitRedisCluster(Config.RedisClusterAddrs)
 	//初始化es
 	spider.EsIndex = qu.ObjToString(Config.Es["index"])
 	spider.EsType = qu.ObjToString(Config.Es["type"])

+ 10 - 25
src/spider/msgservice.go

@@ -10,7 +10,6 @@ import (
 	qu "qfw/util"
 	mgu "qfw/util/mongodbutil"
 
-	//"qfw/util/redis"
 	util "spiderutil"
 	"time"
 
@@ -275,34 +274,20 @@ func SaveObj(event int, checkAtrr string, data map[string]interface{}, saveredis
 		} else {
 			data["sendflag"] = "true"
 		}
-		//id := mgu.Save("data_bak", "spider", "spider", data)
-		//if !flag && id != "" {
 		href := fmt.Sprint(data["href"])
 		if len(href) > 5 && saveredis { //有效数据
-			db := HexToBigIntMod(href) //根据href的哈希值选择Redis的db
 			hashHref := HexText(href)
-			//增量
-			isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
-			id := ""
-			if !isExist {
-				id = mgu.Save("data_bak", "spider", "spider", data)
-			} else { //记录重复数据,spider_repeatdata
-				data["from"] = "spider"
-				mgu.Save("spider_repeatdata", "spider", "spider", data)
-			}
-			//保存服务未接收成功的数据会存入data_bak中,确保数据不丢失依赖补发程序
-			if id != "" {
-				util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, "", 3600*24*365)
-				if !flag { //保存服务发送成功
-					//全量(判断是否已存在防止覆盖id)
-					isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
-					if !isExist {
-						util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
-					}
-				}
-			}
+			data["redisexists"] = util.RedisClusterExists(hashHref)
+			mgu.Save("data_bak", "spider", "spider", data)
+			//id := mgu.Save("data_bak", "spider", "spider", data)
+			////保存服务未接收成功的数据会存入data_bak中,确保数据不丢失依赖补发程序
+			//if id != "" && !flag { //保存服务发送成功
+			//	hashHref := HexText(href)
+			//	if !util.RedisClusterExists(hashHref) { //保存服务过滤掉的异常数据目前不在其程序内存储href到全量的redis,此处补充(是否在保存服务端保存所有数据href)
+			//		util.RedisClusterSet(hashHref, "", -1)
+			//	}
+			//}
 		}
-		//}
 	}
 }
 

+ 19 - 131
src/spider/spider.go

@@ -18,9 +18,6 @@ import (
 	"strconv"
 	"sync"
 
-	//mgu "qfw/util/mongodbutil"
-	//"qfw/util/redis"
-
 	es "qfw/util/elastic"
 	"regexp"
 	util "spiderutil"
@@ -342,15 +339,8 @@ func (s *Spider) DownListPageItem() (errs interface{}) {
 							atomic.AddInt32(&s.TotalDowncount, 1)
 							href := fmt.Sprint(tmp["href"])
 							if len(href) > 5 { //有效数据
-								db := HexToBigIntMod(href) //根据href的哈希值选择Redis的db
 								hashHref := HexText(href)
-								//增量(redis默认db0)
-								util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, "", 3600*24*365)
-								//全量(判断是否已存在防止覆盖id)
-								isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
-								if !isExist {
-									util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
-								}
+								util.RedisClusterSet(hashHref, "", -1) //全量redis
 								list = append(list, tmp)
 							}
 						}
@@ -491,24 +481,20 @@ func (s *Spider) HistoricalMendDownloadDetailItem(p interface{}) {
 	if len(href) <= 5 { //无效数据
 		return
 	}
-	db := HexToBigIntMod(href)
 	hashHref := HexText(href)
 	id := ""
-	SaveListPageData(paramdata, &id, false)                                   //存储采集记录
-	isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref) //取全量redis
-	//log.Println("full href:", href, " isExist:", isExist)
+	SaveListPageData(paramdata, &id, false)      //存储采集记录
+	isExist := util.RedisClusterExists(hashHref) //取全量redis
 	logger.Debug("full href:", href, " isExist:", isExist)
 	if !s.IsMustDownload && isExist { //非强制下载redis中存在,结束
-		//qu.Debug("非强制下载redis中存在,结束")
 		//更新spider_listdata中数据下载成功标记
 		if id != "" {
-			//Mgo.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "byid": id}}, false, true)
 			Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true, "updatetime": time.Now().Unix()}})
 		}
 		return
 	}
 	//qu.Debug("----------------下载、解析、入库--------------------")
-	//下载、解析、入库
+	//下载详情页
 	data, err = s.DownloadDetailPage(paramdata, data)
 	if err != nil || data == nil { //下载失败,结束
 		if err != nil {
@@ -526,12 +512,7 @@ func (s *Spider) HistoricalMendDownloadDetailItem(p interface{}) {
 	//详情页过滤数据
 	set := map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}
 	if data["delete"] != nil {
-		//增量
-		util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, "", 3600*24*365)
-		//全量
-		db := HexToBigIntMod(href)
-		hashHref := HexText(href)
-		util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
+		util.RedisClusterSet(hashHref, "", -1) //过滤掉的数据存值全量redis
 		//更新mgo 要删除的数据更新spider_highlistdata state=1不再下载,更新redis
 		set["delete"] = true
 		Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": set})
@@ -539,7 +520,6 @@ func (s *Spider) HistoricalMendDownloadDetailItem(p interface{}) {
 	}
 	//更新spider_listdata中数据下载成功标记
 	if id != "" {
-		//Mgo.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "byid": id}}, false, true)
 		Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": set})
 	}
 	flag := true
@@ -547,18 +527,16 @@ func (s *Spider) HistoricalMendDownloadDetailItem(p interface{}) {
 	if s.IsMustDownload {                                           //强制下载
 		if isExist && t1 < time.Now().AddDate(0, 0, -5).Unix() {
 			//qu.Debug("强制下载 redis存在")
-			data["dataging"] = 1
+			data["dataging"] = 1 //此处dataging=1对应保存服务中取redis中href对应的id值,进行更新(现redis中已无id值,所以无效)
 			flag = false
 		} else {
 			//qu.Debug("强制下载 redis不存在")
 			data["dataging"] = 0
-			//WithinThreeDays(&data) //根据发布时间打标记
 		}
 	} else { //非强制下载
 		if !isExist {
 			//qu.Debug("非强制下载 redis不存在")
 			data["dataging"] = 0
-			//WithinThreeDays(&data) //根据发布时间打标记
 		}
 	}
 	if t1 > time.Now().Unix() { //防止发布时间超前
@@ -592,33 +570,17 @@ func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
 		*num++ //视为已采集
 		return
 	}
-	/*
-		//查询增量redis查看信息是否已经下载
-		isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
-		if isExist { //更新redis生命周期
-			util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*30)
-			*num++ //已采集
-			return
-		}
-		log.Println("href had++:", isExist, href)
-	*/
+	hashHref := HexText(href)
 	id := ""                    //记录spider_listdata中保存的数据id,便于下载成功后更新状态
 	if util.Config.Modal == 1 { //除7000、7500、7700节点外所有节点只采集列表页信息
-		isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
-		if isExist { //更新redis生命周期
-			util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, "", 3600*24*365)
-			*num++ //已采集
-			return
-		}
-		SaveHighListPageData(paramdata, s.SCode, href, num)
+		SaveHighListPageData(paramdata, s.SCode, hashHref, num)
 		return
 	} else {
 		if !s.Stop {
 			UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") //记录modal=0老模式采集三级页心跳
 		}
-		isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
-		if isExist { //更新redis生命周期
-			util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, "", 3600*24*365)
+		isExist := util.RedisClusterExists(hashHref) //全量信息中已采集
+		if isExist {
 			*num++ //已采集
 			return
 		}
@@ -634,7 +596,7 @@ func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
 		}
 		SaveListPageData(paramdata, &id, isEsRepeat) //保存7000、7410、7500、7700节点列表页采集的信息
 		if isEsRepeat {                              //类竞品数据title判重数据加入redis
-			util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, "", 3600*24*365)
+			util.RedisClusterSet(hashHref, "", -1) //全量存值
 			return
 		}
 	}
@@ -653,16 +615,7 @@ func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
 		}
 		return
 	} else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //三级页href替换导致前后href不同
-		log.Println("beforeHref:", href, "afterHref:", tmphref)
-		//增量
-		util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, "", 3600*24*365)
-		//全量
-		db := HexToBigIntMod(href)
-		hashHref := HexText(href)
-		isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
-		if !isExist {
-			util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
-		}
+		util.RedisClusterSet(hashHref, "", -1) //全量redis中存值列表页href
 	}
 	//详情页下载数据成功心跳
 	if !s.Stop {
@@ -671,12 +624,7 @@ func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
 	set := map[string]interface{}{"state": 1, "updatetime": time.Now().Unix(), "byid": id}
 	//详情页过滤数据
 	if data["delete"] != nil {
-		//增量
-		util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, "", 3600*24*365)
-		//全量
-		db := HexToBigIntMod(href)
-		hashHref := HexText(href)
-		util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
+		util.RedisClusterSet(hashHref, "", -1) //过滤掉的数据存值全量redis
 		//更新mgo 要删除的数据更新spider_highlistdata state=1不再下载,更新redis
 		set["delete"] = true
 		Mgo.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
@@ -685,7 +633,6 @@ func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
 	//更新spider_listdata中数据下载成功标记
 	if id != "" {
 		Mgo.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
-		//Mgo.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1}})
 	}
 
 	t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
@@ -700,21 +647,8 @@ func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
 	atomic.AddInt32(&s.TodayDowncount, 1)
 	atomic.AddInt32(&s.TotalDowncount, 1)
 	data["spidercode"] = s.Code
-	//qu.Debug("-----增量开始保存-----")
-
-	// 临时保存数据
-	// update := []map[string]interface{}{}
-	// _id := data["_id"].(string)
-	// update = append(update, map[string]interface{}{"_id": qu.StringTOBsonId(_id)})
-	// update = append(update, map[string]interface{}{
-	// 	"$set": map[string]interface{}{
-	// 		"jsondata": data["jsondata"],
-	// 	},
-	// })
-	// UpdataMgoCache <- update
 	data["iscompete"] = s.IsCompete //2021-11-01以后新增的爬虫不在展示原文链接(保存服务判断)
 	Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
-	//qu.Debug("-----增量保存结束-----")
 }
 
 //遍历下载名录
@@ -924,10 +858,11 @@ func (s *Spider) DownloadDetail(stype string) {
 					_id := tmp["_id"]
 					query := map[string]interface{}{"_id": _id}
 					href := qu.ObjToString(tmp["href"])
+					hashHref := HexText(href)
 					update := []map[string]interface{}{}
 					//由于目前列表页redis判重是href+code可能导致同一条href有多条不同code采集的数据存在
-					//为了避免重复下载,进行量redis判重
-					isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
+					//为了避免重复下载,进行量redis判重
+					isExist := util.RedisClusterExists(hashHref)
 					if isExist {
 						set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true, "updatetime": time.Now().Unix()}} //已存在state置为1
 						update = append(update, query)
@@ -944,7 +879,7 @@ func (s *Spider) DownloadDetail(stype string) {
 						esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
 						count := Es.Count(EsIndex, EsType, esQuery)
 						if count > 0 { //es中含本title数据,不再采集,更新list表数据状态
-							util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, "", 3600*24*365)
+							util.RedisClusterSet(hashHref, "", -1)
 							set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true, "updatetime": time.Now().Unix()}} //已存在state置为1
 							update = append(update, query)
 							update = append(update, set)
@@ -980,16 +915,7 @@ func (s *Spider) DownloadDetail(stype string) {
 							DownloadErrorData(s.Code, tmp)
 						}*/
 					} else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //三级页href替换导致前后href不同
-						log.Println("beforeHref:", href, "afterHref:", tmphref)
-						//增量
-						util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, "", 3600*24*365)
-						//全量
-						db := HexToBigIntMod(href)
-						hashHref := HexText(href)
-						isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
-						if !isExist {
-							util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
-						}
+						util.RedisClusterSet(hashHref, "", -1)
 					}
 
 					if !success { //下载失败更新次数和状态
@@ -1005,12 +931,7 @@ func (s *Spider) DownloadDetail(stype string) {
 						spLock.Unlock()
 						return
 					} else if data["delete"] != nil { //三级页过滤
-						//增量
-						util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, "", 3600*24*365)
-						//全量
-						db := HexToBigIntMod(href)
-						hashHref := HexText(href)
-						util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
+						util.RedisClusterSet(hashHref, "", -1) //过滤掉的数据存值全量redis
 						//更新mgo 要删除的数据更新spider_highlistdata state=1不再下载,更新redis
 						set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "delete": true, "updatetime": time.Now().Unix()}}
 						update = append(update, query)
@@ -1173,36 +1094,3 @@ func HexText(href string) string {
 	h.Write([]byte(href))
 	return fmt.Sprintf("%x", h.Sum(nil))
 }
-
-//func RedisIsExist(href string) bool {
-//	isExist := false
-//	if len(href) > 75 { //取href的哈希判断是否存在
-//		hashHref := GetHas1(href)
-//		isExist, _ = util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+hashHref)
-//	}
-//	if !isExist { //取string href判断是否存在
-//		isExist, _ = util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
-//	}
-//	return isExist
-//}
-
-//判断发布时间是否在三天内
-//func WithinThreeDays(data *map[string]interface{}) {
-//	withinThreeDays := false
-//	//根据发布时间打标记
-//	publishtime := util.ParseDate2Int64(qu.ObjToString((*data)["publishtime"])) //没有发布时间,取当前时间
-//	//发布时间
-//	now := time.Now().Unix()
-//	if now-publishtime > 259200 { //三天前数据
-//		withinThreeDays = false
-//	} else {
-//		withinThreeDays = true
-//	}
-//	if withinThreeDays {
-//		//qu.Debug("发布时间在三天内")
-//		(*data)["dataging"] = 0
-//	} else {
-//		//qu.Debug("发布时间在三天外")
-//		(*data)["dataging"] = 1
-//	}
-//}

+ 6 - 13
src/spider/store.go

@@ -79,15 +79,8 @@ func Store(mode, event int, c, coverAttr string, data map[string]interface{}, fl
 		}
 		href := fmt.Sprint(data["href"])
 		if len(href) > 5 && flag { //有效数据
-			db := HexToBigIntMod(href) //根据href的哈希值选择Redis的db
 			hashHref := HexText(href)
-			//增量
-			lu.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, "", 3600*24*365)
-			//全量
-			isExist, _ := lu.ExistRedis("title_repeat_fulljudgement", db, hashHref)
-			if !isExist {
-				lu.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
-			}
+			lu.RedisClusterSet(hashHref, "", -1)
 		}
 	} else if mode == 2 {
 		data["T"] = c
@@ -238,14 +231,14 @@ func SaveErrorData(modifyuser string, pd map[string]interface{}, err interface{}
 }
 
 //保存modal=1模式采集的列表页信息
-func SaveHighListPageData(tmp map[string]interface{}, code, href string, num *int) {
-	//先判断redis,防止信息重复
-	redisCode, _ := lu.GetRedisStr("title_repeat_listpagehref", 0, href)
+func SaveHighListPageData(tmp map[string]interface{}, code, hashHref string, num *int) {
+	//列表页href判重
+	redisCode := lu.RedisClusterGet("list_" + hashHref)
 	if redisCode != "" && strings.Contains(redisCode, code) { //相同爬虫采集且href相同,表示重复
 		*num++
 		return
-	} else { //存redis
-		lu.PutRedis("title_repeat_listpagehref", 0, href, code+"+"+redisCode, 86400*365*2) //100天
+	} else {
+		lu.RedisClusterSet("list_"+hashHref, code+"+"+redisCode, 86400*365*2) //两年
 	}
 	tmp["state"] = 0
 	tmp["event"] = lu.Config.Uploadevent