Prechádzať zdrojové kódy

低质量,定时任务判重

apple 5 rokov pred
rodič
commit
e5e83061a9

+ 5 - 4
udpfilterdup/src/config.json

@@ -5,9 +5,9 @@
         "addr": "192.168.3.207:27092",
         "pool": 10,
         "db": "extract_kf",
-        "extract": "a_testbidding_new",
+        "extract": "zk",
         "site": {
-            "dbname": "qfw",
+            "dbname": "extract_kf",
             "coll": "site"
         }
     },
@@ -17,9 +17,10 @@
     },
     "nextNode": [],
     "isMerger": false,
-    "threads": 5,
+    "threads": 1,
     "isSort":false,
-    "lowHeavy":true,
+    "lowHeavy":false,
+    "timingTask":false,
     "specialwords": "(重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研)",
     "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?",
     "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批)",

+ 22 - 5
udpfilterdup/src/datamap.go

@@ -63,6 +63,12 @@ type historymap struct {
 	keys   map[string]bool
 }
 
+func TimedTaskDatamap(days int, lastid string) *datamap {
+	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, map[string]bool{}}
+	return dm
+}
+
+
 func NewDatamap(days int, lastid string) *datamap {
 	datelimit = qutil.Float64All(days * 86400)
 	dm := &datamap{sync.Mutex{}, days, map[string][]*Info{}, []string{}, map[string]bool{}}
@@ -84,7 +90,7 @@ func NewDatamap(days int, lastid string) *datamap {
 			continuSum++
 		} else {
 			pt := tmp["comeintime"]
-			if Is_Sort {
+			if Is_Sort||TimingTask {
 				pt = tmp["publishtime"]
 			}
 			pt_time := qutil.Int64All(pt)
@@ -95,6 +101,7 @@ func NewDatamap(days int, lastid string) *datamap {
 				now1 = pt_time
 			}
 			if qutil.Float64All(now1-pt_time) < datelimit {
+				log.Println(tmp)
 				info := NewInfo(tmp)
 				dkey := qutil.FormatDateWithObj(&pt, qutil.Date_yyyyMMdd)
 				k := fmt.Sprintf("%s_%s_%s", dkey, info.subtype, info.area)
@@ -132,6 +139,11 @@ func NewHistorymap(startid string, lastid string, startTime int64, lastTime int6
 		true)).Sort("-_id").Iter()
 	m, n := 0, 0
 	for tmp_start := make(map[string]interface{}); it_start.Next(&tmp_start); {
+
+		if qutil.IntAll(tmp_start["repeat"]) == 1||qutil.IntAll(tmp_start["repeat"]) == -1 {
+			continue
+		}
+
 		pt_s := tmp_start["comeintime"]
 		if Is_Sort {
 			pt_s = tmp_start["publishtime"]
@@ -166,6 +178,11 @@ func NewHistorymap(startid string, lastid string, startTime int64, lastTime int6
 		true)).Sort("_id").Iter()
 
 	for tmp_last := make(map[string]interface{}); it_last.Next(&tmp_last); {
+
+		if qutil.IntAll(tmp_last["repeat"]) == 1||qutil.IntAll(tmp_last["repeat"]) == -1 {
+			continue
+		}
+
 		pt_l := tmp_last["comeintime"]
 		if Is_Sort {
 			pt_l = tmp_last["publishtime"]
@@ -370,7 +387,7 @@ L:
 	//往预存数据 d 添加
 	if !b {
 		ct := info.comeintime
-		if Is_Sort {
+		if Is_Sort ||TimingTask{
 			ct = info.publishtime
 		}
 		dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd)
@@ -562,7 +579,7 @@ L:
 //替换原始数据池
 func (d *datamap) replaceSourceData(replaceData *Info, replaceId string) {
 	ct := replaceData.comeintime
-	if Is_Sort {
+	if Is_Sort||TimingTask {
 		ct = replaceData.publishtime
 	}
 	dkey := qutil.FormatDateByInt64(&ct, qutil.Date_yyyyMMdd)
@@ -689,7 +706,7 @@ func fastLowQualityHeavy(v *Info, info *Info, reason string) (bool, string) {
 		}else if isValue==1 {
 			isMeet := false
 			if isMeet, reason = judgeLowQualityData(v, info, reason); isMeet {
-				log.Println("符合低质量条件条件1",info.id,"--",v.id)
+				//log.Println("符合低质量条件条件1",info.id,"--",v.id)
 				reason = reason + "---有且一个要素组合"
 				return true, reason
 			}
@@ -988,7 +1005,7 @@ func tenderRepeat_B(v *Info, info *Info, reason string) (bool, string) {
 		if n == 2 && m == 2 {
 			return false, reason
 		} else {
-			reason = reason + "满足招标B,选二,"
+			reason = reason + "满足招标B,选二,"
 			return true, reason
 		}
 	}

+ 268 - 7
udpfilterdup/src/main.go

@@ -8,6 +8,8 @@ import (
 	"encoding/json"
 	"flag"
 	"fmt"
+	"github.com/cron"
+	"gopkg.in/mgo.v2/bson"
 	"log"
 	mu "mfw/util"
 	"net"
@@ -28,6 +30,7 @@ var (
 	dupdays   = 5                      //初始化判重范围
 	DM        *datamap                 //
 	HM        *historymap              //判重数据
+
 	lastid    = ""
 
 	//正则筛选相关
@@ -41,7 +44,8 @@ var (
 	threadNum        int                               //线程数量
 	SiteMap          map[string]map[string]interface{} //站点map
 	LowHeavy		 bool							   //低质量数据判重
-	sid, eid string                            //测试人员判重使用
+	TimingTask		 bool							   //是否定时任务
+	sid, eid string                          		   //测试人员判重使用
 )
 
 func init() {
@@ -71,6 +75,7 @@ func init() {
 	Is_Sort = Sysconfig["isSort"].(bool)
 	threadNum = util.IntAllDef(Sysconfig["threads"], 1)
 	LowHeavy =  Sysconfig["lowHeavy"].(bool)
+	TimingTask =  Sysconfig["timingTask"].(bool)
 	//站点配置
 	site := mconf["site"].(map[string]interface{})
 	SiteMap = make(map[string]map[string]interface{}, 0)
@@ -95,8 +100,17 @@ func main() {
 	go checkMapJob()
 	updport := Sysconfig["udpport"].(string)
 	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
-	udpclient.Listen(processUdpMsg)
-	log.Println("Udp服务监听", updport)
+	if TimingTask {
+		go timedTaskDay()
+	}else {
+		udpclient.Listen(processUdpMsg)
+		log.Println("Udp服务监听", updport)
+	}
+
+
+
+
+
 	time.Sleep(99999 * time.Hour)
 }
 
@@ -114,7 +128,6 @@ func mainT() {
 	eid = "5db2735ba5cb26b9b7c99c6f"
 
 
-
 	mapinfo := map[string]interface{}{}
 	if sid == "" || eid == "" {
 		log.Println("sid,eid参数不能为空")
@@ -139,7 +152,7 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		} else if mapInfo != nil {
 			taskType := util.ObjToString(mapInfo["stype"])
 			if taskType == "historyTask" {
-				//更新流程
+				//历史更新流程
 				go historyTask(data, mapInfo)
 			} else if taskType == "normalTask" {
 				//判重流程
@@ -163,7 +176,6 @@ func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
 		}
 	}
 }
-
 //开始判重程序
 func task(data []byte, mapInfo map[string]interface{}) {
 	log.Println("开始数据判重")
@@ -196,7 +208,7 @@ func task(data []byte, mapInfo map[string]interface{}) {
 		if n%10000 == 0 {
 			log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
 		}
-		if util.IntAll(tmp["repeat"]) == 1 {
+		if util.IntAll(tmp["repeat"]) == 1||util.IntAll(tmp["repeat"]) == -1 {
 			tmp = make(map[string]interface{})
 			repeateN++
 			continue
@@ -436,6 +448,11 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 		if n%10000 == 0 {
 			log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
 		}
+		if util.IntAll(tmp["dataging"]) == 1 {
+			tmp = make(map[string]interface{})
+			continue
+		}
+
 		pool <- true
 		wg.Add(1)
 		go func(tmp map[string]interface{}) {
@@ -616,6 +633,250 @@ func historyTask(data []byte, mapInfo map[string]interface{}) {
 	}
 }
 
+
+//定时任务
+func timedTaskDay()  {
+	c := cron.New()
+	c.AddFunc("0 0 0 * * ?", func() { timedTaskOnce() }) //每天凌晨执行一次
+	c.Start()
+	timedTaskOnce()
+}
+func timedTaskOnce()  {
+	log.Println("开始一次定时任务")
+	now := time.Now()
+	preTime:=time.Date(now.Year(),now.Month(),now.Day()+100,0,0,0,0,time.Local)
+	curTime:=time.Date(now.Year(),now.Month(),now.Day()+111,0,0,0,0,time.Local)
+	task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
+	task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
+	lastid :=task_sid
+	log.Println(task_sid,task_eid)
+
+	//ObjectId("5da3f31aa5cb26b9b798d3aa")
+	//ObjectId("5da418c4a5cb26b9b7e3e9a6")
+	//task_sid = "5da3f31aa5cb26b9b798d3aa"
+	//task_eid = "5da418c4a5cb26b9b7e3e9a6"
+
+	defer util.Catch()
+	//区间id
+	q := map[string]interface{}{
+		"_id": map[string]interface{}{
+			"$gte":  StringTOBsonId(task_sid),
+			"$lte": StringTOBsonId(task_eid),
+		},
+	}
+	sess_start := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess_start)
+	it_start := sess_start.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+	startNum := 0
+	for tmp_start := make(map[string]interface{}); it_start.Next(&tmp_start); startNum++ {
+		if startNum%10000 == 0 {
+			log.Println("正序遍历:", startNum)
+		}
+		if util.IntAll(tmp_start["dataging"])==1 {//取起始id
+			lastid = BsonTOStringId(tmp_start["_id"])
+			break
+		}
+	}
+
+
+	DM = NewDatamap(dupdays,lastid)
+	log.Println("本地数据加载完成,定时任务数据判重开始")
+	sess := mgo.GetMgoConn()
+	defer mgo.DestoryMongoConn(sess)
+	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
+	updateExtract := [][]map[string]interface{}{}
+	log.Println("线程数:", threadNum)
+	pool := make(chan bool, threadNum)
+	wg := &sync.WaitGroup{}
+	n, repeateN := 0, 0
+	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
+		if n%10000 == 0 {
+			log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
+		}
+		if util.IntAll(tmp["repeat"]) == 1||util.IntAll(tmp["repeat"]) == -1{
+			tmp = make(map[string]interface{})
+			continue
+		}
+		if util.IntAll(tmp["dataging"])!=1 {
+			tmp = make(map[string]interface{})
+			continue
+		}
+
+		pool <- true
+		wg.Add(1)
+		go func(tmp map[string]interface{}) {
+			defer func() {
+				<-pool
+				wg.Done()
+			}()
+			info := NewInfo(tmp)
+			if !LowHeavy {	//是否进行低质量数据判重
+				if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
+					updateExtract = append(updateExtract, []map[string]interface{}{
+						map[string]interface{}{
+							"_id": tmp["_id"],
+						},
+						map[string]interface{}{
+							"$set": map[string]interface{}{
+								"repeat": -1,//无效数据标签
+								"dataging": 0,
+							},
+						},
+					})
+					if len(updateExtract) > 500 {
+						mgo.UpSertBulk(extract, updateExtract...)
+						updateExtract = [][]map[string]interface{}{}
+					}
+					return
+				}
+			}
+
+			b, source, reason := DM.check(info)
+			if b { //有重复,生成更新语句,更新抽取和更新招标
+				repeateN++
+				var is_replace = false
+				var mergeArr = []int64{}                    //更改合并数组记录
+				var newData = &Info{}                       //更换新的数据池数据
+				var repeat_idMap = map[string]interface{}{} //记录判重的
+				var merge_idMap = map[string]interface{}{}  //记录合并的
+				repeat_idMap["_id"] = StringTOBsonId(info.id)
+				merge_idMap["_id"] = StringTOBsonId(source.id)
+				repeat_id := source.id//初始化一个数据
+
+				if isMerger {//合并相关
+					basic_bool := basicDataScore(source, info)
+					if basic_bool {
+						//已原始数据为标准 - 对比数据打判重标签-
+						newData, mergeArr, is_replace = mergeDataFields(source, info)
+						DM.replaceSourceData(newData, source.id) //替换
+						//对比数据打重复标签的id,原始数据id的记录
+						repeat_idMap["_id"] = StringTOBsonId(info.id)
+						merge_idMap["_id"] = StringTOBsonId(source.id)
+						repeat_id = source.id
+					} else {
+						//已对比数据为标准 ,数据池的数据打判重标签
+						newData, mergeArr, is_replace = mergeDataFields(info, source)
+						DM.replaceSourceData(newData, source.id) //替换
+						//原始数据打重复标签的id,   对比数据id的记录
+						repeat_idMap["_id"] = StringTOBsonId(source.id)
+						merge_idMap["_id"] = StringTOBsonId(info.id)
+						repeat_id = info.id
+					}
+
+					merge_map := make(map[string]interface{}, 0)
+					if is_replace { //有过合并-更新数据
+						merge_map = map[string]interface{}{
+							"$set": map[string]interface{}{
+								"merge": newData.mergemap,
+								"dataging": 0,
+							},
+						}
+						//更新合并后的数据
+						for _, value := range mergeArr {
+							if value == 0 {
+								merge_map["$set"].(map[string]interface{})["area"] = newData.area
+								merge_map["$set"].(map[string]interface{})["city"] = newData.city
+							} else if value == 1 {
+								merge_map["$set"].(map[string]interface{})["area"] = newData.area
+								merge_map["$set"].(map[string]interface{})["city"] = newData.city
+							} else if value == 2 {
+								merge_map["$set"].(map[string]interface{})["projectname"] = newData.projectname
+							} else if value == 3 {
+								merge_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode
+							} else if value == 4 {
+								merge_map["$set"].(map[string]interface{})["buyer"] = newData.buyer
+							} else if value == 5 {
+								merge_map["$set"].(map[string]interface{})["budget"] = newData.budget
+							} else if value == 6 {
+								merge_map["$set"].(map[string]interface{})["winner"] = newData.winner
+							} else if value == 7 {
+								merge_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount
+							} else if value == 8 {
+								merge_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime
+							} else if value == 9 {
+								merge_map["$set"].(map[string]interface{})["contractnumber"] = newData.contractnumber
+							} else if value == 10 {
+								merge_map["$set"].(map[string]interface{})["publishtime"] = newData.publishtime
+							} else if value == 11 {
+								merge_map["$set"].(map[string]interface{})["agency"] = newData.agency
+							} else {
+							}
+						}
+						//模板数据更新
+						updateExtract = append(updateExtract, []map[string]interface{}{
+							merge_idMap,
+							merge_map,
+						})
+					}
+				}else { //高质量数据
+					basic_bool := basicDataScore(source, info)
+					if !basic_bool {
+						DM.replaceSourceData(info, source.id) //替换
+						repeat_idMap["_id"] = StringTOBsonId(source.id)
+						repeat_id = info.id
+					}
+				}
+
+				//重复数据打标签
+				updateExtract = append(updateExtract, []map[string]interface{}{
+					repeat_idMap,
+					map[string]interface{}{
+						"$set": map[string]interface{}{
+							"repeat":        1,
+							"repeat_reason": reason,
+							"repeat_id":     repeat_id,
+							"dataging": 0,
+						},
+					},
+				})
+
+			}
+		}(tmp)
+		if len(updateExtract) > 500 {
+			mgo.UpSertBulk(extract, updateExtract...)
+			updateExtract = [][]map[string]interface{}{}
+		}
+		tmp = make(map[string]interface{})
+	}
+	wg.Wait()
+	if len(updateExtract) > 0 {
+		mgo.UpSertBulk(extract, updateExtract...)
+	}
+	log.Println("this timeTask over.", n, "repeateN:", repeateN)
+
+
+	//任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
+	if n > repeateN {
+		for _, to := range nextNode {
+			next_sid := util.BsonIdToSId(task_sid)
+			next_eid := util.BsonIdToSId(task_eid)
+			key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
+			by, _ := json.Marshal(map[string]interface{}{
+				"gtid":  next_sid,
+				"lteid": next_eid,
+				"stype": util.ObjToString(to["stype"]),
+				"key":   key,
+			})
+			addr := &net.UDPAddr{
+				IP:   net.ParseIP(to["addr"].(string)),
+				Port: util.IntAll(to["port"]),
+			}
+			node := &udpNode{by, addr, time.Now().Unix(), 0}
+			udptaskmap.Store(key, node)
+			udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
+		}
+	}
+}
+
+
+
+
+
+
+
+
+
+
 //合并字段-并更新merge字段的值
 func mergeDataFields(source *Info, info *Info) (*Info, []int64, bool) {