// Command main is the bidding-information deduplication service. It listens
// for UDP task messages, flags duplicate / invalid records in the extract
// collection and forwards the processed id range to the configured next nodes.
package main

import (
	"encoding/json"
	"flag"
	"fmt"
	"log"
	mu "mfw/util"
	"net"
	"os"
	"qfw/util"
	"qfw/util/mongodb"
	"regexp"
	"sync"
	"time"
)

var (
	Sysconfig map[string]interface{} // configuration file contents
	mconf     map[string]interface{} // "mongodb" section of the configuration
	mgo       *mongodb.MongodbSim    // mongodb access object
	extract   string                 // name of the extract collection (mongodb.extract in the config)
	udpclient mu.UdpClient           // udp client
	nextNode  []map[string]interface{} // downstream nodes to notify
	dupdays   = 5                    // deduplication range; overwritten from the config in init
	DM        *datamap               // deduplication data pool
	HM        *historymap            // deduplication data for history tasks
	lastid    = ""                   /* 5da3f31aa5cb26b9b798d3aa */

	// Title filters; the placeholders are replaced with configured patterns in init.
	FilterRegTitle   = regexp.MustCompile("^_$")
	FilterRegTitle_1 = regexp.MustCompile("^_$")
	FilterRegTitle_2 = regexp.MustCompile("^_$")

	isMerger         bool                              // whether duplicate records are merged
	threadNum        int                               // number of worker goroutines per task
	SiteMap          map[string]map[string]interface{} // site name -> site attributes
	idtype, sid, eid string                            // used by testers running dedup manually
)

// bulkFlushSize is the number of queued updates above which a bulk write is issued.
const bulkFlushSize = 500

// init parses the command-line flags, reads the configuration, opens the
// mongodb pool, loads the dedup data pool and caches the site table.
// It panics when a required configuration key is missing or has the wrong type.
func init() {
	flag.StringVar(&lastid, "id", "", "最后加载id") // load the most recent days of data starting at ids <= this id
	flag.StringVar(&sid, "sid", "", "开始id")
	flag.StringVar(&eid, "eid", "", "结束id")
	flag.StringVar(&idtype, "idtype", "", "id类型,默认ObjectId:0,String:1")
	flag.Parse()

	util.ReadConfig(&Sysconfig)
	nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
	mconf = Sysconfig["mongodb"].(map[string]interface{})
	mgo = &mongodb.MongodbSim{
		MongodbAddr: mconf["addr"].(string),
		DbName:      mconf["db"].(string),
		Size:        util.IntAllDef(mconf["pool"], 10),
	}
	extract = mconf["extract"].(string)
	mgo.InitPool()

	// May be commented out temporarily when testing.
	dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
	// Load the dedup data pool.
	DM = NewDatamap(dupdays, lastid)

	FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
	FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
	FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
	isMerger = Sysconfig["isMerger"].(bool)
	threadNum = util.IntAllDef(Sysconfig["threads"], 1)

	// Site configuration: cache the attributes used by basicDataScore.
	site := mconf["site"].(map[string]interface{})
	SiteMap = make(map[string]map[string]interface{})
	start := int(time.Now().Unix())
	sessSite := mgo.GetMgoConn()
	// Release through the pool helper, like every other session in this file.
	defer mgo.DestoryMongoConn(sessSite)
	sites := sessSite.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(nil).Sort("_id").Iter()
	for siteDoc := make(map[string]interface{}); sites.Next(&siteDoc); {
		SiteMap[util.ObjToString(siteDoc["site"])] = map[string]interface{}{
			"area":     util.ObjToString(siteDoc["area"]),
			"city":     util.ObjToString(siteDoc["city"]),
			"district": util.ObjToString(siteDoc["district"]),
			"sitetype": util.ObjToString(siteDoc["sitetype"]),
			"level":    util.ObjToString(siteDoc["level"]),
		}
	}
	fmt.Printf("用时:%d秒,%d个", int(time.Now().Unix())-start, len(SiteMap))
}

// main starts the background map check job and the UDP listener, then blocks.
func main() {
	go checkMapJob()
	updport := Sysconfig["udpport"].(string)
	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
	udpclient.Listen(processUdpMsg)
	log.Println("Udp服务监听", updport)
	time.Sleep(99999 * time.Hour)
}

