apple 5 år sedan
förälder
incheckning
5f5670d18e
3 ändrade filer med 236 tillägg och 19 borttagningar
  1. 4 4
      udpfilterdup/src/config.json
  2. 16 7
      udpfilterdup/src/datamap.go
  3. 216 8
      udpfilterdup/src/main.go

+ 4 - 4
udpfilterdup/src/config.json

@@ -1,12 +1,12 @@
 {
     "udpport": ":17859",
-    "dupdays": 5,
+    "dupdays": 7,
     "mongodb": {
         "addr": "192.168.3.207:27092",
         "pool": 5,
         "db": "extract_kf",
-        "extract": "a_testbidding",
-        "extract_back": "a_testbidding",
+        "extract": "zk_zk_test",
+        "extract_back": "zk_zk_test",
         "site": {
             "dbname": "extract_kf",
             "coll": "site"
@@ -19,7 +19,7 @@
     "nextNode": [
     ],
     "threads": 1,
-    "isMerger": true,
+    "isMerger": false,
     "lowHeavy":true,
     "timingTask":false,
     "timingSpanDay": 3,

+ 16 - 7
udpfilterdup/src/datamap.go

@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"log"
 	qutil "qfw/util"
+	"reflect"
 	"regexp"
 	"strings"
 	"sync"
@@ -76,10 +77,14 @@ func TimedTaskDatamap(days int,lasttime int64) *datamap {
 			qutil.IntAll(tmp["dataging"]) == 1 {
 
 		} else {
+			if fmt.Sprint(reflect.TypeOf(tmp["publishtime"]))=="string" {
+				continue
+			}
 			pt := tmp["publishtime"]
 			pt_time := qutil.Int64All(pt)
-			if pt_time <= 0 {
-				break
+
+			if pt_time > time.Now().Unix() {
+				continue
 			}
 			if qutil.Float64All(lasttime-pt_time) < datelimit {
 				continuSum++
@@ -142,14 +147,18 @@ func NewDatamap(days int, lastid string) *datamap {
 		if qutil.IntAll(tmp["repeat"]) == 1 || qutil.IntAll(tmp["repeat"]) == -1{
 
 		} else {
-			pt := tmp["publishtime"]
+			if fmt.Sprint(reflect.TypeOf(tmp["publishtime"]))=="string" {
+				continue
+			}
+			pt:= tmp["publishtime"]
 			pt_time := qutil.Int64All(pt)
-			if pt_time <= 0 {
-				break
+			if pt_time > time.Now().Unix() {
+				continue
 			}
 			if now1 == 0 {
 				now1 = pt_time
 			}
+
 			if qutil.Float64All(now1-pt_time) < datelimit {
 				continuSum++
 				info := NewInfo(tmp)
@@ -457,8 +466,8 @@ func (d *datamap) update(t int64) {
 	if TimingTask {
 		d.keymap = d.GetLatelyFiveDay(t)
 	}else {
-		//d.keymap = d.GetLatelyFiveDay(t)//测试数据采用
-		d.keymap = d.GetLatelyFiveDayDouble(t)
+		d.keymap = d.GetLatelyFiveDay(t)//测试全量数据采用
+		//d.keymap = d.GetLatelyFiveDayDouble(t)
 	}
 	m := map[string]bool{}
 	for _, v := range d.keymap {

+ 216 - 8
udpfilterdup/src/main.go

@@ -66,6 +66,7 @@ func init() {
 		Size:        util.IntAllDef(mconf["pool"], 10),
 	}
 	mgo.InitPool()
+
 	extract = mconf["extract"].(string)
 	extract_back = mconf["extract_back"].(string)
 
@@ -117,8 +118,6 @@ func main() {
 	time.Sleep(99999 * time.Hour)
 }
 
-
-
 //测试组人员使用
 func mainT() {
 	if TimingTask {
@@ -127,8 +126,8 @@ func mainT() {
 		time.Sleep(99999 * time.Hour)
 	} else {
 		//IdType = true  //打开id字符串模式
-		sid = "5f0141f6801f744d046c6691"
-		eid = "5f017266801f744d046c7a17"
+		sid = "5f15bf800000000000000000"
+		eid = "5f1efa000000000000000000"
 		log.Println("正常判重测试开始")
 		log.Println(sid, "---", eid)
 		mapinfo := map[string]interface{}{}
@@ -139,7 +138,7 @@ func mainT() {
 		mapinfo["gtid"] = sid
 		mapinfo["lteid"] = eid
 		mapinfo["stop"] = "true"
-		task([]byte{}, mapinfo)
+		taskRepair([]byte{}, mapinfo)
 		time.Sleep(99999 * time.Hour)
 	}
 }
@@ -178,6 +177,213 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 	}
 }
 
+// taskRepair runs duplicate detection over the documents of the `extract`
+// collection whose _id lies in (mapInfo["gtid"], mapInfo["lteid"]], iterating
+// them sorted by publishtime.
+//
+// For every document it:
+//   - marks it as a duplicate when jsondata.sourcewebsite == 1;
+//   - skips it when it already carries repeat == 1 / -1 or dataging == 1;
+//   - otherwise checks it against the global pool DM in a worker goroutine
+//     (at most threadNum workers run at once), tags duplicates / invalid
+//     records in `extract`, and records the touched ids in the
+//     "repair_repeat_0728" collection.
+//
+// When the run finishes, n > repeateN and mapInfo["stop"] is nil, a UDP
+// message is sent to every entry of nextNode.
+//
+// data is accepted for signature parity with task and is not read here.
+//
+// The pending update batches and the duplicate counter are shared between the
+// iterating goroutine and the workers, so every access goes through `lock`.
+func taskRepair(data []byte, mapInfo map[string]interface{}) {
+	log.Println("开始修复数据判重")
+	defer util.Catch()
+	// _id range to process.
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gt":  StringTOBsonId(mapInfo["gtid"].(string)),
+			"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
+		},
+	}
+	if IdType {
+		// String-id mode: compare the raw strings instead of ObjectIds.
+		q = map[string]interface{}{
+			"_id": map[string]interface{}{
+				"$gt":  mapInfo["gtid"].(string),
+				"$lte": mapInfo["lteid"].(string),
+			},
+		}
+	}
+
+	log.Println(mgo.DbName, extract, q)
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+
+	// lock guards updateExtract, updateSave and repeateN, all of which are
+	// touched by both the loop below and the worker goroutines.
+	// (Not named `mu`: that identifier is a package used further down.)
+	var lock sync.Mutex
+	updateExtract := [][]map[string]interface{}{}
+	updateSave := []map[string]interface{}{}
+	n, repeateN := 0, 0
+
+	// queueExtract buffers one {selector, change} pair and flushes the buffer
+	// to `extract` once it holds 200 entries. The bulk write itself happens
+	// outside the lock so workers are not serialised on the database call.
+	queueExtract := func(selector, change map[string]interface{}) {
+		var batch [][]map[string]interface{}
+		lock.Lock()
+		updateExtract = append(updateExtract, []map[string]interface{}{selector, change})
+		if len(updateExtract) >= 200 {
+			batch, updateExtract = updateExtract, [][]map[string]interface{}{}
+		}
+		lock.Unlock()
+		if len(batch) > 0 {
+			mgo.UpSertBulk(extract, batch...)
+		}
+	}
+	// queueSave buffers one repaired id and flushes the buffer to the
+	// "repair_repeat_0728" collection once it holds 100 entries.
+	queueSave := func(id interface{}) {
+		var batch []map[string]interface{}
+		lock.Lock()
+		updateSave = append(updateSave, map[string]interface{}{
+			"id": id,
+		})
+		if len(updateSave) >= 100 {
+			batch, updateSave = updateSave, []map[string]interface{}{}
+		}
+		lock.Unlock()
+		if len(batch) > 0 {
+			mgo.SaveBulk("repair_repeat_0728", batch...)
+		}
+	}
+	// addRepeat bumps the duplicate counter under the lock.
+	addRepeat := func() {
+		lock.Lock()
+		repeateN++
+		lock.Unlock()
+	}
+
+	pool := make(chan bool, threadNum) // bounds the number of live workers
+	wg := &sync.WaitGroup{}
+	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
+		if n%10000 == 0 {
+			lock.Lock()
+			repeated := repeateN
+			lock.Unlock()
+			log.Println("current:", n, tmp["_id"], "repeateN:", repeated)
+		}
+		// Pre-check on jsondata: sourcewebsite == 1 is tagged as a duplicate
+		// without consulting the pool.
+		source := util.ObjToMap(tmp["jsondata"])
+		if util.IntAll((*source)["sourcewebsite"]) == 1 {
+			addRepeat()
+			queueExtract(
+				map[string]interface{}{
+					"_id": tmp["_id"],
+				},
+				map[string]interface{}{
+					"$set": map[string]interface{}{
+						"repeat":        1,
+						"dataging":      0,
+						"repeat_reason": "sourcewebsite为1,重复",
+					},
+				},
+			)
+			tmp = make(map[string]interface{})
+			continue
+		}
+
+		// Already tagged (duplicate / invalid) or flagged dataging: skip,
+		// but still count existing duplicates.
+		if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1 ||
+			util.IntAll(tmp["dataging"]) == 1 {
+			if util.IntAll(tmp["repeat"]) == 1 {
+				addRepeat()
+			}
+			tmp = make(map[string]interface{})
+			continue
+		}
+		pool <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			info := NewInfo(tmp)
+			if !LowHeavy { // low-quality records are not deduplicated: tag them as invalid instead
+				if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
+					queueExtract(
+						map[string]interface{}{
+							"_id": tmp["_id"],
+						},
+						map[string]interface{}{
+							"$set": map[string]interface{}{
+								"repeat": -1, // invalid-data tag
+							},
+						},
+					)
+
+					// Record the id in the repair collection.
+					log.Println("修复数据:", info.id)
+					queueSave(info.id)
+					return
+				}
+			}
+			// Regular duplicate check against the pool.
+			b, source, reason := DM.check(info)
+			if b { // duplicate found: tag it in extract and record the repair
+				addRepeat()
+				var updateID = map[string]interface{}{} // selector of the document being tagged
+				updateID["_id"] = StringTOBsonId(info.id)
+				if IdType {
+					updateID["_id"] = info.id
+				}
+
+				// Tag the duplicate.
+				queueExtract(
+					updateID,
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"repeat":        1,
+							"repeat_reason": reason,
+							"repeat_id":     source.id,
+						},
+					},
+				)
+
+				// Record the id in the repair collection.
+				log.Println("修复-数据:", info.id)
+				queueSave(info.id)
+
+				// Merge into the matched record when enabled; duplicates whose
+				// reason contains "低质量" are never merged.
+				if isMerger && !strings.Contains(reason, "低质量") {
+					newData, update_map, isReplace := mergeDataFields(source, info)
+					if isReplace { // replace the entry held in the pool
+						fmt.Println("合并更新的id:", source.id)
+						// Pool: replace the entry.
+						DM.replacePoolData(newData)
+						// MongoDB: update the merged fields on the matched record.
+						mgo.UpdateById(extract, source.id, update_map)
+						// Send UDP to refresh the index (currently disabled):
+						//for _, to := range nextNode {
+						//	key := source.id + "-" + source.id + "-" + util.ObjToString(to["stype"])
+						//	by, _ := json.Marshal(map[string]interface{}{
+						//		"gtid":  source.id,
+						//		"lteid": source.id,
+						//		"stype": "biddingall",
+						//		"key":   key,
+						//	})
+						//	addr := &net.UDPAddr{
+						//		IP:   net.ParseIP(to["addr"].(string)),
+						//		Port: util.IntAll(to["port"]),
+						//	}
+						//	node := &udpNode{by, addr, time.Now().Unix(), 0}
+						//	udptaskmap.Store(key, node)
+						//	udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+						//}
+					}
+				}
+			}
+		}(tmp)
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	// All workers have finished, so the buffers can be read without the lock.
+	// Flush whatever is left; empty batches are not sent.
+	if len(updateExtract) > 0 {
+		mgo.UpSertBulk(extract, updateExtract...)
+	}
+	if len(updateSave) > 0 {
+		mgo.SaveBulk("repair_repeat_0728", updateSave...)
+	}
+
+	log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
+
+	// Task finished: notify the downstream nodes over UDP.
+	if n > repeateN && mapInfo["stop"] == nil {
+		log.Println("判重任务完成发送udp")
+		for _, to := range nextNode {
+			sid, _ := mapInfo["gtid"].(string)
+			eid, _ := mapInfo["lteid"].(string)
+			key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
+			by, _ := json.Marshal(map[string]interface{}{
+				"gtid":  sid,
+				"lteid": eid,
+				"stype": util.ObjToString(to["stype"]),
+				"key":   key,
+			})
+			addr := &net.UDPAddr{
+				IP:   net.ParseIP(to["addr"].(string)),
+				Port: util.IntAll(to["port"]),
+			}
+			node := &udpNode{by, addr, time.Now().Unix(), 0}
+			udptaskmap.Store(key, node)
+			udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+		}
+	}
+}
+
+
+
+
 //开始判重程序
 func task(data []byte, mapInfo map[string]interface{}) {
 	log.Println("开始数据判重")
@@ -197,6 +403,8 @@ func task(data []byte, mapInfo map[string]interface{}) {
 			},
 		}
 	}
+
+
 	log.Println(mgo.DbName, extract, q)
 	sess := mgo.GetMgoConn()
 	defer mgo.DestoryMongoConn(sess)
@@ -365,6 +573,7 @@ func timedTaskDay() {
 	c.Start()
 }
 func timedTaskOnce() {
+
 	defer util.Catch()
 	log.Println("开始一次迁移任务")
 	movedata()
@@ -496,7 +705,7 @@ func timedTaskOnce() {
 		log.Println("构建第",k,"组---(数据池)")
 		//当前组的第一个发布时间
 		first_pt :=util.Int64All(v[0]["publishtime"])
-		DM = TimedTaskDatamap(dupdays, first_pt)
+		curTM := TimedTaskDatamap(dupdays, first_pt)
 		log.Println("开始遍历判重第",k,"组  共计数量:",len(v))
 		n = n+len(v)
 		log.Println("统计目前总数量:",n,"重复数量:",repeateN)
@@ -523,7 +732,7 @@ func timedTaskOnce() {
 					continue
 				}
 			}
-			b, source, reason := DM.check(info)
+			b, source, reason := curTM.check(info)
 			if b { //有重复,生成更新语句,更新抽取和更新招标
 				log.Println("判重结果", b, reason,"目标id",info.id)
 				repeateN++
@@ -564,7 +773,6 @@ func timedTaskOnce() {
 			mgo.UpSertBulk(extract, updateExtract...)
 			updateExtract = [][]map[string]interface{}{}
 		}
-
 	}