浏览代码

判重备份

apple 5 年之前
父节点
当前提交
0bce71b965
共有 5 个文件被更改,包括 38 次插入325 次删除
  1. 4 2
      udpcreateindex/src/winnerextract.go
  2. 4 289
      udpfilterdup/src/README.md
  3. 12 6
      udpfilterdup/src/config.json
  4. 2 2
      udpfilterdup/src/datamap.go
  5. 16 26
      udpfilterdup/src/main.go

+ 4 - 2
udpcreateindex/src/winnerextract.go

@@ -64,7 +64,7 @@ func winnerEsTaskOnce()  {
 		}(tmp)
 		tmp = make(map[string]interface{})
 	}
-
+	wg.Wait()
 
 
 	it_2 := sess.DB(winnermgo.DbName).C(db_c2).Find(&q).Sort("_id").Iter()
@@ -92,6 +92,8 @@ func winnerEsTaskOnce()  {
 		tmp = make(map[string]interface{})
 	}
 
-	log.Println("总计:",num_1+num_2)
+	wg.Wait()
+
+	log.Println("winnerextract  索引完毕!  总计:",num_1+num_2)
 
 }

+ 4 - 289
udpfilterdup/src/README.md

@@ -1,59 +1,3 @@
-基于内存的信息重复过滤
-"extract": "result_file_20200410",
-"extract_back": "result_file_20200409",
-
-{
-    "udpport": ":11485",
-    "dupdays": 7,
-    "mongodb": {
-        "addr": "172.17.4.187:27083",
-        "pool": 10,
-        "db": "qfw",
-        "extract": "result_file_20200410",
-        "extract_back": "result_file_20200409",
-        "site": {
-            "dbname": "qfw",
-            "coll": "site"
-        }
-    },
-    "jkmail": {
-        "to": "zhangjinkun@topnet.net.cn",
-        "api": "http://10.171.112.160:19281/_send/_mail"
-    },
-    "nextNode": [
-        {
-            "addr": "172.17.145.179",
-            "port": 1782,
-            "stype": "project",
-            "memo": "合并项目"
-        },
-        {
-            "addr": "127.0.0.1",
-            "port": 1783,
-            "stype": "bidding",
-            "memo": "创建招标数据索引new"
-        }
-    ],
-
-    "threads": 1,
-    "isMerger": false,
-    "isSort":false,
-    "lowHeavy":false,
-    "timingTask":true,
-    "timingSpanDay": 3,
-    "timingPubScope": 720,
-    "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
-    "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
-    "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批)",
-    "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
-    "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
-}
-
-
-
-
-
-
 {
     "udpport": ":1785",
     "dupdays": 5,
@@ -73,12 +17,6 @@
         "api": "http://10.171.112.160:19281/_send/_mail"
     },
     "nextNode": [
-        {
-            "addr": "172.17.145.179",
-            "port": 1782,
-            "stype": "project",
-            "memo": "合并项目"
-        },
         {
             "addr": "127.0.0.1",
             "port": 1783,
@@ -87,237 +25,14 @@
         }
     ],
     "threads": 1,
-    "isMerger": false,
-    "isSort":true,
-    "lowHeavy":false,
+    "isMerger": true,
+    "lowHeavy":true,
     "timingTask":false,
     "timingSpanDay": 3,
     "timingPubScope": 720,
     "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
     "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
-    "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批)",
+    "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)",
     "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]",
     "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研"