// mainT is the entry point used by the test team: it runs one dedup task over
// the id range given by the -sid / -eid flags without notifying next nodes.
func mainT() {
	//sid = "568551000000000000000000"
	//eid = "5e0f65000000000000000000"
	if sid == "" || eid == "" {
		log.Println("sid,eid参数不能为空")
		os.Exit(0)
	}
	mapinfo := map[string]interface{}{
		"gtid":  sid,
		"lteid": eid,
		"stop":  "true", // any non-nil "stop" value suppresses the next-node broadcast
	}
	task([]byte{}, mapinfo)
	time.Sleep(5 * time.Second)
}

// processUdpMsg handles one incoming UDP message.
//
// OP_TYPE_DATA messages carry a JSON task description from the previous node:
// the task is started in its own goroutine and the message key (or "udpok")
// is sent back as acknowledgement. OP_NOOP messages are acknowledgements from
// a next node and remove the matching entry from udptaskmap.
func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
	fmt.Println("接受的段数据")
	switch act {
	case mu.OP_TYPE_DATA: // data from the previous node
		var mapInfo map[string]interface{}
		err := json.Unmarshal(data, &mapInfo)
		log.Println("err:", err, "mapInfo:", mapInfo)
		if err != nil {
			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
			return
		}
		if mapInfo == nil { // e.g. a JSON "null" payload: nothing to do
			return
		}
		if util.ObjToString(mapInfo["stype"]) == "historyTask" {
			// History update flow.
			go historyTask(data, mapInfo)
		} else {
			// "normalTask" and every other type run the regular dedup flow.
			go task(data, mapInfo)
		}
		key, _ := mapInfo["key"].(string)
		if key == "" {
			key = "udpok"
		}
		udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
	case mu.OP_NOOP: // acknowledgement from a next node
		if ok := string(data); ok != "" {
			log.Println("ok:", ok)
			udptaskmap.Delete(ok)
		}
	}
}

// taskProgress is the mutable state shared by the worker goroutines of a
// single task run: the queued bulk updates and the duplicate counter.
// All access goes through its methods, which serialise on lock.
type taskProgress struct {
	lock    sync.Mutex
	pending [][]map[string]interface{}
	repeats int
}

// queue adds one (selector, update) pair and writes the queued pairs to the
// extract collection once more than bulkFlushSize of them have accumulated.
func (p *taskProgress) queue(selector, update map[string]interface{}) {
	p.lock.Lock()
	p.pending = append(p.pending, []map[string]interface{}{selector, update})
	var full [][]map[string]interface{}
	if len(p.pending) > bulkFlushSize {
		full, p.pending = p.pending, nil
	}
	p.lock.Unlock()
	// The database write happens outside the lock so other workers keep going.
	if len(full) > 0 {
		mgo.UpdateBulk(extract, full...)
	}
}

// flush writes whatever is still queued.
func (p *taskProgress) flush() {
	p.lock.Lock()
	rest := p.pending
	p.pending = nil
	p.lock.Unlock()
	if len(rest) > 0 {
		mgo.UpdateBulk(extract, rest...)
	}
}

// addRepeat counts one more duplicate record.
func (p *taskProgress) addRepeat() {
	p.lock.Lock()
	p.repeats++
	p.lock.Unlock()
}

// repeatCount returns the number of duplicates counted so far.
func (p *taskProgress) repeatCount() int {
	p.lock.Lock()
	defer p.lock.Unlock()
	return p.repeats
}

// workerCount returns the configured number of workers, never less than one:
// a zero-capacity pool channel would block the dispatch loop forever and a
// negative capacity makes make() panic.
func workerCount() int {
	if threadNum < 1 {
		return 1
	}
	return threadNum
}

// objectIDRangeQuery builds the {_id: {$gt: gtid, $lte: lteid}} selector with
// both bounds converted to ObjectIds. It panics when either bound is missing
// or not a string.
func objectIDRangeQuery(mapInfo map[string]interface{}) map[string]interface{} {
	return map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  util.StringTOBsonId(mapInfo["gtid"].(string)),
			"$lte": util.StringTOBsonId(mapInfo["lteid"].(string)),
		},
	}
}

