浏览代码

新版redis调整

maxiaoshan 2 年之前
父节点
当前提交
38367ae4bc
共有 6 个文件被更改,包括 124 次插入、219 次删除
  1. 6 8
      src/config.json
  2. 2 3
      src/main.go
  3. 0 85
      src/spider/history.go
  4. 2 17
      src/spider/msgservice.go
  5. 111 83
      src/spider/spider.go
  6. 3 23
      src/spider/store.go

+ 6 - 8
src/config.json

@@ -1,9 +1,9 @@
 {
 {
     "webport": "7400",
     "webport": "7400",
-    "mongodb_spider": "192.168.3.207:29099",
+    "mongodb_spider": "192.168.3.71:29099",
     "spider_dbsize": 50,
     "spider_dbsize": 50,
     "bideditor": {
     "bideditor": {
-        "addr": "192.168.3.207:29099",
+        "addr": "192.168.3.71:29099",
         "db": "editor",
         "db": "editor",
         "size": 5,
         "size": 5,
         "username": "",
         "username": "",
@@ -17,7 +17,7 @@
     "working": 0,
     "working": 0,
     "chansize": 4,
     "chansize": 4,
     "detailchansize": 20,
     "detailchansize": 20,
-    "uploadevent": 7400,
+    "uploadevent": 7100,
     "logLevel": 1,
     "logLevel": 1,
     "daynum": 6,
     "daynum": 6,
     "modal": 1,
     "modal": 1,
@@ -28,10 +28,8 @@
     "jsserveraddress":  "127.0.0.1:8031",
     "jsserveraddress":  "127.0.0.1:8031",
     "tesseractadd": "http://test.qmx.top:1688",
     "tesseractadd": "http://test.qmx.top:1688",
     "testdir": "res/test/spider_test.lua",
     "testdir": "res/test/spider_test.lua",
-    "redisclusteraddrs": [
-        "192.168.3.207:2179",
-        "192.168.3.207:2279"
-    ],
+    "redisservers": "list=192.168.3.207:1779",
+    "bloomredisservers": "href=192.168.3.207:1679",
     "word":{
     "word":{
     	"keyword":"(抽签|中标|招标|成交|合同|中标候选人|资格预审|拟建|邀请|询价|比选|议价|竞价|磋商|采购|招投标|答疑|变更公告|更正公告|竞争性谈判|竞谈|意见征询|澄清|单一来源|流标|废标|验收公告|中止|终止|违规|处罚|征集公告|开标结果|评审结果|监理|招租|租赁|评判结果|项目|遴选|补遗|竞标|征求意见)",
     	"keyword":"(抽签|中标|招标|成交|合同|中标候选人|资格预审|拟建|邀请|询价|比选|议价|竞价|磋商|采购|招投标|答疑|变更公告|更正公告|竞争性谈判|竞谈|意见征询|澄清|单一来源|流标|废标|验收公告|中止|终止|违规|处罚|征集公告|开标结果|评审结果|监理|招租|租赁|评判结果|项目|遴选|补遗|竞标|征求意见)",
     	"notkeyword":"(招聘|拍卖|出租|出让|使用权|资产)"
     	"notkeyword":"(招聘|拍卖|出租|出让|使用权|资产)"
@@ -44,7 +42,7 @@
     },
     },
     "fileServer": "http://test.qmx.top:9333",
     "fileServer": "http://test.qmx.top:9333",
     "jsvmurl": "http://127.0.0.1:8080/jsvm",
     "jsvmurl": "http://127.0.0.1:8080/jsvm",
-    "renderaddr": "http://8.131.72.226:8998/render.json",
+    "renderaddr": "http://splash.spdata.jianyu360.com/render.json",
     "proxyaddr": "http://cc.spdata.jianyu360.com/crawl/proxy/socks5/fetch",
     "proxyaddr": "http://cc.spdata.jianyu360.com/crawl/proxy/socks5/fetch",
     "proxyauthor": "Basic amlhbnl1MDAxOjEyM3F3ZSFB",
     "proxyauthor": "Basic amlhbnl1MDAxOjEyM3F3ZSFB",
     "es": {
     "es": {

+ 2 - 3
src/main.go

@@ -36,9 +36,8 @@ func init() {
 	codegrpc.InitCodeGrpcClient()
 	codegrpc.InitCodeGrpcClient()
 	//go执行js服务
 	//go执行js服务
 	gojs.InitGoRunJsClient()
 	gojs.InitGoRunJsClient()
-	//InitRedis(Config.Redisservers) //初始化Redis
-	//redis集群
-	InitRedisCluster(Config.RedisClusterAddrs)
+	InitRedisClient(Config.Redisservers)           //初始化Redis
+	InitBloomRedisClient(Config.BloomRedisservers) //初始化Bloom Redis
 	//初始化es
 	//初始化es
 	spider.EsIndex = qu.ObjToString(Config.Es["index"])
 	spider.EsIndex = qu.ObjToString(Config.Es["index"])
 	spider.EsType = qu.ObjToString(Config.Es["type"])
 	spider.EsType = qu.ObjToString(Config.Es["type"])

+ 0 - 85
src/spider/history.go

@@ -37,91 +37,6 @@ func (s *Spider) StartSpider() {
 	}
 	}
 }
 }
 
 
-//加载应采集数据,进行采集
-//func (s *Spider) DownloadHistoryDetail() {
-//	defer qu.Catch()
-//	q := map[string]interface{}{"spidercode": s.Code, "state": 0}
-//	o := map[string]interface{}{"_id": 1}
-//	f := map[string]interface{}{
-//		"state":      0,
-//		"comeintime": 0,
-//		"event":      0,
-//	}
-//	//UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") //记录采集三级页心跳
-//	list, _ := MgoS.Find("spider_historydata", q, o, f, false, 0, 200)
-//	if len(*list) == 0 { //数据量为0,表示无可下载数据,爬虫作废
-//		s.Stop = true
-//		return
-//	}
-//	//采集(目前未开多线程)
-//	for _, tmp := range *list {
-//		id := tmp["_id"]
-//		href := qu.ObjToString(tmp["href"])
-//		hashHref := sputil.HexText(href)
-//		isExist := sputil.RedisClusterExists(hashHref) //全量href redis判重
-//		if isExist {
-//			set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "href", "updatetime": time.Now().Unix()}} //已存在state置为1
-//			MgoS.UpdateById("spider_historydata", id, set)
-//			return
-//		}
-//		success := true    //数据是否下载成功的标志
-//		delete(tmp, "_id") //删除列表页信息无用字段_id
-//		data := map[string]interface{}{}
-//		for k, v := range tmp {
-//			data[k] = v
-//		}
-//		//下载、解析、入库
-//		data, err := s.DownloadDetailPage(tmp, data)
-//		//UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") //下载数据心跳
-//		if err != nil || data == nil {
-//			success = false
-//			if err != nil {
-//				logger.Error(s.Code, err, tmp)
-//				//if len(tmp) > 0 {
-//				//	SaveErrorData(s.MUserName, tmp, err) //保存错误信息
-//				//}
-//			} /*else if data == nil && times >= 3 { //下载问题,建editor任务
-//				DownloadErrorData(s.Code, tmp)
-//			}*/
-//		} else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //三级页href替换导致前后href不同
-//			sputil.RedisClusterSet(hashHref, "", -1)
-//		}
-//		if !success { //下载失败
-//			set := map[string]interface{}{"$set": map[string]interface{}{"state": -1, "updatetime": time.Now().Unix()}}
-//			MgoS.UpdateById("spider_historydata", id, set)
-//			return
-//		} else if data["delete"] != nil { //三级页过滤
-//			sputil.RedisClusterSet(hashHref, "", -1) //过滤掉的数据存值全量redis
-//			//更新mgo 要删除的数据更新spider_historydata state=1不再下载,更新redis
-//			set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "delete": true, "updatetime": time.Now().Unix()}}
-//			MgoS.UpdateById("spider_historydata", id, set)
-//			return
-//		}
-//		//正文、附件分析,下载异常数据重新下载
-//		if AnalysisProjectInfo(data) {
-//			set := map[string]interface{}{"$set": map[string]interface{}{"state": -1, "detailfilerr": true, "updatetime": time.Now().Unix()}}
-//			MgoS.UpdateById("spider_historydata", id, set)
-//			return
-//		}
-//		t1 := sputil.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
-//		if t1 > time.Now().Unix() { //防止发布时间超前
-//			data["publishtime"] = time.Now().Unix()
-//		}
-//		delete(data, "exit")
-//		delete(data, "checkpublishtime")
-//		data["comeintime"] = time.Now().Unix()
-//		data["spidercode"] = s.Code
-//		data["dataging"] = 0
-//		data["iscompete"] = s.IsCompete //2021-11-01以后新增的爬虫不在展示原文链接(保存服务判断)
-//		//发送保存服务
-//		Store(s.StoreMode, s.StoreToMsgEvent, s.Collection, s.CoverAttr, data, true)
-//		set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}} //下载成功state置为1
-//		MgoS.UpdateById("spider_historydata", id, set)
-//	}
-//	//采集完LoadScript
-//	s.LoadScript(&s.Name, &s.Channel, &s.MUserName, s.Code, s.ScriptFile, true, false)
-//}
-
 //定时检测数据集汇总爬虫
 //定时检测数据集汇总爬虫
 func GetHistoryDownloadSpider() {
 func GetHistoryDownloadSpider() {
 	defer qu.Catch()
 	defer qu.Catch()

+ 2 - 17
src/spider/msgservice.go

@@ -248,10 +248,9 @@ func SaveObjBlak(event int, checkAtrr string, c string, data []map[string]interf
 func SaveObj(event int, checkAtrr string, data map[string]interface{}, saveredis bool) {
 func SaveObj(event int, checkAtrr string, data map[string]interface{}, saveredis bool) {
 	bs, _ := json.Marshal(data)
 	bs, _ := json.Marshal(data)
 	size := len(bs) / (1024 * 1024)
 	size := len(bs) / (1024 * 1024)
-	if size > 10 {
+	if size > 10 { //超大数据过滤
 		href := fmt.Sprint(data["href"])
 		href := fmt.Sprint(data["href"])
-		hashHref := util.HexText(href)
-		util.RedisClusterSet(hashHref, "", -1)
+		util.AddBloomRedis("href", href)
 		data["detail"] = ""      //字段太大
 		data["detail"] = ""      //字段太大
 		data["contenthtml"] = "" //字段太大
 		data["contenthtml"] = "" //字段太大
 		MgoS.Save("spider_filterdata", data)
 		MgoS.Save("spider_filterdata", data)
@@ -284,25 +283,11 @@ func SaveObj(event int, checkAtrr string, data map[string]interface{}, saveredis
 		}
 		}
 		href := fmt.Sprint(data["href"])
 		href := fmt.Sprint(data["href"])
 		if len(href) > 5 && saveredis { //有效数据
 		if len(href) > 5 && saveredis { //有效数据
-			hashHref := util.HexText(href)
-			isExists := util.RedisClusterExists(hashHref)
-			data["redisexists"] = isExists
 			if arr := strings.Split(idAndColl, "+"); len(arr) == 2 { //保存服务未成功推送的信息(异常、重复等),返回值不是id
 			if arr := strings.Split(idAndColl, "+"); len(arr) == 2 { //保存服务未成功推送的信息(异常、重复等),返回值不是id
 				data["biddingid"] = arr[0]
 				data["biddingid"] = arr[0]
 				data["biddingcoll"] = arr[1]
 				data["biddingcoll"] = arr[1]
 			}
 			}
 			MgoS.Save("data_bak", data)
 			MgoS.Save("data_bak", data)
-			if !isExists {
-				util.RedisClusterSet(hashHref, "", -1)
-			}
-			//id := mgu.Save("data_bak", "spider", "spider", data)
-			////保存服务未接收成功的数据会存入data_bak中,确保数据不丢失依赖补发程序
-			//if id != "" && !flag { //保存服务发送成功
-			//	hashHref := HexText(href)
-			//	if !util.RedisClusterExists(hashHref) { //保存服务过滤掉的异常数据目前不在其程序内存储href到全量的redis,此处补充(是否在保存服务端保存所有数据href)
-			//		util.RedisClusterSet(hashHref, "", -1)
-			//	}
-			//}
 		}
 		}
 	}
 	}
 }
 }

+ 111 - 83
src/spider/spider.go

@@ -342,7 +342,7 @@ func (s *Spider) DownListPageItem() (errs interface{}) {
 						tmp["dataging"] = 0 //数据中打标记dataging=0
 						tmp["dataging"] = 0 //数据中打标记dataging=0
 						if s.DownDetail {
 						if s.DownDetail {
 							s.DownloadDetailItem(tmp, &repeatListNum)
 							s.DownloadDetailItem(tmp, &repeatListNum)
-						} else {
+						} /*else {//暂无此类爬虫
 							tmp["comeintime"] = time.Now().Unix()
 							tmp["comeintime"] = time.Now().Unix()
 							//atomic.AddInt32(&s.LastDowncount, 1)
 							//atomic.AddInt32(&s.LastDowncount, 1)
 							//atomic.AddInt32(&s.TodayDowncount, 1)
 							//atomic.AddInt32(&s.TodayDowncount, 1)
@@ -353,7 +353,7 @@ func (s *Spider) DownListPageItem() (errs interface{}) {
 								util.RedisClusterSet(hashHref, "", -1) //全量redis
 								util.RedisClusterSet(hashHref, "", -1) //全量redis
 								list = append(list, tmp)
 								list = append(list, tmp)
 							}
 							}
-						}
+						}*/
 					} else { //历史补漏
 					} else { //历史补漏
 						s.HistoricalMendDownloadDetailItem(tmp) //历史补漏下载三级页
 						s.HistoricalMendDownloadDetailItem(tmp) //历史补漏下载三级页
 					}
 					}
@@ -512,52 +512,81 @@ func (s *Spider) HistoricalMendDownloadDetailItem(p interface{}) {
 		return
 		return
 	}
 	}
 	hashHref := util.HexText(href)
 	hashHref := util.HexText(href)
-	isExist := util.RedisClusterExists(hashHref) //全量redis判重
+	isExist := util.RedisExist("list", "list_"+hashHref)
 	//logger.Debug("full href:", href, " isExist:", isExist)
 	//logger.Debug("full href:", href, " isExist:", isExist)
 	if !s.IsMustDownload { //非强制下载
 	if !s.IsMustDownload { //非强制下载
 		if isExist { //数据存在,直接return
 		if isExist { //数据存在,直接return
 			return
 			return
 		} else if util.Config.IsHistoryEvent { //1、7000(历史节点)的历史补漏,数据存入spider_historydata
 		} else if util.Config.IsHistoryEvent { //1、7000(历史节点)的历史补漏,数据存入spider_historydata
 			num := 0
 			num := 0
-			SaveHighListPageData(paramdata, s.SCode, hashHref, &num)
+			SaveHighListPageData(paramdata, hashHref, &num)
 			return
 			return
 		}
 		}
+	} else { //当前不支持强制下载
+		return
 	}
 	}
-	//2、非7000(历史节点)的历史补漏,采完列表直接采详情,采完爬虫下架
+	//2、非7000(历史节点)的历史补漏,采完列表直接采详情,采完爬虫下架(当前无此爬虫)
 	id := ""
 	id := ""
-	SaveListPageData(paramdata, &id, false) //存储采集记录
+	isEsRepeat := false
+	if delaySite := DelaySiteMap[s.Name]; delaySite != nil && delaySite.Compete {
+		title := qu.ObjToString(paramdata["title"])
+		eTime := time.Now().Unix()
+		sTime := eTime - int64(7*86400)
+		esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
+		if Es.Count(EsIndex, EsType, esQuery) > 0 { //es中含本title数据,不再采集,更新list表数据状态
+			isEsRepeat = true
+		}
+	}
+	SaveListPageData(paramdata, &id, isEsRepeat) //存储采集记录
+	if isEsRepeat {                              //类竞品数据title判重数据加入redis
+		util.RedisSet("list", "list_"+hashHref, "", 86400*365*2)
+		util.AddBloomRedis("href", href)
+		return
+	}
 	//qu.Debug("----------------下载、解析、入库--------------------")
 	//qu.Debug("----------------下载、解析、入库--------------------")
 	//下载详情页
 	//下载详情页
 	data, err = s.DownloadDetailPage(paramdata, data)
 	data, err = s.DownloadDetailPage(paramdata, data)
 	if err != nil || data == nil { //下载失败,结束
 	if err != nil || data == nil { //下载失败,结束
 		if err != nil {
 		if err != nil {
 			logger.Error(s.Code, err, paramdata)
 			logger.Error(s.Code, err, paramdata)
-			// if len(paramdata) > 0 {
-			// 	SaveErrorData(paramdata) //保存错误信息
-			// }
 		}
 		}
 		//更新spider_listdata中数据下载失败标记
 		//更新spider_listdata中数据下载失败标记
 		MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": -1}})
 		MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": -1}})
 		return
 		return
-	} else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //三级页href替换导致前后href不同
-		util.RedisClusterSet(hashHref, "", -1) //全量redis中存值列表页href
+	}
+
+	util.RedisSet("list", "list_"+hashHref, "", 86400*365*2) //采集成功,加入列表页redis
+	//根据发布时间进行数据判重校验
+	tmphref := qu.ObjToString(data["href"]) //取tmphref,三级页href替换导致前后href不同
+	publishtime := qu.Int64All(data["l_np_publishtime"])
+	if publishtime < time.Now().AddDate(-1, 0, 0).Unix() { //一年前数据进行全量bloom redis href判重
+		isExist, _ = util.ExistsBloomRedis("href", tmphref)
+		if isExist {
+			MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "bloom_href", "updatetime": time.Now().Unix()}})
+			return
+		}
 	}
 	}
 	//详情页过滤数据
 	//详情页过滤数据
 	set := map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}
 	set := map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}
 	if data["delete"] != nil {
 	if data["delete"] != nil {
-		util.RedisClusterSet(hashHref, "", -1) //过滤掉的数据存值全量redis
-		//更新mgo 要删除的数据更新spider_highlistdata state=1不再下载,更新redis
-		set["delete"] = true
-		MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
+		//util.AddBloomRedis("href", tmphref)//delete可能存在删除跳转网站的数据,加入全量redis后可能导致该网站采不到
+		set["exist"] = "delete"
+		//MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
+		MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": set})
 		return
 		return
 	}
 	}
 	//更新spider_listdata中数据下载成功标记(根据链接更新数据state;可能由后续下载成功时更新)
 	//更新spider_listdata中数据下载成功标记(根据链接更新数据state;可能由后续下载成功时更新)
 	MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
 	MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
 
 
