浏览代码

redis更新为集群

maxiaoshan 3 年之前
父节点
当前提交
1b238ce509
共有 5 个文件被更改,包括 69 次插入133 次删除
  1. 10 7
      src/config.json
  2. 3 1
      src/main.go
  3. 10 36
      src/spider/msgservice.go
  4. 45 81
      src/spider/spider.go
  5. 1 8
      src/spider/store.go

+ 10 - 7
src/config.json

@@ -20,17 +20,20 @@
     "tesseractadd": "http://test.qmx.top:1688",
     "testdir": "res/spider_a_zgzbtbggfwpt_zgysgg2.lua",
     "redistype": "0",
-    "redisservers": "title_repeat_judgement=192.168.3.207:1679,title_repeat_fulljudgement=192.168.3.207:1679,title_repeat_listpagehref=192.168.3.207:1679",
     "serveraddress": "127.0.0.1:8030",
+    "redisclusteraddrs": [
+        "192.168.3.207:2179",
+        "192.168.3.166:2379"
+    ],
     "word":{
-    	"keyword":"(抽签|中标|招标|成交|合同|中标候选人|资格预审|拟建|邀请|询价|比选|议价|竞价|磋商|采购|招投标|答疑|变更公告|更正公告|竞争性谈判|竞谈|意见征询|澄清|单一来源|流标|废标|验收公告|中止|终止|违规|处罚|征集公告|开标结果|评审结果|监理|招租|租赁|评判结果|项目|遴选|补遗|竞标|征求意见|标段|定点结果|项目评审公示|采购项目违规|采购活动中违规|项目行政处罚|采购行政处罚|项目审批公示)",
-    	"notkeyword":"(招聘|拍卖|出租|出让|使用权|资产)"
+        "keyword":"(抽签|中标|招标|成交|合同|中标候选人|资格预审|拟建|邀请|询价|比选|议价|竞价|磋商|采购|招投标|答疑|变更公告|更正公告|竞争性谈判|竞谈|意见征询|澄清|单一来源|流标|废标|验收公告|中止|终止|违规|处罚|征集公告|开标结果|评审结果|监理|招租|租赁|评判结果|项目|遴选|补遗|竞标|征求意见|标段|定点结果|项目评审公示|采购项目违规|采购活动中违规|项目行政处罚|采购行政处罚|项目审批公示)",
+        "notkeyword":"(招聘|拍卖|出租|出让|使用权|资产)"
     },
     "oss":{
-    	"ossEndpoint":"oss-cn-beijing-internal.aliyuncs.com",
-		"ossAccessKeyId":"LTAI4G5x9aoZx8dDamQ7vfZi",  
-		"ossAccessKeySecret":"Bk98FsbPYXcJe72n1bG3Ssf73acuNh",
-		"ossBucketName":"jy-datafile"
+        "ossEndpoint":"oss-cn-beijing-internal.aliyuncs.com",
+        "ossAccessKeyId":"LTAI4G5x9aoZx8dDamQ7vfZi",
+        "ossAccessKeySecret":"Bk98FsbPYXcJe72n1bG3Ssf73acuNh",
+        "ossBucketName":"jy-datafile"
     },
     "redishosts": [],
     "fileServer": "http://test.qmx.top:9333",

+ 3 - 1
src/main.go

@@ -38,7 +38,9 @@ func init() {
 	}
 	spider.MgoS.InitPool()
 	//初始化Redis
-	InitRedis(Config.Redisservers)
+	//InitRedis(Config.Redisservers)
+	//redis集群
+	InitRedisCluster(Config.RedisClusterAddrs)
 	//初始化es
 	spider.EsIndex = qu.ObjToString(Config.Es["index"])
 	spider.EsType = qu.ObjToString(Config.Es["type"])

+ 10 - 36
src/spider/msgservice.go