// buildRepeatUpdate returns the selector and the $set update that flag a
// duplicate pair. source is the record found in the data pool, info the record
// being checked and reason the textual match reason.
//
// Without merging, info is tagged as a repeat of source. With merging enabled
// the higher-ranked record (see basicDataScore) absorbs the other one's
// fields, replaces source in the data pool, and the update also carries the
// merged field values.
//
// NOTE(review): with merging enabled the selector and "repeatid" both point at
// the record chosen as the standard, i.e. the kept record is the one tagged
// repeat=1. That is what the code has always done; confirm it is intended.
func buildRepeatUpdate(source, info *Info, reason string) (map[string]interface{}, map[string]interface{}) {
	targetID := info.id
	repeatID := source.id
	var merged *Info
	var mergeArr []int64
	if isMerger {
		if basicDataScore(source, info) {
			// The pool record is the standard.
			merged, mergeArr = mergeDataFields(source, info)
			targetID, repeatID = source.id, source.id
		} else {
			// The checked record is the standard.
			merged, mergeArr = mergeDataFields(info, source)
			targetID, repeatID = info.id, info.id
		}
		// Either way the merged record takes source's slot in the pool.
		DM.replaceSourceData(merged, source.id)
	}

	set := map[string]interface{}{
		"repeat_reason": reason,
		"repeat":        1,
		"repeatid":      repeatID,
	}
	if merged != nil {
		if len(merged.mergemap) > 0 {
			set["merge"] = merged.mergemap
		}
		// Persist every field that mergeDataFields reported as changed.
		for _, field := range mergeArr {
			switch field {
			case 1:
				set["area"] = merged.area
				set["city"] = merged.city
			case 2:
				set["projectname"] = merged.projectname
			case 3:
				set["projectcode"] = merged.projectcode
			case 4:
				set["buyer"] = merged.buyer
			case 5:
				set["budget"] = merged.budget
			case 6:
				set["winner"] = merged.winner
			case 7:
				set["bidamount"] = merged.bidamount
			case 8:
				set["bidopentime"] = merged.bidopentime
			}
		}
	}
	selector := map[string]interface{}{"_id": util.StringTOBsonId(targetID)}
	return selector, map[string]interface{}{"$set": set}
}

// notifyNextNodes tells every configured next node that the id range of
// mapInfo has been processed. Each message is stored in udptaskmap under its
// key until the node acknowledges it (see processUdpMsg).
func notifyNextNodes(mapInfo map[string]interface{}) {
	gtid, _ := mapInfo["gtid"].(string)
	lteid, _ := mapInfo["lteid"].(string)
	for _, to := range nextNode {
		stype := util.ObjToString(to["stype"])
		key := gtid + "-" + lteid + "-" + stype
		by, err := json.Marshal(map[string]interface{}{
			"gtid":  gtid,
			"lteid": lteid,
			"stype": stype,
			"key":   key,
		})
		if err != nil {
			log.Println("marshal next-node message:", err)
			continue
		}
		addr := &net.UDPAddr{
			IP:   net.ParseIP(to["addr"].(string)),
			Port: util.IntAll(to["port"]),
		}
		node := &udpNode{by, addr, time.Now().Unix(), 0}
		udptaskmap.Store(key, node)
		udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
	}
}

