package main /** 招标信息判重 **/ import ( "encoding/json" "fmt" "gopkg.in/mgo.v2/bson" "log" mu "mfw/util" "net" "qfw/util" "qfw/util/mongodb" "regexp" "sync" "time" ) var ( Sysconfig map[string]interface{} //配置文件 mconf map[string]interface{} //mongodb配置信息 mgo *mongodb.MongodbSim //mongodb操作对象 siteMgo *mongodb.MongodbSim //mgoTest *mongodb.MongodbSim //mongodb操作对象 extract string extract_copy string bidding string udpclient mu.UdpClient //udp对象 nextNode []map[string]interface{} //下节点数组 dupdays = 5 //初始化判重范围 DM *datamap // HM *historymap //判重数据 lastid = "5d767728a5cb26b9b7748868" //ObjectId("5d767728a5cb26b9b7748868") //正则筛选相关 FilterRegTitle = regexp.MustCompile("^_$") FilterRegTitle_1 = regexp.MustCompile("^_$") FilterRegTitle_2 = regexp.MustCompile("^_$") SiteMap map[string]interface{} //站点map ) func init() { //flag.StringVar(&lastid, "id", "", "最后加载id") //以小于等于此id开始加载最近几天的数据 //flag.Parse() //172.17.145.163:27080 util.ReadConfig(&Sysconfig) nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{})) mconf = Sysconfig["mongodb"].(map[string]interface{}) mgo = &mongodb.MongodbSim{ MongodbAddr: mconf["addr"].(string), DbName: mconf["db"].(string), Size: util.IntAllDef(mconf["pool"], 10), } extract = mconf["extract"].(string) extract_copy = mconf["extract_copy"].(string) //bidding = mconf["bidding"].(string) mgo.InitPool() //测试需临时注释 dupdays = util.IntAllDef(Sysconfig["dupdays"], 3) //加载数据 DM = NewDatamap(dupdays, lastid) FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"])) FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"])) FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"])) //站点相关数据库 mongodb.InitMongodbPool(5, "192.168.3.207:27082", "") siteMgo = &mongodb.MongodbSim{ MongodbAddr: "192.168.3.207:27082", Size: 5, DbName: "zhaolongyue", } siteMgo.InitPool() SiteMap = make(map[string]interface{},0) start := int(time.Now().Unix()) //站点配置 sess_site := siteMgo.GetMgoConn() defer sess_site.Close() res_site := sess_site.DB("zhaolongyue").C("site").Find(nil).Sort("_id").Iter() for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); { data_map := map[string]string{ "area":util.ObjToString(site_dict["area"]), "city":util.ObjToString(site_dict["city"]), "district":util.ObjToString(site_dict["district"]), } SiteMap[site_dict["site"].(string)]= data_map } fmt.Printf("用时:%d秒,%d个",int(time.Now().Unix())-start,len(SiteMap)) } //新增一个方法 判断 func mainTest() { //log.Println("1") //代码copy数据 //sessTest :=mgoTest.GetMgoConn() //defer sessTest.Close() // //sess := mgo.GetMgoConn() //defer sess.Close() // ////var arr []map[string]interface{} // //res_test := sessTest.DB("qfw").C("bidding").Find(mongodb.ObjToMQ(`{"comeintime":{"$gte": 1571025600, "$lte": 1571976000}}`, true)).Iter() //res :=sess.DB("extract_kf").C("a_testbidding") //5 // // // // //i:=0 //for dict := make(map[string]interface{}); res_test.Next(&dict); i++{ // // //插入 // if i%2000==0 { // log.Println("当前:",i) // } // res.Insert(dict) // //if len(arr)>=500 { // // arr = make([]map[string]interface{},0) // //}else { // // arr = append(arr,dict) // //} //} sess := mgo.GetMgoConn() defer mgo.DestoryMongoConn(sess) res_copy := sess.DB("extract_kf").C(extract_copy).Find(nil).Iter() m1 :=map[string]int{} //老版本 m2 :=map[string]int{} //新版本 i:=0 j:=0 for v1 := make(map[string]interface{}); res_copy.Next(&v1); i++{ if i%2000==0 { log.Println("当前i:",i) } m1[(v1["_id"].(bson.ObjectId).Hex())]= util.IntAll(v1["repeat"]) } sesss := mgo.GetMgoConn() defer mgo.DestoryMongoConn(sesss) res := sesss.DB("extract_kf").C(extract).Find(nil).Iter() for v2 := make(map[string]interface{}); res.Next(&v2); j++{ if j%2000==0 { log.Println("当前j:",j) } m2[(v2["_id"].(bson.ObjectId).Hex())]= util.IntAll(v2["repeat"]) } fmt.Println(len(m1),len(m2)) n1:=0 n2:=0 n3:=0 n4:=0 n5:=0 n6:=0 var arr1 []string var arr2 []string for k,v:=range m1{ if m2[k]==1&&v==0{//0:1 n1++ arr2 = append(arr2,fmt.Sprintf("目标_id:%s",k)) } if m2[k]==0&&v==1{ //1:0 n2++ arr1 = append(arr1,fmt.Sprintf("目标_id:%s",k)) } if m2[k]==0&&v==0{ //0:0 n3++ } if m2[k]==1&&v==1{//1:1 n4++ } if m2[k]==-1&&v==0{ //0:-1 n5++ } if m2[k]==-1&&v==1{//1:-1 n6++ } } //打印 1:0情况 66989; mm:=0 for _,v:=range arr1 { mm++ if mm%222==0 { log.Println(v) } } log.Println("分割线---------------") log.Println("分割线---------------") //打印 0:1情况 8729 nn:=0 for _,v:=range arr2 { nn++ if nn%30==0 { log.Println(v) } } log.Println("V1 0:1---",n1) log.Println("V1 1:0---",n2) log.Println("V1 0:0---",n3) log.Println("V1 1:1---",n4) log.Println("V1 0:-1---",n5) log.Println("V1 1:-1---",n6) } func main() { go checkMapJob() updport := Sysconfig["udpport"].(string) udpclient = mu.UdpClient{Local: updport, BufSize: 1024} udpclient.Listen(processUdpMsg) log.Println("Udp服务监听", updport) time.Sleep(99999 * time.Hour) } func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) { fmt.Println("接受的段数据") switch act { case mu.OP_TYPE_DATA: //上个节点的数据 //从表中开始处理 var mapInfo map[string]interface{} err := json.Unmarshal(data, &mapInfo) log.Println("err:", err, "mapInfo:", mapInfo) if err != nil { udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra) } else if mapInfo != nil { //更新流程 go historyTask(data,mapInfo) //判重流程 //go task(data, mapInfo) key, _ := mapInfo["key"].(string) if key == "" { key = "udpok" } udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra) } case mu.OP_NOOP: //下个节点回应 ok := string(data) if ok != "" { log.Println("ok:", ok) udptaskmap.Delete(ok) } } } //开始判重程序 func task(data []byte, mapInfo map[string]interface{}) { fmt.Println("开始数据判重") defer util.Catch() //区间id sess := mgo.GetMgoConn() defer mgo.DestoryMongoConn(sess) q := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)), }, } it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter() updateExtract := [][]map[string]interface{}{} pool := make(chan bool, 16) wg := &sync.WaitGroup{} mapLock := &sync.Mutex{} n, repeateN := 0, 0 for tmp := make(map[string]interface{}); it.Next(&tmp); n++ { if n%10000 == 0 { log.Println("current:", n, tmp["_id"],"repeateN:",repeateN) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() info := NewInfo(tmp) //是否为无效数据 if invalidData(info.buyer,info.projectname,info.projectcode) { mapLock.Lock() updateExtract = append(updateExtract, []map[string]interface{}{ map[string]interface{}{ "_id": tmp["_id"], }, map[string]interface{}{ "$set": map[string]interface{}{ "repeat": -1, }, }, }) if len(updateExtract) > 500 { mgo.UpdateBulk(extract, updateExtract...) updateExtract = [][]map[string]interface{}{} } mapLock.Unlock() }else { //判重原因 reason tmp["_id"] 对比id id原始id mapLock.Lock() b, source,reason := DM.check(info) if b { //有重复,生成更新语句,更新抽取和更新招标 repeateN++ var mergeArr = []int64{} //更改合并数组记录 var newData = &Info{} //更换新的数据池数据 var id_map = map[string]interface{}{} repeat_id := "" //合并操作--评功权重打分-合并完替换原始数据池 basic_bool := basicDataScore(source,info) if basic_bool { //已原始数据为标准-对比数据打判重标签 newData,mergeArr= mergeDataFields(source,info) DM.replaceSourceData(newData,source.id) //替换 id_map["_id"]= util.StringTOBsonId(source.id) repeat_id = source.id }else { //已对比数据为标准 ,数据池的数据打判重标签 newData,mergeArr= mergeDataFields(info,source) DM.replaceSourceData(newData,source.id)//替换 id_map["_id"]= util.StringTOBsonId(info.id) repeat_id = info.id } var update_map = map[string]interface{}{ "$set": map[string]interface{}{ "reason":reason, "repeat":"1", "repeatid":repeat_id, }, } //合并记录 if len(newData.mergemap)>0 { update_map["$set"].(map[string]interface{})["merge"] = newData.mergemap } //更新合并后的数据 for _,value :=range mergeArr { if value==1 { update_map["$set"].(map[string]interface{})["area"] = newData.area update_map["$set"].(map[string]interface{})["city"] = newData.city }else if value==2 { update_map["$set"].(map[string]interface{})["projectname"] = newData.projectname }else if value==3 { update_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode }else if value==4 { update_map["$set"].(map[string]interface{})["buyer"] = newData.buyer }else if value==5 { update_map["$set"].(map[string]interface{})["budget"] = newData.budget }else if value==6 { update_map["$set"].(map[string]interface{})["winner"] = newData.winner }else if value==7 { update_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount }else if value==8 { update_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime }else { } } //构建数据库更新用到的 updateExtract = append(updateExtract, []map[string]interface{}{ id_map, update_map, }) if len(updateExtract) > 500 { mgo.UpdateBulk(extract, updateExtract...) updateExtract = [][]map[string]interface{}{} } mapLock.Unlock() } else { mapLock.Unlock() } } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() if len(updateExtract) > 0 { mgo.UpdateBulk(extract, updateExtract...) //mgo.UpdateBulk(bidding, updateBidding...) } log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"]) //任务完成,开始发送广播通知下面节点 if n > repeateN && mapInfo["stop"] == nil { for _, to := range nextNode { sid, _ := mapInfo["gtid"].(string) eid, _ := mapInfo["lteid"].(string) key := sid + "-" + eid + "-" + util.ObjToString(to["stype"]) by, _ := json.Marshal(map[string]interface{}{ "gtid": sid, "lteid": eid, "stype": util.ObjToString(to["stype"]), "key": key, }) addr := &net.UDPAddr{ IP: net.ParseIP(to["addr"].(string)), Port: util.IntAll(to["port"]), } node := &udpNode{by, addr, time.Now().Unix(), 0} udptaskmap.Store(key, node) udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) } } } //支持历史更新 func historyTask(data []byte, mapInfo map[string]interface{}) { fmt.Println("开始取历史时间段") defer util.Catch() sess := mgo.GetMgoConn() defer mgo.DestoryMongoConn(sess) q := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)), }, } it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter() minTime,maxTime:=int64(0),int64(0) for tmp := make(map[string]interface{}); it.Next(&tmp);{ //取出最大最小时间 if minTime==0||maxTime ==0 { minTime = util.Int64All(tmp["comeintime"]) maxTime = util.Int64All(tmp["comeintime"]) }else { t := util.Int64All(tmp["comeintime"]) if tmaxTime&&t!=0 { maxTime = t } } } fmt.Println("最小时间==",minTime,"最大时间==",maxTime) //最小时间== 1568087634 最大时间== 1568103381 HM = NewHistorymap(util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]),minTime,maxTime) //return //开始判重... defer util.Catch() sess_task := mgo.GetMgoConn() defer mgo.DestoryMongoConn(sess_task) q_task := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": util.StringTOBsonId(mapInfo["gtid"].(string)), "$lte": util.StringTOBsonId(mapInfo["lteid"].(string)), }, } it_task := sess.DB(mgo.DbName).C(extract).Find(&q_task).Iter() updateExtract := [][]map[string]interface{}{} pool := make(chan bool, 16) wg := &sync.WaitGroup{} mapLock := &sync.Mutex{} n, repeateN := 0, 0 for tmp := make(map[string]interface{}); it_task.Next(&tmp); n++ { if n%10000 == 0 { log.Println("current:", n, tmp["_id"],"repeateN:",repeateN) } pool <- true wg.Add(1) go func(tmp map[string]interface{}) { defer func() { <-pool wg.Done() }() info := NewInfo(tmp) //是否为无效数据 if invalidData(info.buyer,info.projectname,info.projectcode) { mapLock.Lock() updateExtract = append(updateExtract, []map[string]interface{}{ map[string]interface{}{ "_id": tmp["_id"], }, map[string]interface{}{ "$set": map[string]interface{}{ "repeat": -1, }, }, }) if len(updateExtract) > 500 { mgo.UpdateBulk(extract, updateExtract...) updateExtract = [][]map[string]interface{}{} } mapLock.Unlock() }else { mapLock.Lock() b, source,reason := HM.check(info) if b { //有重复,生成更新语句,更新抽取和更新招标 if reason == "未判重记录" { fmt.Println("未判重记录") //把info的数据判重的标签更换,并新增字段 DM.replaceSourceData(info, info.id) //替换即添加 updateExtract = append(updateExtract, []map[string]interface{}{ map[string]interface{}{ "_id": tmp["_id"], }, map[string]interface{}{ "$set": map[string]interface{}{ "repeat": 0, "repeatid": "-1", }, }, }) if len(updateExtract) > 500 { mgo.UpdateBulk(extract, updateExtract...) updateExtract = [][]map[string]interface{}{} } mapLock.Unlock() }else { repeateN++ var mergeArr = []int64{} //更改合并数组记录 var newData = &Info{} //更换新的数据池数据 var id_map = map[string]interface{}{} repeat_id := "" //合并操作--评功权重打分-合并完替换原始数据池 basic_bool := basicDataScore(source,info) if basic_bool { //已原始数据为标准-对比数据打判重标签 newData,mergeArr= mergeDataFields(source,info) DM.replaceSourceData(newData,source.id) //替换 id_map["_id"]= util.StringTOBsonId(source.id) repeat_id = source.id }else { //已对比数据为标准 ,数据池的数据打判重标签 newData,mergeArr= mergeDataFields(info,source) DM.replaceSourceData(newData,source.id)//替换 id_map["_id"]= util.StringTOBsonId(info.id) repeat_id = info.id } var update_map = map[string]interface{}{ "$set": map[string]interface{}{ "reason":reason, "repeat":"1", "repeatid":repeat_id, }, } //合并记录 if len(newData.mergemap)>0 { update_map["$set"].(map[string]interface{})["merge"] = newData.mergemap } //更新合并后的数据 for _,value :=range mergeArr { if value==1 { update_map["$set"].(map[string]interface{})["area"] = newData.area update_map["$set"].(map[string]interface{})["city"] = newData.city }else if value==2 { update_map["$set"].(map[string]interface{})["projectname"] = newData.projectname }else if value==3 { update_map["$set"].(map[string]interface{})["projectcode"] = newData.projectcode }else if value==4 { update_map["$set"].(map[string]interface{})["buyer"] = newData.buyer }else if value==5 { update_map["$set"].(map[string]interface{})["budget"] = newData.budget }else if value==6 { update_map["$set"].(map[string]interface{})["winner"] = newData.winner }else if value==7 { update_map["$set"].(map[string]interface{})["bidamount"] = newData.bidamount }else if value==8 { update_map["$set"].(map[string]interface{})["bidopentime"] = newData.bidopentime }else { } } //构建数据库更新用到的 updateExtract = append(updateExtract, []map[string]interface{}{ id_map, update_map, }) if len(updateExtract) > 500 { mgo.UpdateBulk(extract, updateExtract...) updateExtract = [][]map[string]interface{}{} } mapLock.Unlock() } }else { mapLock.Unlock() } } }(tmp) tmp = make(map[string]interface{}) } wg.Wait() if len(updateExtract) > 0 { mgo.UpdateBulk(extract, updateExtract...) //mgo.UpdateBulk(bidding, updateBidding...) } log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"]) //任务完成,开始发送广播通知下面节点 if n > repeateN &&mapInfo["stop"] == nil { for _, to := range nextNode { sid, _ := mapInfo["gtid"].(string) eid, _ := mapInfo["lteid"].(string) key := sid + "-" + eid + "-" + util.ObjToString(to["stype"]) by, _ := json.Marshal(map[string]interface{}{ "gtid": sid, "lteid": eid, "stype": util.ObjToString(to["stype"]), "key": key, }) addr := &net.UDPAddr{ IP: net.ParseIP(to["addr"].(string)), Port: util.IntAll(to["port"]), } node := &udpNode{by, addr, time.Now().Unix(), 0} udptaskmap.Store(key, node) udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) } } } //合并字段 func mergeDataFields(source *Info, info *Info) (*Info,[]int64){ var mergeArr []int64 mergeArr = make([]int64,0) //1、城市 if (source.area==""||source.area=="全国")&&info.area!="全国"&&info.area!=""{ var arrA []string if source.mergemap["area"]==nil { arrA = make([]string, 0) }else { arrA = source.mergemap["area"].([]string) } arrA = append(arrA,source.area) source.mergemap["area"] = arrA var arrC []string if source.mergemap["city"]==nil { arrC = make([]string, 0) }else { arrC = source.mergemap["city"].([]string) } arrC = append(arrC,source.city) source.mergemap["city"] = arrC source.area = info.area source.city = info.city mergeArr = append(mergeArr,1) } //2、项目名称 if source.projectname==""&&info.projectname!=""{ var arr []string if source.mergemap["projectname"]==nil { arr = make([]string, 0) }else { arr = source.mergemap["projectname"].([]string) } arr = append(arr,source.projectname) source.mergemap["projectname"] = arr source.projectname = info.projectname mergeArr = append(mergeArr,2) } //3、项目编号 if source.projectcode==""&&info.projectcode!=""{ var arr []string if source.mergemap["projectcode"]==nil { arr = make([]string, 0) }else { arr = source.mergemap["projectcode"].([]string) } arr = append(arr,source.projectcode) source.mergemap["projectcode"] = arr source.projectcode = info.projectcode mergeArr = append(mergeArr,3) } //4、采购单位 if source.buyer==""&&info.buyer!=""{ var arr []string if source.mergemap["buyer"]==nil { arr = make([]string, 0) }else { arr = source.mergemap["buyer"].([]string) } arr = append(arr,source.buyer) source.mergemap["buyer"] = arr source.buyer = info.buyer mergeArr = append(mergeArr,4) } //5、预算 if source.budget==0&&info.budget!=0{ var arr []float64 if source.mergemap["budget"]==nil { arr = make([]float64, 0) }else { arr = source.mergemap["budget"].([]float64) } arr = append(arr,source.budget) source.mergemap["budget"] = arr source.budget = info.budget mergeArr = append(mergeArr,5) } //6、中标单位 if source.winner==""&&info.winner!=""{ var arr []string if source.mergemap["winner"]==nil { arr = make([]string, 0) }else { arr = source.mergemap["winner"].([]string) } arr = append(arr,source.winner) source.mergemap["winner"] = arr source.winner = info.winner mergeArr = append(mergeArr,6) } //7、中标金额 if source.bidamount==0&&info.bidamount!=0{ var arr []float64 if source.mergemap["bidamount"]==nil { arr = make([]float64, 0) }else { arr = source.mergemap["bidamount"].([]float64) } arr = append(arr,source.bidamount) source.mergemap["bidamount"] = arr source.bidamount = info.bidamount mergeArr = append(mergeArr,7) } //8、开天时间-地点 if source.bidopentime==0&&info.bidopentime!=0{ var arr []int64 if source.mergemap["bidopentime"]==nil { arr = make([]int64, 0) }else { arr = source.mergemap["bidopentime"].([]int64) } arr = append(arr,source.bidopentime) source.mergemap["bidopentime"] = arr source.bidopentime = info.bidopentime mergeArr = append(mergeArr,8) } //以上合并过于简单,待进一步优化 return source,mergeArr } //权重评估 func basicDataScore(v *Info, info *Info) bool { m,n:=0,0 if v.projectname!="" {m++} if v.buyer!="" {m++} if v.projectcode!="" {m++} if v.budget!=0 {m++} if v.bidamount!=0 {m++} if v.winner!="" {m++} if v.bidopentime!=0 {m++} if v.agencyaddr!="" {m++} if v.agency!="" {m=m+2} if v.city!="" {m=m+2} if info.projectname!="" {n++} if info.buyer!="" {n++} if info.projectcode!="" {n++} if info.budget!=0 {n++} if info.bidamount!=0 {n++} if info.winner!="" {n++} if info.bidopentime!=0 {n++} if info.agencyaddr!="" {n++} if info.agency!="" {n=m+2} if info.city!="" {n=m+2} if m>n { return true }else if m==n { if v.comeintime>=info.comeintime { return true }else { return false } }else { return false } } //无效数据 func invalidData(d1 string,d2 string,d3 string) bool{ var n int if d1 != "" { n++ } if d2 != "" { n++ } if d3 != "" { n++ } if n==0 { return true } return false }