Browse Source

新版redis修改

maxiaoshan 2 years ago
parent
commit
6ceac5317b
5 changed files with 47 additions and 78 deletions
  1. 2 5
      src/config.json
  2. 3 2
      src/main.go
  3. 1 16
      src/spider/msgservice.go
  4. 36 50
      src/spider/spider.go
  5. 5 5
      src/spider/store.go

+ 2 - 5
src/config.json

@@ -27,10 +27,6 @@
     "redistype": "0",
     "serveraddress": "127.0.0.1:8030",
     "jsserveraddress":  "127.0.0.1:8031",
-    "redisclusteraddrs": [
-        "192.168.3.207:2179",
-        "192.168.3.166:2379"
-    ],
     "word":{
         "keyword":"(抽签|中标|招标|成交|合同|中标候选人|资格预审|拟建|邀请|询价|比选|议价|竞价|磋商|采购|招投标|答疑|变更公告|更正公告|竞争性谈判|竞谈|意见征询|澄清|单一来源|流标|废标|验收公告|中止|终止|违规|处罚|征集公告|开标结果|评审结果|监理|招租|租赁|评判结果|项目|遴选|补遗|竞标|征求意见|标段|定点结果|项目评审公示|采购项目违规|采购活动中违规|项目行政处罚|采购行政处罚|项目审批公示)",
         "notkeyword":"(招聘|拍卖|出租|出让|使用权|资产)"
@@ -41,7 +37,8 @@
         "ossAccessKeySecret":"Bk98FsbPYXcJe72n1bG3Ssf73acuNh",
         "ossBucketName":"jy-datafile"
     },
-    "redishosts": [],
+    "redisservers": "list=192.168.3.207:1779",
+    "bloomredisservers": "href=192.168.3.207:1679",
     "fileServer": "http://test.qmx.top:9333",
     "jsvmurl": "http://127.0.0.1:8080/jsvm",
     "renderaddr": "http://8.131.72.226:8998/render.json",

+ 3 - 2
src/main.go