// task runs the regular deduplication flow over the id range
// (mapInfo["gtid"], mapInfo["lteid"]] of the extract collection.
//
// Records are read sorted by "publishtime" and checked by up to threadNum
// concurrent workers. Records with no buyer, project name and project code are
// flagged repeat=-1; records matched by DM.check are flagged as duplicates.
// When the run finishes, at least one record was not a duplicate and
// mapInfo["stop"] is unset, the next nodes are notified.
//
// data is unused; it is kept for the task-function signature. Panics in this
// goroutine are handled by util.Catch.
func task(data []byte, mapInfo map[string]interface{}) {
	fmt.Println("开始数据判重")
	defer util.Catch()

	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)

	// Id range; with -idtype=1 the ids are compared as plain strings.
	var q map[string]interface{}
	if idtype == "1" {
		q = map[string]interface{}{
			"_id": map[string]interface{}{
				"$gt":  mapInfo["gtid"].(string),
				"$lte": mapInfo["lteid"].(string),
			},
		}
	} else {
		q = objectIDRangeQuery(mapInfo)
	}
	log.Println(extract, mgo.DbName, q)
	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()

	log.Println("线程数:", threadNum)
	progress := &taskProgress{}
	pool := make(chan bool, workerCount())
	wg := &sync.WaitGroup{}
	n := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		if n%10000 == 0 {
			log.Println("current:", n, tmp["_id"], "repeateN:", progress.repeatCount())
		}
		pool <- true
		wg.Add(1)
		go func(doc map[string]interface{}) {
			defer func() {
				<-pool
				wg.Done()
			}()
			// recover only works in the panicking goroutine, so each worker
			// needs its own handler; the one in task does not cover it.
			defer util.Catch()

			info := NewInfo(doc)
			// Invalid record: nothing to identify it by.
			if invalidData(info.buyer, info.projectname, info.projectcode) {
				progress.queue(
					map[string]interface{}{"_id": doc["_id"]},
					map[string]interface{}{"$set": map[string]interface{}{"repeat": -1}},
				)
				return
			}
			b, source, reason := DM.check(info)
			if !b {
				return
			}
			// Duplicate found: build the update for the extract collection.
			progress.addRepeat()
			selector, update := buildRepeatUpdate(source, info, reason)
			progress.queue(selector, update)
		}(tmp)
		// The worker owns the map it was handed; decode the next record into a fresh one.
		tmp = make(map[string]interface{})
	}
	wg.Wait()
	progress.flush()

	repeateN := progress.repeatCount()
	log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
	// Task finished: broadcast to the next nodes.
	if n > repeateN && mapInfo["stop"] == nil {
		notifyNextNodes(mapInfo)
	}
}

// historyTask runs the history-update flow over the id range
// (mapInfo["gtid"], mapInfo["lteid"]] of the extract collection.
//
// A first pass finds the smallest and largest non-zero "comeintime" in the
// range and uses them to build the history data pool HM. A second pass checks
// every record against HM with up to threadNum concurrent workers, flags
// invalid and duplicate records like task does, and re-registers records that
// HM reports as "未判重记录" in DM with repeat=0. Next nodes are notified
// under the same condition as in task.
//
// data is unused; it is kept for the task-function signature. Panics in this
// goroutine are handled by util.Catch.
func historyTask(data []byte, mapInfo map[string]interface{}) {
	fmt.Println("开始取历史时间段")
	defer util.Catch()

	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)
	q := objectIDRangeQuery(mapInfo)

	// Pass 1: time span covered by the range.
	it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
	minTime, maxTime := int64(0), int64(0)
	for tmp := make(map[string]interface{}); it.Next(&tmp); {
		t := util.Int64All(tmp["comeintime"])
		switch {
		case minTime == 0 || maxTime == 0:
			// Nothing usable seen yet: (re)initialise both bounds.
			minTime, maxTime = t, t
		case t == 0:
			// Records without a time do not move the bounds.
		case t < minTime:
			minTime = t
		case t > maxTime:
			maxTime = t
		}
	}
	fmt.Println("最小时间==", minTime, "最大时间==", maxTime)
	HM = NewHistorymap(util.ObjToString(mapInfo["gtid"]), util.ObjToString(mapInfo["lteid"]), minTime, maxTime)

	// Pass 2: deduplicate. The session acquired above is reused; taking a
	// second one would only hold another pool slot for no benefit.
	itTask := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
	progress := &taskProgress{}
	pool := make(chan bool, workerCount())
	wg := &sync.WaitGroup{}
	n := 0
	for tmp := make(map[string]interface{}); itTask.Next(&tmp); n++ {
		if n%10000 == 0 {
			log.Println("current:", n, tmp["_id"], "repeateN:", progress.repeatCount())
		}
		pool <- true
		wg.Add(1)
		go func(doc map[string]interface{}) {
			defer func() {
				<-pool
				wg.Done()
			}()
			// See task: each worker goroutine needs its own panic handler.
			defer util.Catch()

			info := NewInfo(doc)
			if invalidData(info.buyer, info.projectname, info.projectcode) {
				progress.queue(
					map[string]interface{}{"_id": doc["_id"]},
					map[string]interface{}{"$set": map[string]interface{}{"repeat": -1}},
				)
				return
			}
			b, source, reason := HM.checkHistory(info)
			if !b {
				return
			}
			if reason == "未判重记录" {
				fmt.Println("未判重记录")
				// Reset the record's dedup tag and add it to the data pool
				// (replacing under its own id inserts it).
				DM.replaceSourceData(info, info.id)
				progress.queue(
					map[string]interface{}{"_id": doc["_id"]},
					map[string]interface{}{"$set": map[string]interface{}{
						"repeat":   0,
						"repeatid": -2,
					}},
				)
				return
			}
			// Duplicate found: build the update for the extract collection.
			progress.addRepeat()
			selector, update := buildRepeatUpdate(source, info, reason)
			progress.queue(selector, update)
		}(tmp)
		// The worker owns the map it was handed; decode the next record into a fresh one.
		tmp = make(map[string]interface{})
	}
	wg.Wait()
	progress.flush()

	repeateN := progress.repeatCount()
	log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
	// Task finished: broadcast to the next nodes.
	if n > repeateN && mapInfo["stop"] == nil {
		notifyNextNodes(mapInfo)
	}
}