+	//三级页href替换导致前后href不同,采集成功后将原始href加入全量redis
+	//if tmphref := qu.ObjToString(data["href"]); tmphref != href {
+	//	util.AddBloomRedis("href", href)
+	//}
+
 	flag := true
 	flag := true
-	t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"])) //publishtime
-	if s.IsMustDownload {                                           //强制下载
-		if isExist && t1 < time.Now().AddDate(0, 0, -5).Unix() {
+	//publishtime := util.ParseDate2Int64(qu.ObjToString(data["publishtime"])) //publishtime
+	if s.IsMustDownload { //强制下载
+		if isExist && publishtime < time.Now().AddDate(0, 0, -5).Unix() {
 			//qu.Debug("强制下载 redis存在")
 			//qu.Debug("强制下载 redis存在")
 			data["dataging"] = 1 //此处dataging=1对应保存服务中取redis中href对应的id值,进行更新(现redis中已无id值,所以无效)
 			data["dataging"] = 1 //此处dataging=1对应保存服务中取redis中href对应的id值,进行更新(现redis中已无id值,所以无效)
 			flag = false
 			flag = false
@@ -571,16 +600,13 @@ func (s *Spider) HistoricalMendDownloadDetailItem(p interface{}) {
 			data["dataging"] = 0
 			data["dataging"] = 0
 		}
 		}
 	}
 	}
-	if t1 > time.Now().Unix() { //防止发布时间超前
-		data["publishtime"] = time.Now().Unix()
-	}
+	//if publishtime > time.Now().Unix() { //防止发布时间超前
+	//	data["publishtime"] = time.Now().Unix()
+	//}
 	delete(data, "state")
 	delete(data, "state")
 	delete(data, "exit")
 	delete(data, "exit")
 	delete(data, "checkpublishtime")
 	delete(data, "checkpublishtime")
 	data["comeintime"] = time.Now().Unix()
 	data["comeintime"] = time.Now().Unix()
-	//atomic.AddInt32(&s.LastDowncount, 1)
-	//atomic.AddInt32(&s.TodayDowncount, 1)
-	//atomic.AddInt32(&s.TotalDowncount, 1)
 	data["spidercode"] = s.Code
 	data["spidercode"] = s.Code
 	//qu.Debug("--------------开始保存---------------")
 	//qu.Debug("--------------开始保存---------------")
 	data["iscompete"] = s.IsCompete //2021-11-01以后新增的爬虫不在展示原文链接(保存服务判断)
 	data["iscompete"] = s.IsCompete //2021-11-01以后新增的爬虫不在展示原文链接(保存服务判断)
@@ -603,24 +629,20 @@ func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
 		return
 		return
 	}
 	}
 	hashHref := util.HexText(href)
 	hashHref := util.HexText(href)