-}
-
-
-
-
-//basic_bool := basicDataScore(source, info)
-					//if basic_bool {
-					//	//已原始数据为标准 - 对比数据打判重标签-
-					//	newData, mergeArr, is_replace = mergeDataFields(source, info)
-					//	//对比数据打重复标签的id,原始数据id的记录
-					//	repeat_idMap["_id"] = StringTOBsonId(info.id)
-					//	merge_idMap["_id"] = StringTOBsonId(source.id)
-					//
-					//	if IdType {
-					//		repeat_idMap["_id"] = info.id
-					//		merge_idMap["_id"] = source.id
-					//	}
-					//	repeat_id = source.id
-					//} else {
-					//	//已对比数据为标准 ,数据池的数据打判重标签
-					//	newData, mergeArr, is_replace = mergeDataFields(info, source)
-					//	DM.replaceSourceData(newData, source) //替换
-					//	//原始数据打重复标签的id,   对比数据id的记录
-					//	repeat_idMap["_id"] = StringTOBsonId(source.id)
-					//	merge_idMap["_id"] = StringTOBsonId(info.id)
-					//	if IdType {
-					//		repeat_idMap["_id"] = source.id
-					//		merge_idMap["_id"] = info.id
-					//	}
-					//	repeat_id = info.id
-					//}
-
-
-
-
-//basic_bool := basicDataScore(source, info)
-					//if !basic_bool {
-					//	DM.replaceSourceData(info, source) //替换
-					//	repeat_idMap["_id"] = StringTOBsonId(source.id)
-					//	if IdType {
-					//		repeat_idMap["_id"] = source.id
-					//	}
-					//	repeat_id = info.id
-					//	if len(ids)>=9 {
-					//		ids=append(ids,source.id)
-					//
-					//
-					//		for _, to := range nextNode {
-					//
-					//			key := source.id + "-" + source.id + "-" + util.ObjToString(to["stype"])
-					//			by, _ := json.Marshal(map[string]interface{}{
-					//				"gtid":  source.id,
-					//				"lteid": source.id,
-					//				"stype": util.ObjToString(to["stype"]),
-					//				"key":   key,
-					//				"ids":   strings.Join(ids, ","),
-					//			})
-					//			addr := &net.UDPAddr{
-					//				IP:   net.ParseIP(to["addr"].(string)),
-					//				Port: util.IntAll(to["port"]),
-					//			}
-					//			node := &udpNode{by, addr, time.Now().Unix(), 0}
-					//			udptaskmap.Store(key, node)
-					//			udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
-					//		}
-					//
-					//		//
-					//		ids = []string{}
-					//	}else {
-					//		ids=append(ids,source.id)
-					//	}
-					//
-					//}
-		
-		
-		
-					
-if isMerger { //合并相关
-					newData, mergeArr, is_replace := mergeDataFields(source, info)
-					merge_map := make(map[string]interface{}, 0)
-					if is_replace { //支持合并-更新数据
-						merge_map = map[string]interface{}{
-							"$set": map[string]interface{}{
-								"merge": newData.mergemap,
-							},
-						}
-						//更新合并后的数据
-						for _, value := range mergeArr {
-							if value == 0 {
-								merge_map["$set"].(map[string]interface{})["area"] = newData.area
-								merge_map["$set"].(map[string]interface{})["city"] = newData.city
-							} else if value == 1 {
-								merge_map["$set"].(map[string]interface{})["area"] = newData.area
-								merge_map["$set"].(map[string]interface{})["city"] = newData.city
-							} else if value == 2 {
-								merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
-							} else if value == 3 {
-								merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
-							} else if value == 4 {
-								merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
-							} else if value == 5 {
-								merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
-							} else if value == 6 {
-								merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
-							} else if value == 7 {
-								merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
-							} else if value == 8 {
-								merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
-							} else if value == 9 {
-								merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
-							} else if value == 10 {
-								merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
-							} else if value == 11 {
-								merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
-							} else {
-							}
-						}
-						//模板数据更新
-						updateExtract = append(updateExtract, []map[string]interface{}{
-							merge_idMap,
-							merge_map,
-						})
-					}
-				}	
-				
-				
-				
-				
-				
-				
-"winnerextract": {
-      "db_addr": "172.17.145.163:27082",
-      "db_name": "",
-      "db_pool": 5,
-      "db_c1": "winner_enterprise",
-      "db_c2": "winner_err",
-      "es_addr": "http://172.17.145.170:9800",
-      "es_index": "winner_v1",
-      "es_type": "winner"
-    }
-    
-    
-    func testWinnerExtract() {
-    	//查询表 - 生ES - 全量
-    
-    	Mgo = &MongodbSim{
-    		MongodbAddr: "172.17.145.163:27082",
-    		DbName:      "extract_v3",
-    		Size:        10,
-    	}
-    	Mgo.InitPool()
-    	sess := Mgo.GetMgoConn()
-    	defer mgo.DestoryMongoConn(sess)
-    
-    
-    	//初始化es
-    	es.InitElasticSize("http://172.17.145.170:9800",10)
-    	EsConn := es.GetEsConn()
-    	defer es.DestoryEsConn(EsConn)
-    
-    
-    	it := sess.DB(Mgo.DbName).C("winner_enterprise").Find(map[string]interface{}{}).Sort("_id").Iter()
-    	num :=0
-    
-    
-    	pool := make(chan bool, 5)
-    	wg := &sync.WaitGroup{}
-    	for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
-    		if num%10000 == 0 {
-    			log.Println("遍历数量:",num)
-    		}
-    
-    		pool <- true
-    		wg.Add(1)
-    		go func(tmp map[string]interface{}) {
-    			defer func() {
-    				<-pool
-    				wg.Done()
-    			}()
-    			savetmp := make(map[string]interface{}, 0)
-    			tmp_id := tmp["_id"].(primitive.ObjectID).Hex()
-    			savetmp["_id"] = tmp_id
-    			savetmp["name"] = tmp["company_name"]
-    			savetmp["pici"] = tmp["updatetime"]
-    
-    			if _, err := EsConn.Index().Index("winner_v1").Type("winner").Id(tmp_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
-    				log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
-    			}
-    		}(tmp)
-    		tmp = make(map[string]interface{})
-    	}
-    
-    	it_err := sess.DB(Mgo.DbName).C("winner_err").Find(map[string]interface{}{}).Sort("_id").Iter()
-    	num1 :=0
-    	pool_err := make(chan bool, 5)
-    	wg_err := &sync.WaitGroup{}
-    
-    	for tmp := make(map[string]interface{}); it_err.Next(&tmp); num++ {
-    		if num1%10000 == 0 {
-    			log.Println("遍历数量:",num1)
-    		}
-    		pool_err <- true
-    		wg_err.Add(1)
-    		go func(tmp map[string]interface{}) {
-    			defer func() {
-    				<-pool_err
-    				wg_err.Done()
-    			}()
-    			savetmp := make(map[string]interface{}, 0)
-    			tmp_id := tmp["_id"].(primitive.ObjectID).Hex()
-    			savetmp["_id"] = tmp_id
-    			savetmp["name"] = tmp["name"]
-    			savetmp["pici"] = tmp["updatetime"]
-    			if _, err := EsConn.Index().Index("winner_v1").Type("winner").Id(tmp_id).BodyJson(savetmp).Refresh(true).Do(); err != nil {
-    				log.Println("save es err :", tmp["_id"], savetmp["_id"], err)
-    			}
-    
-    		}(tmp)
-    		tmp = make(map[string]interface{})
-    	}
-    	log.Println("num+num1==",num+num1)
-    
-    }				
+}