@@ -9,7 +9,6 @@ import (
 	mu "mfw/util"
 	qu "qfw/util"
 
-	//"qfw/util/redis"
 	util "spiderutil"
 	"time"
 
@@ -277,43 +276,18 @@ func SaveObj(event int, checkAtrr string, data map[string]interface{}, saveredis
 		}
 		href := fmt.Sprint(data["href"])
 		if len(href) > 5 && saveredis { //有效数据
-			db := HexToBigIntMod(href) //根据href的哈希值选择Redis的db
 			hashHref := HexText(href)
-			//增量
-			isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
-			id := ""
-			if !isExist {
-				id = MgoS.Save("data_bak", data)
-			} else { //记录重复数据,spider_repeatdata
-				data["from"] = "spider"
-				MgoS.Save("spider_repeatdata", data)
-			}
-			//保存服务未接收成功的数据会存入data_bak中,确保数据不丢失依赖补发程序
-			if id != "" {
-				util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, "", 3600*24*365)
-				if !flag { //保存服务发送成功
-					//全量(判断是否已存在防止覆盖id)
-					isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
-					if !isExist {
-						util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
-					}
-				}
-			}
+			data["redisexists"] = util.RedisClusterExists(hashHref)
+			MgoS.Save("data_bak", data)
+			//id := MgoS.Save("data_bak", data)
+			////保存服务未接收成功的数据会存入data_bak中,确保数据不丢失依赖补发程序
+			//if id != "" && !flag { //保存服务发送成功
+			//	hashHref := HexText(href)
+			//	if !util.RedisClusterExists(hashHref) { //保存服务过滤掉的异常数据目前不在其程序内存储href到全量的redis,此处补充(是否在保存服务端保存所有数据href)
+			//		util.RedisClusterSet(hashHref, "", -1)
+			//	}
+			//}
 		}
-		// id := MgoS.Save("data_bak", data)
-		// if !flag && id != "" {
-		// 	href := fmt.Sprint(data["href"])
-		// 	if len(href) > 5 && saveredis { //有效数据
-		// 		db := HexToBigIntMod(href) //根据href的哈希值选择Redis的db
-		// 		//增量
-		// 		util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
-		// 		//全量(判断是否已存在防止覆盖id)
-		// 		isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, "url_repeat_"+href)
-		// 		if !isExist {
-		// 			util.PutRedis("title_repeat_fulljudgement", db, "url_repeat_"+href, "", -1)
-		// 		}
-		// 	}
-		// }
 	}
 }
 

+ 45 - 81
src/spider/spider.go

@@ -114,18 +114,19 @@ func DownloadHighDetail(code string) {
 			comeintime := map[string]interface{}{"$gte": startTime} //指定查询数据的时间
 			if day != 0 {                                           //不是当天,指定数据范围
 				comeintime["$lt"] = GetTime(-day + 1)
-			} else if code == "a_gcy_mcgg" { //
-				endTime := time.Now().Unix() - 12*3600
-				if endTime > startTime {
-					comeintime = map[string]interface{}{
-						"$gte": startTime,
-						"$lt":  endTime,
-					}
-				} else {
-					continue
-				}
-
 			}
+			//} else if code == "a_gcy_mcgg" { //延迟采集站点(延迟采集站点不加入多线程采集luaspecialcode库中)
+			//	endTime := time.Now().Unix() - 12*3600
+			//	if endTime > startTime {
+			//		comeintime = map[string]interface{}{
+			//			"$gte": startTime,
+			//			"$lt":  endTime,
+			//		}
+			//	} else {
+			//		continue
+			//	}
+			//
+			//}
 			q["comeintime"] = comeintime
 			list, _ = MgoS.Find("spider_highlistdata", q, o, f, false, 0, 100)
 			//logger.Debug("code:", code, "query:", q, "当前查询数据量:", len(*list))
@@ -154,37 +155,38 @@ func DownloadHighDetail(code string) {
 					_id := tmp["_id"]
 					query := map[string]interface{}{"_id": _id}
 					href := qu.ObjToString(tmp["href"])
+					hashHref := HexText(href)
 					//由于目前列表页redis判重是href+code可能导致同一条href有多条不同code采集的数据存在
-					//为了避免重复下载,进行量redis判重
-					isExist, _ := util.ExistRedis("title_repeat_judgement", 0, "url_repeat_"+href)
+					//为了避免重复下载,进行量redis判重
+					isExist := util.RedisClusterExists(hashHref)
 					if isExist {
 						set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true, "updatetime": time.Now().Unix()}} //已存在state置为1
 						MgoS.Update("spider_highlistdata", query, set, false, false)
 						return
 					}