+	//列表页redis判重
+	isExist := util.RedisExist("list", "list_"+hashHref)
+	if isExist {
+		*num++ //已采集
+		return
+	}
 	id := ""                                                                                           //记录spider_listdata中保存的数据id,便于下载成功后更新状态
 	id := ""                                                                                           //记录spider_listdata中保存的数据id,便于下载成功后更新状态
-	if util.Config.Modal == 1 || (util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history") { //除7410、7500、7510、7700节点外所有节点只采集列表页信息
-		isExist := util.RedisClusterExists(hashHref) //全量信息中已采集
-		if isExist {
-			*num++ //已采集
-			return
-		}
-		SaveHighListPageData(paramdata, s.SCode, hashHref, num)
+	if util.Config.Modal == 1 || (util.Config.IsHistoryEvent && s.GetVar("spiderType") == "history") { //列表页、详情页分开采集模式节点和7000节点新爬虫采集的数据数据
+		SaveHighListPageData(paramdata, hashHref, num) //存表
 		return
 		return
 	} else {
 	} else {
 		if !s.Stop {
 		if !s.Stop {
 			UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") //记录modal=0老模式采集三级页心跳
 			UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detail") //记录modal=0老模式采集三级页心跳
 		}
 		}
-		isExist := util.RedisClusterExists(hashHref) //全量信息中已采集
-		if isExist {
-			*num++ //已采集
-			return
-		}
 		isEsRepeat := false
 		isEsRepeat := false
 		if delaySite := DelaySiteMap[s.Name]; delaySite != nil && delaySite.Compete {
 		if delaySite := DelaySiteMap[s.Name]; delaySite != nil && delaySite.Compete {
 			title := qu.ObjToString(paramdata["title"])
 			title := qu.ObjToString(paramdata["title"])
@@ -633,7 +655,8 @@ func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
 		}
 		}
 		SaveListPageData(paramdata, &id, isEsRepeat) //保存7000、7410、7500、7510、7520、7700节点列表页采集的信息
 		SaveListPageData(paramdata, &id, isEsRepeat) //保存7000、7410、7500、7510、7520、7700节点列表页采集的信息
 		if isEsRepeat {                              //类竞品数据title判重数据加入redis
 		if isEsRepeat {                              //类竞品数据title判重数据加入redis
-			util.RedisClusterSet(hashHref, "", -1) //全量存值
+			util.RedisSet("list", "list_"+hashHref, "", 86400*365*2)
+			util.AddBloomRedis("href", href)
 			return
 			return
 		}
 		}
 	}
 	}