@@ -50,8 +50,6 @@ func init() {
 	spider.MgoS.InitPool()
 	//初始化Redis
 	//InitRedis(Config.Redisservers)
-	//redis集群
-	InitRedisCluster(Config.RedisClusterAddrs)
 	//初始化es
 	spider.EsIndex = qu.ObjToString(Config.Es["index"])
 	spider.EsType = qu.ObjToString(Config.Es["type"])
@@ -67,6 +65,9 @@ func init() {
 	codegrpc.InitCodeGrpcClient()
 	//go执行js服务
 	gojs.InitGoRunJsClient()
+	//redis
+	InitRedisClient(Config.Redisservers)           //初始化Redis
+	InitBloomRedisClient(Config.BloomRedisservers) //初始化Bloom Redis
 	//初始化网络存储服务
 	OssInit(
 		qu.ObjToString(Config.OssInfo["ossEndpoint"]),

+ 1 - 16
src/spider/msgservice.go

@@ -251,8 +251,7 @@ func SaveObj(event int, checkAtrr string, data map[string]interface{}, saveredis
 	size := len(bs) / (1024 * 1024)
 	if size > 10 {
 		href := fmt.Sprint(data["href"])
-		hashHref := util.HexText(href)
-		util.RedisClusterSet(hashHref, "", -1)
+		util.AddBloomRedis("href", href)
 		data["detail"] = ""      //字段太大
 		data["contenthtml"] = "" //字段太大
 		MgoS.Save("spider_filterdata", data)
@@ -285,25 +284,11 @@ func SaveObj(event int, checkAtrr string, data map[string]interface{}, saveredis
 		}
 		href := fmt.Sprint(data["href"])
 		if len(href) > 5 && saveredis { //有效数据
-			hashHref := HexText(href)
-			isExists := util.RedisClusterExists(hashHref)
-			data["redisexists"] = isExists
 			if arr := strings.Split(idAndColl, "+"); len(arr) == 2 { //保存服务未成功推送的信息(异常、重复等),返回值不是id
 				data["biddingid"] = arr[0]
 				data["biddingcoll"] = arr[1]
 			}
 			MgoS.Save("data_bak", data)
-			if !isExists {
-				util.RedisClusterSet(hashHref, "", -1)
-			}
-			//id := MgoS.Save("data_bak", data)
-			////保存服务未接收成功的数据会存入data_bak中,确保数据不丢失依赖补发程序
-			//if id != "" && !flag { //保存服务发送成功
-			//	hashHref := HexText(href)
-			//	if !util.RedisClusterExists(hashHref) { //保存服务过滤掉的异常数据目前不在其程序内存储href到全量的redis,此处补充(是否在保存服务端保存所有数据href)
-			//		util.RedisClusterSet(hashHref, "", -1)
-			//	}
-			//}
 		}
 	}
 }

+ 36 - 50
src/spider/spider.go

@@ -70,6 +70,8 @@ var SP = make(chan bool, 5)
 var TimeChan = make(chan bool, 1)
 var Reg = regexp.MustCompile(`(http|https)://([\w]+\.)+[\w]+(/?)`)
 var DelaySiteMap map[string]*DelaySite //延迟采集站点集合
+var RestrictAccessReg = regexp.MustCompile(`访问被拒绝`)
+
 type DelaySite struct {
 	DelayTime int
 	Compete   bool
@@ -159,39 +161,6 @@ func DownloadHighDetail(code string) {
 					}()
 					_id := tmp["_id"]
 					query := map[string]interface{}{"_id": _id}
-					href := qu.ObjToString(tmp["href"])
-					hashHref := HexText(href)
-					//由于目前列表页redis判重是href+code可能导致同一条href有多条不同code采集的数据存在
-					//为了避免重复下载,进行全量redis判重
-					isExist := util.RedisClusterExists(hashHref)
-					if isExist {
-						set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true, "updatetime": time.Now().Unix()}} //已存在state置为1
-						MgoS.Update("spider_highlistdata", query, set, false, false)
-						return
-					}
-					//if code == "a_gcy_mcgg" { //竞品数据es title判重
-					//	title := qu.ObjToString(tmp["title"])
-					//	eTime := time.Now().Unix()
-					//	sTime := eTime - int64(7*86400)
-					//	esQuery := `{"query": {"filtered": {"filter": {"bool": {"must": [{"range": {"comeintime": {"gte": "` + fmt.Sprint(sTime) + `","lte": "` + fmt.Sprint(eTime) + `"}}}]}},"query": {"bool": {"must": [{"multi_match": {"query": "` + title + `","type": "phrase","fields": ["title"]}}]}}}}}`
-					//	count := Es.Count(EsIndex, EsType, esQuery)
-					//	if count > 0 { //es中含本title数据,不再采集,更新list表数据状态
-					//		set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true, "updatetime": time.Now().Unix()}} //已存在state置为1
-					//		MgoS.Update("spider_highlistdata", query, set, false, false)
-					//		util.PutRedis("title_repeat_judgement", 0, "url_repeat_"+href, "", 3600*24*365)
-					//		return
-					//	}
-					//}
-					//competehref := qu.ObjToString(tmp["competehref"])
-					//if competehref != "" { //验证三方网站数据剑鱼是否已采集
-					//	title := qu.ObjToString(tmp["title"])
-					//	one, _ := MgoS.FindOne("data_bak", map[string]interface{}{"title": title})
-					//	if one != nil && len(*one) > 0 { //剑鱼已采集,舍弃此条信息
-					//		set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": true, "updatetime": time.Now().Unix()}} //已存在state置为1
-					//		MgoS.Update("spider_highlistdata", query, set, false, false)
-					//		return
-					//	}
-					//}
 					times := qu.IntAll(tmp["times"])
 					success := true //数据是否下载成功的标志
 					delete(tmp, "_id")
@@ -215,9 +184,9 @@ func DownloadHighDetail(code string) {
 						} /*else if data == nil && times >= 3 { //下载问题,建editor任务
 							DownloadErrorData(s.Code, tmp)
 						}*/
-					} else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //三级页href替换导致前后href不同
+					} /* else if tmphref := qu.ObjToString(data["href"]); tmphref != href { //三级页href替换导致前后href不同
 						util.RedisClusterSet(hashHref, "", -1)
-					}
+					}*/
 					if !success { //下载失败更新次数和状态
 						ss := map[string]interface{}{"times": times, "updatetime": time.Now().Unix()}
 						if times >= 3 { //3次下载失败今天不再下载,state置为1
@@ -227,27 +196,40 @@ func DownloadHighDetail(code string) {
 						MgoS.Update("spider_highlistdata", query, set, false, false)
 						return
 					} else if data["delete"] != nil { //三级页过滤
-						util.RedisClusterSet(hashHref, "", -1) //过滤掉的数据存值全量redis
 						//更新mgo 要删除的数据更新spider_highlistdata state=1不再下载,更新redis
-						set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "delete": true, "updatetime": time.Now().Unix()}}
+						set := map[string]interface{}{"$set": map[string]interface{}{"state": 1, "exist": "delete", "updatetime": time.Now().Unix()}}
 						MgoS.Update("spider_highlistdata", query, set, false, false)
 						return
 					}
 					//正文、附件分析,下载异常数据重新下载
