apple 5 лет назад
Родитель
Commit
da2c483229

+ 3 - 3
udpfilterdup/src/config.json

@@ -5,8 +5,8 @@
         "addr": "192.168.3.207:27092",
         "pool": 5,
         "db": "extract_kf",
-        "extract": "zk_zk_newTest",
-        "extract_back": "zk_zk_newTest",
+        "extract": "zk_test",
+        "extract_back": "zk_test",
         "site": {
             "dbname": "zhaolongyue",
             "coll": "site"
@@ -19,7 +19,7 @@
     "nextNode": [
     ],
     "threads": 1,
-    "isMerger": false,
+    "isMerger": true,
     "lowHeavy":true,
     "timingTask":false,
     "timingSpanDay": 3,

+ 120 - 2
udpfilterdup/src/dataMethodMerge.go

@@ -2,8 +2,126 @@ package main
 
 import "qfw/util"
 
-//合并字段-并更新merge字段的值
-func mergeDataFields(source *Info, info *Info) (*Info, []int64, bool) {
+
+// mergeDataFields fills the empty fields of source with the corresponding
+// non-empty values of info.
+//
+// For every field that is copied over:
+//   - the field on source is overwritten with the value from info;
+//   - the merge map gets an entry field -> {field: value, "id": info.id},
+//     recording which record supplied the value;
+//   - the value is added to the "$set" document of the returned update map.
+//
+// The returned update map always carries the complete merge map under
+// "$set" -> "merge", even when no field was copied. The boolean result
+// reports whether at least one field was copied. source is modified in place
+// and also returned.
+func mergeDataFields(source *Info, info *Info) (*Info, map[string]interface{}, bool) {
+	mergeMap := source.mergemap
+	if mergeMap == nil {
+		// Assigning into a nil map panics, so start a fresh merge map.
+		mergeMap = map[string]interface{}{}
+	}
+	setDoc := map[string]interface{}{}
+	isReplace := false
+
+	// record stores the provenance of a copied field and schedules the field
+	// for the "$set" update.
+	record := func(field string, value interface{}) {
+		mergeMap[field] = map[string]interface{}{
+			field: value,
+			"id":  info.id,
+		}
+		setDoc[field] = value
+		isReplace = true
+	}
+
+	// Project name.
+	if source.projectname == "" && info.projectname != "" {
+		source.projectname = info.projectname
+		record("projectname", info.projectname)
+	}
+
+	// Project code.
+	if source.projectcode == "" && info.projectcode != "" {
+		source.projectcode = info.projectcode
+		record("projectcode", info.projectcode)
+	}
+
+	// Buyer (purchasing unit).
+	if source.buyer == "" && info.buyer != "" {
+		source.buyer = info.buyer
+		record("buyer", info.buyer)
+	}
+
+	// Budget.
+	if source.budget == 0 && info.budget != 0 {
+		source.budget = info.budget
+		record("budget", info.budget)
+	}
+
+	// Winning bidder.
+	if source.winner == "" && info.winner != "" {
+		source.winner = info.winner
+		record("winner", info.winner)
+	}
+
+	// Winning bid amount.
+	if source.bidamount == 0 && info.bidamount != 0 {
+		source.bidamount = info.bidamount
+		record("bidamount", info.bidamount)
+	}
+
+	// Bid opening time.
+	if source.bidopentime == 0 && info.bidopentime != 0 {
+		source.bidopentime = info.bidopentime
+		record("bidopentime", info.bidopentime)
+	}
+
+	// Contract number.
+	if source.contractnumber == "" && info.contractnumber != "" {
+		source.contractnumber = info.contractnumber
+		record("contractnumber", info.contractnumber)
+	}
+
+	// Agency.
+	if source.agency == "" && info.agency != "" {
+		source.agency = info.agency
+		record("agency", info.agency)
+	}
+
+	source.mergemap = mergeMap
+	setDoc["merge"] = mergeMap
+
+	update_map := map[string]interface{}{
+		"$set": setDoc,
+	}
+	return source, update_map, isReplace
+}
+
+
+
+
+
+
+
+//合并字段-并更新merge字段的值-
+func mergeDataFieldsArr(source *Info, info *Info) (*Info, []int64, bool) {
 
 	merge_recordMap := make(map[string]interface{}, 0)
 	mergeArr := make([]int64, 0)

+ 20 - 2
udpfilterdup/src/datamap.go

@@ -223,7 +223,7 @@ func NewInfo(tmp map[string]interface{}) *Info {
 
 	info.specialWord = FilterRegTitle.MatchString(info.title)
 	info.titleSpecialWord = FilterRegTitle_0.MatchString(info.title) ||FilterRegTitle_1.MatchString(info.title) || FilterRegTitle_2.MatchString(info.title)
-	info.mergemap = *qutil.ObjToMap(tmp["merge_map"])
+	info.mergemap = *qutil.ObjToMap(tmp["merge"])
 	if info.mergemap == nil {
 		info.mergemap = make(map[string]interface{}, 0)
 	}
@@ -505,6 +505,25 @@ func (d *datamap) GetLatelyFiveDayDouble(t int64) []string  {//增量-两倍
 
 
 
+// replacePoolData replaces, inside the data pool, the entry whose id equals
+// newData.id with newData.
+//
+// The bucket is looked up under the key "<date>_<subtype>_<area>", where
+// <date> is newData.publishtime formatted with qutil.Date_yyyyMMdd. Only the
+// first entry with a matching id is replaced. When no bucket exists for the
+// key the pool is left untouched.
+func (d *datamap) replacePoolData(newData *Info) {
+	d.lock.Lock()
+	// Deferred so the lock is released even if something below panics.
+	defer d.lock.Unlock()
+
+	publishTime := newData.publishtime
+	day := qutil.FormatDateByInt64(&publishTime, qutil.Date_yyyyMMdd)
+	bucketKey := fmt.Sprintf("%s_%s_%s", day, newData.subtype, newData.area)
+
+	bucket, ok := d.data[bucketKey]
+	if !ok {
+		// Nothing pooled under this key; do not create an empty bucket.
+		return
+	}
+	for i, item := range bucket {
+		if item.id == newData.id {
+			// Assigning through the bucket updates the pooled collection in
+			// place, so no write-back to d.data is needed.
+			bucket[i] = newData
+			break
+		}
+	}
+}
+
+
+
 //替换原始数据池
 func (d *datamap) replaceSourceData(newData *Info, oldData *Info) {
 	//删除数据池的老数据
@@ -550,7 +569,6 @@ func (d *datamap) replaceSourceData(newData *Info, oldData *Info) {
 		d.areakeys = areaArr
 	}
 
-
 	d.lock.Unlock()
 }
 

+ 40 - 2
udpfilterdup/src/main.go

@@ -128,8 +128,8 @@ func mainT() {
 		time.Sleep(99999 * time.Hour)
 	} else {
 		//IdType = true  //打开id字符串模式
-		sid = "5ef01220801f744d045f51f1"
-		eid = "5ef61eb3801f744d046402dd"
+		sid = "5da3f2c5a5cb26b9b79847f0"
+		eid = "5da40bdaa5cb26b9b7bea480"
 		log.Println("正常判重测试开始")
 		log.Println(sid, "---", eid)
 		mapinfo := map[string]interface{}{}
@@ -278,6 +278,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 				if IdType {
 					updateID["_id"] = info.id
 				}
+
 				updateExtract = append(updateExtract, []map[string]interface{}{//重复数据打标签
 					updateID,
 					map[string]interface{}{
@@ -288,6 +289,43 @@ func task(data []byte, mapInfo map[string]interface{}) {
 						},
 					},
 				})
+
+				//是否合并-低质量数据不合并
+				if isMerger && !strings.Contains(reason,"低质量"){
+					newData, update_map ,isReplace := mergeDataFields(source, info)
+					if isReplace {//替换-数据池
+						fmt.Println("替换更新:",source.id)
+						//数据池
+						DM.replacePoolData(newData) //替换
+						//mongo更新 - 具体字段
+						mgo.UpdateById(extract,source.id,update_map)
+						//发udp  ids:更新索引
+						if len(ids)>=9 {
+							fmt.Println("需要更新的ids:",ids)
+							ids=append(ids,source.id)
+							for _, to := range nextNode {
+								key := source.id + "-" + source.id + "-" + util.ObjToString(to["stype"])
+								by, _ := json.Marshal(map[string]interface{}{
+									"gtid":  source.id,
+									"lteid": source.id,
+									"stype": util.ObjToString(to["stype"]),
+									"key":   key,
+									"ids":   strings.Join(ids, ","),
+								})
+								addr := &net.UDPAddr{
+									IP:   net.ParseIP(to["addr"].(string)),
+									Port: util.IntAll(to["port"]),
+								}
+								node := &udpNode{by, addr, time.Now().Unix(), 0}
+								udptaskmap.Store(key, node)
+								udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+							}
+							ids = []string{}
+						}else {
+							ids=append(ids,source.id)
+						}
+					}
+				}
 			}
 		}(tmp)
 		if len(updateExtract) >= 200 {