@@ -650,36 +673,47 @@ func (s *Spider) DownloadDetailItem(p interface{}, num *int) {
 		//更新spider_listdata中数据下载失败标记
 		//更新spider_listdata中数据下载失败标记
 		MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": -1, "updatetime": time.Now().Unix()}})
 		MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": -1, "updatetime": time.Now().Unix()}})
 		return
 		return
-	} else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //三级页href替换导致前后href不同
+	} /*else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //三级页href替换导致前后href不同
 		util.RedisClusterSet(hashHref, "", -1) //全量redis中存值列表页href
 		util.RedisClusterSet(hashHref, "", -1) //全量redis中存值列表页href
+	}*/
+
+	util.RedisSet("list", "list_"+hashHref, "", 86400*365*2) //加入列表页redis
+	//根据发布时间进行数据判重校验
+	tmphref := qu.ObjToString(data["href"])
+	publishtime := qu.Int64All(data["l_np_publishtime"])
+	//7410节点(变链接节点)或者一年前数据进行全量bloomredis href判重
+	if util.Config.Uploadevent == 7410 || publishtime < time.Now().AddDate(-1, 0, 0).Unix() {
+		isExist, _ = util.ExistsBloomRedis("href", tmphref)
+		if isExist {
+			MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "bloom_href", "updatetime": time.Now().Unix()}})
+			return
+		}
 	}
 	}
 	//详情页下载数据成功心跳
 	//详情页下载数据成功心跳
 	if !s.Stop {
 	if !s.Stop {
 		UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") //记录modal=0老模式采集到数据心跳
 		UpdateHeart(s.Name, s.Channel, s.Code, s.MUserName, "detailexcute") //记录modal=0老模式采集到数据心跳
 	}
 	}