// appendStringHistory appends old to the []string stored under key in m,
// creating the slice on first use. It panics if m[key] holds another type.
func appendStringHistory(m map[string]interface{}, key, old string) {
	var hist []string
	if prev := m[key]; prev != nil {
		hist = prev.([]string)
	}
	m[key] = append(hist, old)
}

// appendFloatHistory is appendStringHistory for []float64 values.
func appendFloatHistory(m map[string]interface{}, key string, old float64) {
	var hist []float64
	if prev := m[key]; prev != nil {
		hist = prev.([]float64)
	}
	m[key] = append(hist, old)
}

// appendInt64History is appendStringHistory for []int64 values.
func appendInt64History(m map[string]interface{}, key string, old int64) {
	var hist []int64
	if prev := m[key]; prev != nil {
		hist = prev.([]int64)
	}
	m[key] = append(hist, old)
}

// mergeDataFields fills the empty fields of source with the values of info.
//
// source is modified in place and returned. For every field that was taken
// over, the previous value is appended to source.mergemap under the field name
// and the field's code is added to the returned slice:
// 1 area+city, 2 projectname, 3 projectcode, 4 buyer, 5 budget, 6 winner,
// 7 bidamount, 8 bidopentime. The returned slice is never nil.
//
// The merge rules are deliberately simple and are expected to be refined.
func mergeDataFields(source *Info, info *Info) (*Info, []int64) {
	mergeArr := make([]int64, 0)
	if source.mergemap == nil {
		// Writing to a nil map panics; make sure history can be recorded.
		source.mergemap = make(map[string]interface{})
	}

	// 1. Area / city: "全国" (nationwide) counts as unset.
	if (source.area == "" || source.area == "全国") && info.area != "全国" && info.area != "" {
		appendStringHistory(source.mergemap, "area", source.area)
		appendStringHistory(source.mergemap, "city", source.city)
		source.area = info.area
		source.city = info.city
		mergeArr = append(mergeArr, 1)
	}
	// 2. Project name.
	if source.projectname == "" && info.projectname != "" {
		appendStringHistory(source.mergemap, "projectname", source.projectname)
		source.projectname = info.projectname
		mergeArr = append(mergeArr, 2)
	}
	// 3. Project code.
	if source.projectcode == "" && info.projectcode != "" {
		appendStringHistory(source.mergemap, "projectcode", source.projectcode)
		source.projectcode = info.projectcode
		mergeArr = append(mergeArr, 3)
	}
	// 4. Buyer.
	if source.buyer == "" && info.buyer != "" {
		appendStringHistory(source.mergemap, "buyer", source.buyer)
		source.buyer = info.buyer
		mergeArr = append(mergeArr, 4)
	}
	// 5. Budget.
	if source.budget == 0 && info.budget != 0 {
		appendFloatHistory(source.mergemap, "budget", source.budget)
		source.budget = info.budget
		mergeArr = append(mergeArr, 5)
	}
	// 6. Winner.
	if source.winner == "" && info.winner != "" {
		appendStringHistory(source.mergemap, "winner", source.winner)
		source.winner = info.winner
		mergeArr = append(mergeArr, 6)
	}
	// 7. Bid amount.
	if source.bidamount == 0 && info.bidamount != 0 {
		appendFloatHistory(source.mergemap, "bidamount", source.bidamount)
		source.bidamount = info.bidamount
		mergeArr = append(mergeArr, 7)
	}
	// 8. Bid opening time.
	if source.bidopentime == 0 && info.bidopentime != 0 {
		appendInt64History(source.mergemap, "bidopentime", source.bidopentime)
		source.bidopentime = info.bidopentime
		mergeArr = append(mergeArr, 8)
	}
	return source, mergeArr
}