-					if code == "a_gcy_mcgg" { //竞品数据es title判重
-						title := qu.ObjToString(tmp["title"])
-						eTime := time.Now().Unix()
-						sTime := eTime - int64(7*86400)
-						esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
-						count := Es.Count(EsIndex, EsType, esQuery)
-						if count > 0 { //es中含本title数据,不再采集,更新list表数据状态
-							set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true, "updatetime": time.Now().Unix()}} //已存在state置为1
-							MgoS.Update("spider_highlistdata", query, set, false, false)
-							util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, "", 3600*24*365)
-							return
-						}
-					}
-					competehref := qu.ObjToString(tmp["competehref"])
-					if competehref != "" { //验证三方网站数据剑鱼是否已采集
-						title := qu.ObjToString(tmp["title"])
-						one, _ := MgoS.FindOne("data_bak", map[string]interface{}{"title": title})
-						if one != nil && len(*one) > 0 { //剑鱼已采集,舍弃此条信息
-							set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true, "updatetime": time.Now().Unix()}} //已存在state置为1
-							MgoS.Update("spider_highlistdata", query, set, false, false)
-							return
-						}
-					}
+					//if code == "a_gcy_mcgg" { //竞品数据es title判重
+					//	title := qu.ObjToString(tmp["title"])
+					//	eTime := time.Now().Unix()
+					//	sTime := eTime - int64(7*86400)
+					//	esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
+					//	count := Es.Count(EsIndex, EsType, esQuery)
+					//	if count > 0 { //es中含本title数据,不再采集,更新list表数据状态
+					//		set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true, "updatetime": time.Now().Unix()}} //已存在state置为1
+					//		MgoS.Update("spider_highlistdata", query, set, false, false)
+					//		util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, "", 3600*24*365)
+					//		return
+					//	}
+					//}
+					//competehref := qu.ObjToString(tmp["competehref"])
+					//if competehref != "" { //验证三方网站数据剑鱼是否已采集
+					//	title := qu.ObjToString(tmp["title"])
+					//	one, _ := MgoS.FindOne("data_bak", map[string]interface{}{"title": title})
+					//	if one != nil && len(*one) > 0 { //剑鱼已采集,舍弃此条信息
+					//		set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true, "updatetime": time.Now().Unix()}} //已存在state置为1
+					//		MgoS.Update("spider_highlistdata", query, set, false, false)
+					//		return
+					//	}
+					//}
 					times := qu.IntAll(tmp["times"])
 					success := true //数据是否下载成功的标志
 					delete(tmp, "_id")
@@ -204,25 +206,12 @@ func DownloadHighDetail(code string) {
 							if len(tmp) > 0 {
 								SaveErrorData(sp.MUserName, tmp, err) //保存错误信息
 							}
-							if errstr, ok := err.(*lua.ApiError); ok {
-								errText := errstr.Object.String()
-								logger.Info(errText, errText == "d.nx != 0")
-							}
 
 						} /*else if data == nil && times >= 3 { //下载问题,建editor任务
 							DownloadErrorData(s.Code, tmp)
 						}*/
 					} else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //三级页href替换导致前后href不同