-	set := map[string]interface{}{"state": 1, "updatetime": time.Now().Unix(), "byid": id}
+	set := map[string]interface{}{"state": 1, "updatetime": time.Now().Unix()}
 	//详情页过滤数据
 	//详情页过滤数据
 	if data["delete"] != nil {
 	if data["delete"] != nil {
-		util.RedisClusterSet(hashHref, "", -1) //过滤掉的数据存值全量redis
-		//更新mgo 要删除的数据更新spider_highlistdata state=1不再下载,更新redis
-		set["delete"] = true
-		MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
+		//util.AddBloomRedis("href", tmphref)//delete可能存在删除跳转网站的数据,加入全量redis后可能导致该网站采不到
+		set["exist"] = "delete"
+		//MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
+		MgoS.UpdateById("spider_listdata", id, map[string]interface{}{"$set": set})
 		return
 		return
 	}
 	}
+	set["byid"] = id
 	//更新spider_listdata中数据下载成功标记(根据链接更新数据state;可能由后续下载成功时更新)
 	//更新spider_listdata中数据下载成功标记(根据链接更新数据state;可能由后续下载成功时更新)
 	MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
 	MgoS.Update("spider_listdata", map[string]interface{}{"href": href}, map[string]interface{}{"$set": set}, false, true)
 
 