-					if AnalysisProjectInfo(data) {
+					if r := AnalysisProjectInfo(data); r != "" {
 						times++
 						ss := map[string]interface{}{"times": times, "updatetime": time.Now().Unix()}
 						if times >= 3 { //3次下载失败今天不再下载,state置为1
 							ss["state"] = -1
-							ss["detailfilerr"] = true
+							ss["detailfilerr"] = r
 						}
 						set := map[string]interface{}{"$set": ss}
 						MgoS.Update("spider_highlistdata", query, set, false, false)
 						return
 					}
-					t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
-					if t1 > time.Now().Unix() { //防止发布时间超前
-						data["publishtime"] = time.Now().Unix()
+					//t1 := util.ParseDate2Int64(qu.ObjToString(data["publishtime"]))
+					//if t1 > time.Now().Unix() { //防止发布时间超前
+					//	data["publishtime"] = time.Now().Unix()
+					//}
+					tmphref := qu.ObjToString(data["href"])
+					publishtime := qu.Int64All(data["l_np_publishtime"])
+					if publishtime < time.Now().AddDate(-1, 0, 0).Unix() {
+						isExist, _ := util.ExistsBloomRedis("href", tmphref)
+						if isExist {
+							set := map[string]interface{}{"$set": map[string]interface{}{
+								"state":      1,
+								"updatetime": time.Now().Unix(),
+								"exist":      "bloom_href",
+							}}
+							MgoS.Update("spider_highlistdata", query, set, false, false)
+							return
+						}
 					}
 					delete(data, "exit")
 					delete(data, "checkpublishtime")
@@ -271,9 +253,12 @@ func DownloadHighDetail(code string) {
 }
 
 //detail含“详情请访问原网页!”且附件未下成功的,不计入下载成功
-func AnalysisProjectInfo(data map[string]interface{}) bool {
+func AnalysisProjectInfo(data map[string]interface{}) string {
 	defer qu.Catch()
 	detail := qu.ObjToString(data["detail"])
+	if RestrictAccessReg.MatchString(detail) { //限制访问
+		return "ip"
+	}
 	if detail == "详情请访问原网页!" || detail == "<br/>详情请访问原网页!" { //不判断包含关系因为有些数据为json拼接,字段不全,会加“详情请访问原网页”
 		if projectinfo, ok := data["projectinfo"].(map[string]interface{}); ok && len(projectinfo) > 0 {
 			if attachments, ok := projectinfo["attachments"].(map[string]interface{}); ok && len(attachments) > 0 {
@@ -282,20 +267,21 @@ func AnalysisProjectInfo(data map[string]interface{}) bool {
 					if d, ok := data.(map[string]interface{}); ok {
 						fid := qu.ObjToString(d["fid"])
 						if fid != "" { //附件上传成功
-							fileOk = true
-							break
+							return ""
 						}
 					}
 				}
-				return !fileOk
+				if !fileOk {
+					return "detail_file"
+				}
 			} else {
-				return true
+				return "detail_file"
 			}
 		} else {
-			return true
+			return "detail_file"
 		}
 	}
-	return false
+	return ""
 }
 
 //下载解析内容页
@@ -340,7 +326,7 @@ func (s *Spider) DownloadDetailPage(param map[string]interface{}, data map[strin
 				if value, ok := v.(lua.LString); ok {
 					data[key] = string(value)
 				} else if value, ok := v.(lua.LNumber); ok {
-					data[key] = value
+					data[key] = int64(value)
 				} else if value, ok := v.(*lua.LTable); ok {
 					tmp := util.TableToMap(value)
 					data[key] = tmp

+ 5 - 5
src/spider/store.go

@@ -76,11 +76,11 @@ func Store(mode, event int, c, coverAttr string, data map[string]interface{}, fl
 				logger.Warn(c, mode, "保存失败", data)
 			}
 		}
-		href := fmt.Sprint(data["href"])
-		if len(href) > 5 && flag { //有效数据
-			hashHref := HexText(href)
-			lu.RedisClusterSet(hashHref, "", -1)
-		}
+		//href := fmt.Sprint(data["href"])
+		//if len(href) > 5 && flag { //有效数据
+		//	hashHref := HexText(href)
+		//	lu.RedisClusterSet(hashHref, "", -1)
+		//}
 	} else if mode == 2 {
 		data["T"] = c
 		SaveObj(event, coverAttr, data, flag)