// siteLevelScore ranks a site by its administrative level:
// 中央 (central) 4 > 省级 (province) 3 > 市级 (city) 2 > 县区 (county) 1,
// any other non-empty level 0. fallback is returned when the site is unknown
// or has no level.
func siteLevelScore(site map[string]interface{}, fallback int) int {
	if site == nil {
		return fallback
	}
	switch util.ObjToString(site["level"]) {
	case "中央":
		return 4
	case "省级":
		return 3
	case "市级":
		return 2
	case "县区":
		return 1
	case "":
		return fallback
	default:
		return 0
	}
}

// siteTypeScore ranks a site by its type: government procurement / government
// portal 4 > public resources 3 > official website 2 > tendering platforms 1,
// any other non-empty type 0. fallback is returned when the site is unknown or
// has no type.
func siteTypeScore(site map[string]interface{}, fallback int) int {
	if site == nil {
		return fallback
	}
	switch util.ObjToString(site["sitetype"]) {
	case "政府采购", "政府门户":
		return 4
	case "公共资源":
		return 3
	case "官方网站":
		return 2
	case "社会公共招标平台", "企业招标平台":
		return 1
	case "":
		return fallback
	default:
		return 0
	}
}

// filledFieldScore counts how much information a record carries: one point
// per populated basic field, two points each for agency and city.
func filledFieldScore(d *Info) int {
	score := 0
	if d.projectname != "" {
		score++
	}
	if d.buyer != "" {
		score++
	}
	if d.projectcode != "" {
		score++
	}
	if d.budget != 0 {
		score++
	}
	if d.bidamount != 0 {
		score++
	}
	if d.winner != "" {
		score++
	}
	if d.bidopentime != 0 {
		score++
	}
	if d.agencyaddr != "" {
		score++
	}
	if d.agency != "" {
		score += 2
	}
	if d.city != "" {
		score += 2
	}
	return score
}

// basicDataScore reports whether v (the data-pool record) outranks info (the
// record being checked) and should therefore be kept as the standard.
//
// The records are compared, in order, by:
//  1. site level: central > province > city > county;
//  2. site type: government procurement > public resources > official
//     website > tendering agency / platform;
//  3. number of populated fields;
//  4. comeintime, where v wins ties.
//
// A site missing from SiteMap or without a level ranks below every known level.
func basicDataScore(v *Info, info *Info) bool {
	vSite := SiteMap[v.site]
	infoSite := SiteMap[info.site]

	// Site level first. Each record's unrecognised level scores 0 for that
	// record (previously info's unrecognised level overwrote v's score).
	vScore := siteLevelScore(vSite, -1)
	infoScore := siteLevelScore(infoSite, -1)
	if vScore != infoScore {
		return vScore > infoScore
	}

	// Then site type.
	// NOTE(review): a record without a site type keeps its (tied) level score
	// here instead of dropping back to -1, so it can outrank a record with a
	// low-ranked type. This is unchanged behaviour; confirm it is intended.
	vScore = siteTypeScore(vSite, vScore)
	infoScore = siteTypeScore(infoSite, infoScore)
	if vScore != infoScore {
		return vScore > infoScore
	}

	// Then the amount of information in each record.
	m, n := filledFieldScore(v), filledFieldScore(info)
	if m != n {
		return m > n
	}
	return v.comeintime >= info.comeintime
}

// invalidData reports whether a record is unusable for deduplication, i.e.
// all three identifying values (buyer, project name, project code) are empty.
func invalidData(d1 string, d2 string, d3 string) bool {
	return d1 == "" && d2 == "" && d3 == ""
}