-	t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
-	if t1 > time.Now().Unix() { //防止发布时间超前
-		data["publishtime"] = time.Now().Unix()
-	}
+	//三级页href替换导致前后href不同,采集成功后将原始href加入全量redis
+	//if tmphref := qu.ObjToString(data["href"]); tmphref != href {
+	//	util.AddBloomRedis("href", href)
+	//}
 	delete(data, "state")
 	delete(data, "state")
 	delete(data, "exit")
 	delete(data, "exit")
 	delete(data, "checkpublishtime")
 	delete(data, "checkpublishtime")
 	data["comeintime"] = time.Now().Unix()
 	data["comeintime"] = time.Now().Unix()
-	//atomic.AddInt32(&s.LastDowncount, 1)
-	//atomic.AddInt32(&s.TodayDowncount, 1)
-	//atomic.AddInt32(&s.TotalDowncount, 1)
 	data["spidercode"] = s.Code
 	data["spidercode"] = s.Code
 	data["iscompete"] = s.IsCompete   //2021-11-01以后新增的爬虫不在展示原文链接(保存服务判断)
 	data["iscompete"] = s.IsCompete   //2021-11-01以后新增的爬虫不在展示原文链接(保存服务判断)
 	data["infoformat"] = s.Infoformat //爬虫类型
 	data["infoformat"] = s.Infoformat //爬虫类型