+ 12 - 6
udpfilterdup/src/config.json

@@ -1,14 +1,14 @@
 {
-    "udpport": ":11888",
+    "udpport": ":1785",
     "dupdays": 5,
     "mongodb": {
-        "addr": "192.168.3.207:27092",
+        "addr": "172.17.4.187:27083",
         "pool": 5,
-        "db": "extract_kf",
-        "extract": "zk_zk_test",
-        "extract_back": "zk_zk_test",
+        "db": "qfw",
+        "extract": "result_file_20200410",
+        "extract_back": "result_file_20200409",
         "site": {
-            "dbname": "zhaolongyue",
+            "dbname": "qfw",
             "coll": "site"
         }
     },
@@ -17,6 +17,12 @@
         "api": "http://10.171.112.160:19281/_send/_mail"
     },
     "nextNode": [
+        {
+            "addr": "127.0.0.1",
+            "port": 1783,
+            "stype": "bidding",
+            "memo": "创建招标数据索引new"
+        }
     ],
     "threads": 1,
     "isMerger": true,

+ 2 - 2
udpfilterdup/src/datamap.go

@@ -457,8 +457,8 @@ func (d *datamap) update(t int64) {
 	if TimingTask {
 		d.keymap = d.GetLatelyFiveDay(t)
 	}else {
-		d.keymap = d.GetLatelyFiveDay(t)//测试数据采用
-		//d.keymap = d.GetLatelyFiveDayDouble(t)
+		//d.keymap = d.GetLatelyFiveDay(t)//测试数据采用
+		d.keymap = d.GetLatelyFiveDayDouble(t)
 	}
 	m := map[string]bool{}
 	for _, v := range d.keymap {

+ 16 - 26
udpfilterdup/src/main.go

@@ -203,7 +203,6 @@ func task(data []byte, mapInfo map[string]interface{}) {
 	defer mgo.DestoryMongoConn(sess)
 	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
 	updateExtract := [][]map[string]interface{}{}
-	ids:=[]string{}
 	pool := make(chan bool, threadNum)
 	wg := &sync.WaitGroup{}
 	n, repeateN := 0, 0
@@ -294,35 +293,27 @@ func task(data []byte, mapInfo map[string]interface{}) {
 				if isMerger && !strings.Contains(reason,"低质量"){
 					newData, update_map ,isReplace := mergeDataFields(source, info)
 					if isReplace {//替换-数据池
-						fmt.Println("替换更新:",source.id)
+						fmt.Println("合并更新的id:",source.id)
 						//数据池 - 替换
 						DM.replacePoolData(newData)
 						//mongo更新 - 具体字段 - merge
 						mgo.UpdateById(extract,source.id,update_map)
-						//发udp  ids:更新索引
-						if len(ids)>=9 {
-							fmt.Println("需要更新的ids:",ids)
-							ids=append(ids,source.id)
-							for _, to := range nextNode {
-								key := source.id + "-" + source.id + "-" + util.ObjToString(to["stype"])
-								by, _ := json.Marshal(map[string]interface{}{
-									"gtid":  source.id,
-									"lteid": source.id,
-									"stype": util.ObjToString(to["stype"]),
-									"key":   key,
-									"ids":   strings.Join(ids, ","),
-								})
-								addr := &net.UDPAddr{
-									IP:   net.ParseIP(to["addr"].(string)),
-									Port: util.IntAll(to["port"]),
-								}
-								node := &udpNode{by, addr, time.Now().Unix(), 0}
-								udptaskmap.Store(key, node)
-								udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+						//发udp  更新索引
+						for _, to := range nextNode {
+							key := source.id + "-" + source.id + "-" + util.ObjToString(to["stype"])
+							by, _ := json.Marshal(map[string]interface{}{
+								"gtid":  source.id,
+								"lteid": source.id,
+								"stype": "biddingall",
+								"key":   key,
+							})
+							addr := &net.UDPAddr{
+								IP:   net.ParseIP(to["addr"].(string)),
+								Port: util.IntAll(to["port"]),
 							}
-							ids = []string{}
-						}else {
-							ids=append(ids,source.id)
+							node := &udpNode{by, addr, time.Now().Unix(), 0}
+							udptaskmap.Store(key, node)
+							udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
 						}
 					}
 				}
@@ -352,7 +343,6 @@ func task(data []byte, mapInfo map[string]interface{}) {
 				"lteid": eid,
 				"stype": util.ObjToString(to["stype"]),
 				"key":   key,
-				"ids":   strings.Join(ids, ","),
 			})
 			addr := &net.UDPAddr{
 				IP:   net.ParseIP(to["addr"].(string)),