-						log.Println("beforeHref:", href, "afterHref:", href)
-						//增量
-						util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, "", 3600*24*365)
-						//全量
-						db := HexToBigIntMod(tmphref)
-						hashHref := HexText(href)
-						isExist, _ := util.ExistRedis("title_repeat_fulljudgement", db, hashHref)
-						if !isExist {
-							util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
-						}
+						util.RedisClusterSet(hashHref, "", -1)
 					}
 					if !success { //下载失败更新次数和状态
 						ss := map[string]interface{}{"times": times, "updatetime": time.Now().Unix()}
@@ -232,11 +221,12 @@ func DownloadHighDetail(code string) {
 						set := map[string]interface{}{"$set": ss}
 						MgoS.Update("spider_highlistdata", query, set, false, false)
 						return
-					} else { //三级页过滤
-						deleteData := FilterByDetail(href, query, data) //针对列表页无法过滤需要在详情页过滤的数据,进行过滤处理
-						if deleteData {
-							return
-						}
+					} else if data["delete"] != nil { //三级页过滤
+						util.RedisClusterSet(hashHref, "", -1) //过滤掉的数据存值全量redis
+						//更新mgo 要删除的数据更新spider_highlistdata state=1不再下载,更新redis
+						set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "delete": true, "updatetime": time.Now().Unix()}}
+						MgoS.Update("spider_highlistdata", query, set, false, false)
+						return
 					}
 					//正文、附件分析,下载异常数据重新下载
 					if AnalysisProjectInfo(data) {
@@ -257,16 +247,6 @@ func DownloadHighDetail(code string) {
 					delete(data, "exit")
 					delete(data, "checkpublishtime")
 					data["comeintime"] = time.Now().Unix()
-					//计数
-					//tmpsp1, b := Allspiders.Load(sp.Code)
-					//if b {
-					//	sp1, ok := tmpsp1.(*Spider)
-					//	if ok {
-					//		atomic.AddInt32(&sp1.LastDowncount, 1)
-					//		atomic.AddInt32(&sp1.TodayDowncount, 1)
-					//		atomic.AddInt32(&sp1.TotalDowncount, 1)
-					//	}
-					//}
 					data["spidercode"] = sp.Code
 					data["dataging"] = 0
 					data["iscompete"] = sp.IsCompete //2021-11-01以后新增的爬虫不在展示原文链接(保存服务判断)
@@ -312,22 +292,6 @@ func AnalysisProjectInfo(data map[string]interface{}) bool {
 	return false
 }
 
-func FilterByDetail(href string, query, data map[string]interface{}) bool {
-	if data["delete"] != nil {
-		//增量
-		util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, "", 3600*24*365)
-		//全量
-		db := HexToBigIntMod(href)
-		hashHref := HexText(href)
-		util.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
-		//更新mgo 要删除的数据更新spider_highlistdata state=1不再下载,更新redis
-		set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "delete": true, "updatetime": time.Now().Unix()}}
-		MgoS.Update("spider_highlistdata", query, set, false, false)
-		return true
-	}
-	return false
-}
-
 //下载解析内容页
 func (s *Spider) DownloadDetailPage(param map[string]interface{}, data map[string]interface{}) (map[string]interface{}, interface{}) {
 	defer mu.Catch()

+ 1 - 8
src/spider/store.go

@@ -78,15 +78,8 @@ func Store(mode, event int, c, coverAttr string, data map[string]interface{}, fl
 		}
 		href := fmt.Sprint(data["href"])
 		if len(href) > 5 && flag { //有效数据
-			db := HexToBigIntMod(href) //根据href的哈希值选择Redis的db
 			hashHref := HexText(href)
-			//增量
-			lu.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, href, 3600*24*365)
-			//全量
-			isExist, _ := lu.ExistRedis("title_repeat_fulljudgement", db, hashHref)
-			if !isExist {
-				lu.PutRedis("title_repeat_fulljudgement", db, hashHref, "", -1)
-			}
+			lu.RedisClusterSet(hashHref, "", -1)
 		}
 	} else if mode == 2 {
 		data["T"] = c