@@ -764,7 +798,7 @@ func (s *Spider) DownloadDetailPage(param map[string]interface{}, data map[strin
 				if value, ok := v.(lua.LString); ok {
 				if value, ok := v.(lua.LString); ok {
 					data[key] = string(value)
 					data[key] = string(value)
 				} else if value, ok := v.(lua.LNumber); ok {
 				} else if value, ok := v.(lua.LNumber); ok {
-					data[key] = value
+					data[key] = int64(value)
 				} else if value, ok := v.(*lua.LTable); ok {
 				} else if value, ok := v.(*lua.LTable); ok {
 					tmp := util.TableToMap(value)
 					tmp := util.TableToMap(value)
 					data[key] = tmp
 					data[key] = tmp
@@ -914,20 +948,8 @@ func (s *Spider) DownloadDetail(reload bool, isHistory bool) {
 					_id := tmp["_id"]
 					_id := tmp["_id"]
 					query := map[string]interface{}{"_id": _id}
 					query := map[string]interface{}{"_id": _id}
 					href := qu.ObjToString(tmp["href"])
 					href := qu.ObjToString(tmp["href"])
-					hashHref := util.HexText(href)
+					//hashHref := util.HexText(href)
 					update := []map[string]interface{}{}
 					update := []map[string]interface{}{}
-					//由于目前列表页redis判重是href+code可能导致同一条href有多条不同code采集的数据存在
-					//为了避免重复下载,进行全量redis判重
-					isExist := util.RedisClusterExists(hashHref)
-					if isExist {
-						set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "href", "updatetime": time.Now().Unix()}} //已存在state置为1
-						update = append(update, query)
-						update = append(update, set)
-						spLock.Lock()
-						updateArr = append(updateArr, update)
-						spLock.Unlock()
-						return
-					}
 					if isEsRepeat { //es数据title判重
 					if isEsRepeat { //es数据title判重
 						title := qu.ObjToString(tmp["title"])
 						title := qu.ObjToString(tmp["title"])
 						eTime := time.Now().Unix()
 						eTime := time.Now().Unix()
@@ -935,8 +957,8 @@ func (s *Spider) DownloadDetail(reload bool, isHistory bool) {
 						esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
 						esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
 						count := Es.Count(EsIndex, EsType, esQuery)
 						count := Es.Count(EsIndex, EsType, esQuery)
 						if count > 0 { //es中含本title数据,不再采集,更新list表数据状态
 						if count > 0 { //es中含本title数据,不再采集,更新list表数据状态
-							util.RedisClusterSet(hashHref, "", -1)
-							set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "title", "updatetime": time.Now().Unix()}} //已存在state置为1
+							util.AddBloomRedis("href", href)
+							set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "es", "updatetime": time.Now().Unix()}} //已存在state置为1
 							update = append(update, query)
 							update = append(update, query)
 							update = append(update, set)
 							update = append(update, set)
 							spLock.Lock()
 							spLock.Lock()
@@ -970,9 +992,9 @@ func (s *Spider) DownloadDetail(reload bool, isHistory bool) {
 						} /*else if data == nil && times >= 3 { //下载问题,建editor任务
 						} /*else if data == nil && times >= 3 { //下载问题,建editor任务
 							DownloadErrorData(s.Code, tmp)
 							DownloadErrorData(s.Code, tmp)
 						}*/
 						}*/
-					} else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //三级页href替换导致前后href不同
+					} /*else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //三级页href替换导致前后href不同
 						util.RedisClusterSet(hashHref, "", -1)
 						util.RedisClusterSet(hashHref, "", -1)
-					}
+					}*/
 
 
 					if !success { //下载失败更新次数和状态
 					if !success { //下载失败更新次数和状态
 						ss := map[string]interface{}{"times": times, "updatetime": time.Now().Unix()}
 						ss := map[string]interface{}{"times": times, "updatetime": time.Now().Unix()}
@@ -987,9 +1009,9 @@ func (s *Spider) DownloadDetail(reload bool, isHistory bool) {
 						spLock.Unlock()
 						spLock.Unlock()
 						return
 						return
 					} else if data["delete"] != nil { //三级页过滤
 					} else if data["delete"] != nil { //三级页过滤
-						util.RedisClusterSet(hashHref, "", -1) //过滤掉的数据存值全量redis
+						//util.AddBloomRedis("href", tmphref)//delete可能存在删除跳转网站的数据,加入全量redis后可能导致该网站采不到
 						//更新mgo 要删除的数据更新spider_highlistdata state=1不再下载,更新redis
 						//更新mgo 要删除的数据更新spider_highlistdata state=1不再下载,更新redis
-						set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "delete": true, "updatetime": time.Now().Unix()}}
+						set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "delete", "updatetime": time.Now().Unix()}}
 						update = append(update, query)
 						update = append(update, query)
 						update = append(update, set)
 						update = append(update, set)
 						spLock.Lock()
 						spLock.Lock()
@@ -1013,22 +1035,28 @@ func (s *Spider) DownloadDetail(reload bool, isHistory bool) {
 						spLock.Unlock()
 						spLock.Unlock()
 						return
 						return
 					}
 					}
-					t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
-					if t1 > time.Now().Unix() { //防止发布时间超前
-						data["publishtime"] = time.Now().Unix()
+					//数据采集成功
+					//根据发布时间进行数据判重校验
+					tmphref := qu.ObjToString(data["href"])
+					publishtime := qu.Int64All(data["l_np_publishtime"])
+					if publishtime < time.Now().AddDate(-1, 0, 0).Unix() {
+						isExist, _ := util.ExistsBloomRedis("href", tmphref)
+						if isExist {
+							set := map[string]interface{}{"$set": map[string]interface{}{
+								"state":      1,
+								"updatetime": time.Now().Unix(),
+								"exist":      "bloom_href",
+							}}
+							update = append(update, query)
+							update = append(update, set)
+							spLock.Lock()
+							updateArr = append(updateArr, update)
+							spLock.Unlock()
+							return
+						}
 					}
 					}
 					delete(data, "exit")
 					delete(data, "exit")
 					delete(data, "checkpublishtime")
 					delete(data, "checkpublishtime")
-					//计数
-					//tmpsp1, b := Allspiders.Load(s.Code)
-					//if b {
-					//	sp1, ok := tmpsp1.(*Spider)
-					//	if ok {
-					//		atomic.AddInt32(&sp1.LastDowncount, 1)
-					//		atomic.AddInt32(&sp1.TodayDowncount, 1)
-					//		atomic.AddInt32(&sp1.TotalDowncount, 1)
-					//	}
-					//}
 					data["comeintime"] = time.Now().Unix()
 					data["comeintime"] = time.Now().Unix()
 					data["spidercode"] = s.Code
 					data["spidercode"] = s.Code
 					data["dataging"] = 0
 					data["dataging"] = 0

+ 3 - 23
src/spider/store.go

@@ -236,28 +236,8 @@ func SaveErrorData(modifyuser string, pd map[string]interface{}, err interface{}
 }
 }
 
 
 //保存modal=1模式采集的列表页信息
 //保存modal=1模式采集的列表页信息
-func SaveHighListPageData(tmp map[string]interface{}, code, hashHref string, num *int) {
-	//列表页href判重
-	isExist := lu.RedisClusterExists("list_" + hashHref)
-	if isExist {
-		*num++
-		return
-	} else {
-		lu.RedisClusterSet("list_"+hashHref, "", 86400*365*2) //不存在,存两年
-	}
-
-	//redisCode := lu.RedisClusterGet("list_" + hashHref)
-	////此处区分历史节点(7000)和增量节点
-	//if redisCode != "" {
-	//	if lu.Config.IsHistoryEvent || strings.Contains(redisCode, code) { //列表页数据已采集
-	//		*num++
-	//		return
-	//	} else {
-	//		lu.RedisClusterSet("list_"+hashHref, code+"+"+redisCode, 86400*365*2) //两年
-	//	}
-	//} else {
-	//	lu.RedisClusterSet("list_"+hashHref, code+"+"+redisCode, 86400*365*2) //两年
-	//}
+func SaveHighListPageData(tmp map[string]interface{}, hashHref string, num *int) {
+	lu.RedisSet("list", "list_"+hashHref, "", 86400*365*2)
 	tmp["state"] = 0
 	tmp["state"] = 0
 	tmp["event"] = lu.Config.Uploadevent
 	tmp["event"] = lu.Config.Uploadevent
 	tmp["comeintime"] = time.Now().Unix()
 	tmp["comeintime"] = time.Now().Unix()
@@ -279,7 +259,7 @@ func SaveListPageData(tmp map[string]interface{}, id *string, isEsRepeat bool) {
 	tmp["count"] = count //当前href spider_listdata已有多少条记录(可用于评估爬虫维护的及时性)
 	tmp["count"] = count //当前href spider_listdata已有多少条记录(可用于评估爬虫维护的及时性)
 	if isEsRepeat {      //类竞品数据es判重掉后,更新状态
 	if isEsRepeat {      //类竞品数据es判重掉后,更新状态
 		tmp["state"] = 1
 		tmp["state"] = 1
-		tmp["exist"] = true
+		tmp["exist"] = "es"
 		tmp["updatetime"] = time.Now().Unix()
 		tmp["updatetime"] = time.Now().Unix()
 	}
 	}
 	*id = MgoS.Save("spider_listdata", tmp)
 	*id = MgoS.Save("spider_listdata", tmp)