package main

/* Tender information deduplication (招标信息判重) */

import (
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"net"
	"os"
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/cron"
	mu "mfw/util"
	"qfw/util"

	"gopkg.in/mgo.v2/bson"
)

var (
	Sysconfig    map[string]interface{}   // configuration file
	mconf        map[string]interface{}   // mongodb configuration
	mgo          *MongodbSim              // mongodb client
	extract      string
	extract_back string
	udpclient    mu.UdpClient             // udp client
	nextNode     []map[string]interface{} // downstream nodes
	dupdays      = 5                      // default dedup window (days)
	DM           *datamap
	// regex filters
	FilterRegTitle   = regexp.MustCompile("^_$")
	FilterRegTitle_0 = regexp.MustCompile("^_$")
	FilterRegTitle_1 = regexp.MustCompile("^_$")
	FilterRegTitle_2 = regexp.MustCompile("^_$")
	isMerger         bool                              // whether to merge fields of duplicates
	Is_Sort          bool                              // whether to sort by publishtime
	threadNum        int                               // number of worker goroutines
	SiteMap          map[string]map[string]interface{} // site info map
	LowHeavy         bool                              // whether low-quality data also goes through dedup
	TimingTask       bool                              // whether to run the scheduled task
	timingSpanDay    int64                             // grouping span (days)
	timingPubScope   int64                             // publish-time window (days)
	sid, eid, lastid string                            // used by testers
	IdType           bool                              // _id is a plain string instead of ObjectId
)

func init() {
	flag.StringVar(&lastid, "id", "", "最后加载id") // load recent days of data starting from ids <= this one
	flag.StringVar(&sid, "sid", "", "开始id")
	flag.StringVar(&eid, "eid", "", "结束id")
	flag.Parse()
	//172.17.145.163:27080
	util.ReadConfig(&Sysconfig)
	nextNode = util.ObjArrToMapArr(Sysconfig["nextNode"].([]interface{}))
	mconf = Sysconfig["mongodb"].(map[string]interface{})
	mgo = &MongodbSim{
		MongodbAddr: mconf["addr"].(string),
		DbName:      mconf["db"].(string),
		Size:        util.IntAllDef(mconf["pool"], 10),
	}
	mgo.InitPool()
	extract = mconf["extract"].(string)
	extract_back = mconf["extract_back"].(string)
	dupdays = util.IntAllDef(Sysconfig["dupdays"], 3)
	// preload the in-memory data pool
	DM = NewDatamap(dupdays, lastid)
	FilterRegTitle = regexp.MustCompile(util.ObjToString(Sysconfig["specialwords"]))
	FilterRegTitle_0 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_0"]))
	FilterRegTitle_1 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_1"]))
	FilterRegTitle_2 = regexp.MustCompile(util.ObjToString(Sysconfig["specialtitle_2"]))
	isMerger = Sysconfig["isMerger"].(bool)
	Is_Sort = Sysconfig["isSort"].(bool)
	threadNum = util.IntAllDef(Sysconfig["threads"], 1)
	LowHeavy = Sysconfig["lowHeavy"].(bool)
	TimingTask = Sysconfig["timingTask"].(bool)
	timingSpanDay = util.Int64All(Sysconfig["timingSpanDay"])
	timingPubScope = util.Int64All(Sysconfig["timingPubScope"])
	// site configuration
	site := mconf["site"].(map[string]interface{})
	SiteMap = make(map[string]map[string]interface{})
	start := int(time.Now().Unix())
	sess_site := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess_site)
	res_site := sess_site.DB(site["dbname"].(string)).C(site["coll"].(string)).Find(map[string]interface{}{}).Sort("_id").Iter()
	for site_dict := make(map[string]interface{}); res_site.Next(&site_dict); {
		data_map := map[string]interface{}{
			"area":     util.ObjToString(site_dict["area"]),
			"city":     util.ObjToString(site_dict["city"]),
			"district": util.ObjToString(site_dict["district"]),
			"sitetype": util.ObjToString(site_dict["sitetype"]),
			"level":    util.ObjToString(site_dict["level"]),
			"weight":   util.ObjToString(site_dict["weight"]),
		}
		SiteMap[util.ObjToString(site_dict["site"])] = data_map
	}
	log.Printf("new站点加载用时:%d秒,%d个\n", int(time.Now().Unix())-start, len(SiteMap))
}

func main() {
	go checkMapJob()
	updport := Sysconfig["udpport"].(string)
	udpclient = mu.UdpClient{Local: updport, BufSize: 1024}
	udpclient.Listen(processUdpMsg)
	log.Println("Udp服务监听", updport)
	if TimingTask {
		go timedTaskDay()
	}
	time.Sleep(99999 * time.Hour)
}

// mainT is used by the test team.
func mainT() {
	if TimingTask {
		log.Println("定时任务测试开始")
		go timedTaskDay()
		time.Sleep(99999 * time.Hour)
	} else {
		// 2019-08-01 ~ 2019-08-17: 712646 records
		/* sid = "5d55031fa5cb26b9b7f57570" eid = "5e8c02b150b5ea296eed4509" 5e933b1a50b5ea296ef0e839 */
		//IdType = true
		sid = "5ece4b1b9e628c59915eb257"
		eid = "5ed55b6d9e628c599161977c"
		log.Println("正常判重测试开始")
		log.Println(sid, "---", eid)
		mapinfo := map[string]interface{}{}
		if sid == "" || eid == "" {
			log.Println("sid,eid参数不能为空")
			os.Exit(0)
		}
		mapinfo["gtid"] = sid
		mapinfo["lteid"] = eid
		mapinfo["stop"] = "true"
		task([]byte{}, mapinfo)
		time.Sleep(99999 * time.Hour)
	}
}

func processUdpMsg(act byte, data []byte, ra *net.UDPAddr) {
	fmt.Println("接受的段数据")
	switch act {
	case mu.OP_TYPE_DATA: // data from the upstream node
		// start processing the given id range
		var mapInfo map[string]interface{}
		err := json.Unmarshal(data, &mapInfo)
		log.Println("err:", err, "mapInfo:", mapInfo)
		if err != nil {
			udpclient.WriteUdp([]byte("err:"+err.Error()), mu.OP_NOOP, ra)
		} else if mapInfo != nil {
			taskType := util.ObjToString(mapInfo["stype"])
			if taskType == "normalTask" { // dedup flow
				go task(data, mapInfo)
			} else { // other task types
				go task(data, mapInfo)
			}
			key, _ := mapInfo["key"].(string)
			if key == "" {
				key = "udpok"
			}
			udpclient.WriteUdp([]byte(key), mu.OP_NOOP, ra)
		}
	case mu.OP_NOOP: // ack from the downstream node
		ok := string(data)
		if ok != "" {
			log.Println("ok:", ok)
			udptaskmap.Delete(ok)
		}
	}
}

// task runs one deduplication pass over the id range given in mapInfo.
func task(data []byte, mapInfo map[string]interface{}) {
	log.Println("开始数据判重")
	defer util.Catch()
	// id range
	q := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  StringTOBsonId(mapInfo["gtid"].(string)),
			"$lte": StringTOBsonId(mapInfo["lteid"].(string)),
		},
	}
	if IdType {
		q = map[string]interface{}{
			"_id": map[string]interface{}{
				"$gt":  mapInfo["gtid"].(string),
				"$lte": mapInfo["lteid"].(string),
			},
		}
	}
	log.Println(mgo.DbName, extract, q)
	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)
	// sort order
	sortName := "_id"
	if Is_Sort {
		sortName = "publishtime"
		log.Println("排序")
	}
	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort(sortName).Iter()
	// NOTE: updateExtract and ids are shared with the worker goroutines below and are not protected by a lock.
	updateExtract := [][]map[string]interface{}{}
	ids := []string{}
	log.Println("线程数:", threadNum)
	pool := make(chan bool, threadNum)
	wg := &sync.WaitGroup{}
	n, repeateN := 0, 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
		if n%10000 == 0 {
			log.Println("current:", n, tmp["_id"], "repeateN:", repeateN)
		}
		source := util.ObjToMap(tmp["jsondata"])
		if util.IntAll((*source)["sourcewebsite"]) == 1 {
			repeateN++
			updateExtract = append(updateExtract, []map[string]interface{}{
				{"_id": tmp["_id"]},
				{"$set": map[string]interface{}{
					"repeat":        1,
					"repeat_reason": "sourcewebsite为1,重复",
				}},
			})
			if len(updateExtract) >= 200 {
				mgo.UpSertBulk(extract, updateExtract...)
				updateExtract = [][]map[string]interface{}{}
			}
			tmp = make(map[string]interface{})
			continue
		}
		if util.IntAll(tmp["repeat"]) == 1 || util.IntAll(tmp["repeat"]) == -1 || util.IntAll(tmp["dataging"]) == 1 {
			if util.IntAll(tmp["repeat"]) == 1 {
				repeateN++
			}
			tmp = make(map[string]interface{})
			continue
		}
		pool <- true
		wg.Add(1)
		go func(tmp map[string]interface{}) {
			defer func() {
				<-pool
				wg.Done()
			}()
			info := NewInfo(tmp)
			if !LowHeavy { // skip low-quality records instead of deduplicating them
				if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
					updateExtract = append(updateExtract, []map[string]interface{}{
						{"_id": tmp["_id"]},
						{"$set": map[string]interface{}{
							"repeat": -1, // invalid-data tag
						}},
					})
					if len(updateExtract) >= 200 {
						mgo.UpSertBulk(extract, updateExtract...)
						updateExtract = [][]map[string]interface{}{}
					}
					return
				}
			}
			b, source, reason := DM.check(info)
			if b { // duplicate found: build update statements for the extract data
				repeateN++
				var is_replace = false
				var mergeArr = []int64{}                    // codes of the fields that were merged
				var newData = &Info{}                       // record that replaces the one in the data pool
				var repeat_idMap = map[string]interface{}{} // filter for the record flagged as duplicate
				var merge_idMap = map[string]interface{}{}  // filter for the record that receives the merge
				repeat_idMap["_id"] = StringTOBsonId(info.id)
				if IdType {
					repeat_idMap["_id"] = info.id
				}
				merge_idMap["_id"] = StringTOBsonId(source.id)
				repeat_id := source.id // default: the pool record is the survivor
				if isMerger { // merge mode
					basic_bool := basicDataScore(source, info)
					if basic_bool {
						// the pool record wins: flag the incoming record as duplicate
						newData, mergeArr, is_replace = mergeDataFields(source, info)
						DM.replaceSourceData(newData, source) // replace in the data pool
						// duplicate tag goes to the incoming id, merge update goes to the pool id
						repeat_idMap["_id"] = StringTOBsonId(info.id)
						merge_idMap["_id"] = StringTOBsonId(source.id)
						if IdType {
							repeat_idMap["_id"] = info.id
							merge_idMap["_id"] = source.id
						}
						repeat_id = source.id
					} else {
						// the incoming record wins: flag the pool record as duplicate
						newData, mergeArr, is_replace = mergeDataFields(info, source)
						DM.replaceSourceData(newData, source) // replace in the data pool
						// duplicate tag goes to the pool id, merge update goes to the incoming id
						repeat_idMap["_id"] = StringTOBsonId(source.id)
						merge_idMap["_id"] = StringTOBsonId(info.id)
						if IdType {
							repeat_idMap["_id"] = source.id
							merge_idMap["_id"] = info.id
						}
						repeat_id = info.id
					}
					merge_map := make(map[string]interface{})
					if is_replace { // some fields were merged: update the surviving document
						merge_map = map[string]interface{}{
							"$set": map[string]interface{}{
								"merge": newData.mergemap,
							},
						}
						// apply the merged field values
						// (field codes: 0/1 area+city, 2 projectname, 3 projectcode, 4 buyer, 5 budget,
						// 6 winner, 7 bidamount, 8 bidopentime, 9 contractnumber, 10 publishtime, 11 agency)
						for _, value := range mergeArr {
							set := merge_map["$set"].(map[string]interface{})
							switch value {
							case 0, 1:
								set["area"] = newData.area
								set["city"] = newData.city
							case 2:
								set["projectname"] = newData.projectname
							case 3:
								set["projectcode"] = newData.projectcode
							case 4:
								set["buyer"] = newData.buyer
							case 5:
								set["budget"] = newData.budget
							case 6:
								set["winner"] = newData.winner
							case 7:
								set["bidamount"] = newData.bidamount
							case 8:
								set["bidopentime"] = newData.bidopentime
							case 9:
								set["contractnumber"] = newData.contractnumber
							case 10:
								set["publishtime"] = newData.publishtime
							case 11:
								set["agency"] = newData.agency
							}
						}
						// update the merge target document
						updateExtract = append(updateExtract, []map[string]interface{}{
							merge_idMap,
							merge_map,
						})
					}
				} else { // high-quality data path (no merging)
					basic_bool := basicDataScore(source, info)
					if !basic_bool {
						DM.replaceSourceData(info, source) // replace in the data pool
						repeat_idMap["_id"] = StringTOBsonId(source.id)
						if IdType {
							repeat_idMap["_id"] = source.id
						}
						repeat_id = info.id
						if len(ids) >= 9 {
							ids = append(ids, source.id)
							for _, to := range nextNode {
								key := source.id + "-" + source.id + "-" + util.ObjToString(to["stype"])
								by, _ := json.Marshal(map[string]interface{}{
									"gtid":  source.id,
									"lteid": source.id,
									"stype": util.ObjToString(to["stype"]),
									"key":   key,
									"ids":   strings.Join(ids, ","),
								})
								addr := &net.UDPAddr{
									IP:   net.ParseIP(to["addr"].(string)),
									Port: util.IntAll(to["port"]),
								}
								node := &udpNode{by, addr, time.Now().Unix(), 0}
								udptaskmap.Store(key, node)
								udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
							}
							// ids = []string{}
						} else {
							ids = append(ids, source.id)
						}
					}
				}
				//if repeateN%150==0&&repeateN>0 {
				//	fmt.Println("最终结果","目标id:",repeat_idMap["_id"])
				//}
				// tag the duplicate record
				updateExtract = append(updateExtract, []map[string]interface{}{
					repeat_idMap,
					{"$set": map[string]interface{}{
						"repeat":        1,
						"repeat_reason": reason,
						"repeat_id":     repeat_id,
					}},
				})
			}
		}(tmp)
		if len(updateExtract) >= 200 {
			mgo.UpSertBulk(extract, updateExtract...)
			updateExtract = [][]map[string]interface{}{}
		}
		tmp = make(map[string]interface{})
	}
	wg.Wait()
	if len(updateExtract) > 0 {
		mgo.UpSertBulk(extract, updateExtract...)
	}
	log.Println("this task over.", n, "repeateN:", repeateN, mapInfo["stop"])
	// task finished: broadcast to the downstream nodes
	if n > repeateN && mapInfo["stop"] == nil {
		log.Println("判重任务完成发送udp")
		for _, to := range nextNode {
			sid, _ := mapInfo["gtid"].(string)
			eid, _ := mapInfo["lteid"].(string)
			key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
			by, _ := json.Marshal(map[string]interface{}{
				"gtid":  sid,
				"lteid": eid,
				"stype": util.ObjToString(to["stype"]),
				"key":   key,
				"ids":   strings.Join(ids, ","),
			})
			addr := &net.UDPAddr{
				IP:   net.ParseIP(to["addr"].(string)),
				Port: util.IntAll(to["port"]),
			}
			node := &udpNode{by, addr, time.Now().Unix(), 0}
			udptaskmap.Store(key, node)
			udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
		}
	}
}

// timedTaskDay installs the cron schedule.
func timedTaskDay() {
	log.Println("部署定时任务")
	c := cron.New()
	c.AddFunc("0 0 1 * * ?", func() { movedata() })        // once a day at 01:00
	c.AddFunc("0 0 */4 * * ?", func() { timedTaskOnce() }) // every 4 hours
	c.Start()
	//timedTaskOnce()
}

func timedTaskOnce() {
	log.Println("开始一次定时任务")
	defer util.Catch()
	// window: from now-8h to now-4h
	now := time.Now()
	log.Println(now)
	preTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-8, 0, 0, 0, time.Local)
	curTime := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-4, 0, 0, 0, time.Local)
	log.Println(preTime, curTime)
	task_sid := util.BsonIdToSId(bson.NewObjectIdWithTime(preTime))
	task_eid := util.BsonIdToSId(bson.NewObjectIdWithTime(curTime))
	between_time := curTime.Unix() - (86400 * timingPubScope)
	log.Println("id区间:", task_sid, task_eid, "时间:", between_time)
	// id range
	q_start := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gte": StringTOBsonId(task_sid),
			"$lte": StringTOBsonId(task_eid),
		},
	}
	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)
	it_start := sess.DB(mgo.DbName).C(extract).Find(&q_start).Sort("publishtime").Iter()
	num, oknum, deterTime := int64(0), int64(0), int64(0) // counters
	updateExtract := [][]map[string]interface{}{}         // batched mongo updates
	pendAllArr := [][]map[string]interface{}{}            // all pending groups
	dayArr := []map[string]interface{}{}                  // current group
	for tmp := make(map[string]interface{}); it_start.Next(&tmp); num++ {
		if num%10000 == 0 {
			log.Println("正序遍历:", num)
		}
		source := util.ObjToMap(tmp["jsondata"])
		if util.IntAll((*source)["sourcewebsite"]) == 1 {
			updateExtract = append(updateExtract, []map[string]interface{}{
				{"_id": tmp["_id"]},
				{"$set": map[string]interface{}{
					"repeat":        1,
					"dataging":      0,
					"repeat_reason": "sourcewebsite为1 重复",
				}},
			})
			if len(updateExtract) > 50 {
				mgo.UpSertBulk(extract, updateExtract...)
				updateExtract = [][]map[string]interface{}{}
			}
			tmp = make(map[string]interface{})
			continue
		}
		// keep only data whose publish time falls inside the configured window
		if util.IntAll(tmp["dataging"]) == 1 {
			pubtime := util.Int64All(tmp["publishtime"])
			if pubtime > 0 && pubtime >= between_time {
				oknum++
				if deterTime == 0 {
					log.Println("找到第一条符合条件的数据")
					deterTime = util.Int64All(tmp["publishtime"])
					dayArr = append(dayArr, tmp)
				} else {
					if pubtime-deterTime > timingSpanDay*86400 {
						// start a new group and push the current one onto the pending list
						pendAllArr = append(pendAllArr, dayArr)
						dayArr = []map[string]interface{}{}
						deterTime = util.Int64All(tmp["publishtime"])
						dayArr = append(dayArr, tmp)
					} else {
						dayArr = append(dayArr, tmp)
					}
				}
			} else {
				// outside the window: still clear the dataging flag
				updateExtract = append(updateExtract, []map[string]interface{}{
					{"_id": tmp["_id"]},
					{"$set": map[string]interface{}{
						"dataging": 0,
					}},
				})
				if len(updateExtract) > 50 {
					mgo.UpSertBulk(extract, updateExtract...)
					updateExtract = [][]map[string]interface{}{}
				}
			}
		}
		tmp = make(map[string]interface{})
	}
	// flush the remaining flag updates
	if len(updateExtract) > 0 {
		mgo.UpSertBulk(extract, updateExtract...)
		updateExtract = [][]map[string]interface{}{}
	}
	if len(dayArr) > 0 {
		pendAllArr = append(pendAllArr, dayArr)
		dayArr = []map[string]interface{}{}
	}
	log.Println("查询数量:", num, "符合条件:", oknum)
	if len(pendAllArr) <= 0 {
		log.Println("没找到dataging==1的数据")
		return
	}
	// sanity check of the grouping
	testNum := 0
	for k, v := range pendAllArr {
		log.Println("第", k, "组--", "数量:", len(v))
		testNum = testNum + len(v)
	}
	log.Println("本地构建分组完成:", len(pendAllArr), "组", "测试-总计数量:", testNum)
	n, repeateN := 0, 0
	for k, v := range pendAllArr { // updates are flushed at the end of every group
		// build the data pool for the current group
		log.Println("构建第", k, "组---(数据池)")
		// first publish time of the group decides which collection backs the pool
		first_pt := util.Int64All(v[0]["publishtime"])
		coll := extract_back
		if isTaskTimeCycle(first_pt) {
			coll = extract
		}
		DM = TimedTaskDatamap(dupdays, first_pt, coll)
		log.Println("开始遍历判重第", k, "组 共计数量:", len(v))
		n = n + len(v)
		log.Println("统计目前总数量:", n, "重复数量:", repeateN)
		for _, tmp := range v {
			info := NewInfo(tmp)
			if !LowHeavy { // skip low-quality records instead of deduplicating them
				if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
					log.Println("无效数据")
					updateExtract = append(updateExtract, []map[string]interface{}{
						{"_id": tmp["_id"]},
						{"$set": map[string]interface{}{
							"repeat":   -1, // invalid-data tag
							"dataging": 0,
						}},
					})
					if len(updateExtract) > 50 {
						mgo.UpSertBulk(extract, updateExtract...)
						updateExtract = [][]map[string]interface{}{}
					}
					continue
				}
			}
			b, source, reason := DM.check(info)
			if b { // duplicate found: build the update statement
				log.Println("判重结果", b, reason, "目标id", info.id)
				repeateN++
				// tag the duplicate record
				updateExtract = append(updateExtract, []map[string]interface{}{
					{"_id": tmp["_id"]},
					{"$set": map[string]interface{}{
						"repeat":        1,
						"repeat_reason": reason,
						"repeat_id":     source.id,
						"dataging":      0,
					}},
				})
			} else {
				updateExtract = append(updateExtract, []map[string]interface{}{
					{"_id": tmp["_id"]},
					{"$set": map[string]interface{}{
						"dataging": 0, // every processed record ends up with dataging==0
					}},
				})
			}
			if len(updateExtract) > 50 {
				mgo.UpSertBulk(extract, updateExtract...)
				updateExtract = [][]map[string]interface{}{}
			}
		}
		// flush at the end of the group
		if len(updateExtract) > 0 {
			mgo.UpSertBulk(extract, updateExtract...)
			updateExtract = [][]map[string]interface{}{}
		}
	}
	if len(updateExtract) > 0 {
		mgo.UpSertBulk(extract, updateExtract...)
		updateExtract = [][]map[string]interface{}{}
	}
	log.Println("this timeTask over.", n, "repeateN:", repeateN)
	// task finished: notify downstream nodes over UDP (index update and merge downstream; pending)
	if n > repeateN {
		for _, to := range nextNode {
			next_sid := util.BsonIdToSId(task_sid)
			next_eid := util.BsonIdToSId(task_eid)
			key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
			by, _ := json.Marshal(map[string]interface{}{
				"gtid":  next_sid,
				"lteid": next_eid,
				"stype": util.ObjToString(to["stype"]),
				"key":   key,
			})
			addr := &net.UDPAddr{
				IP:   net.ParseIP(to["addr"].(string)),
				Port: util.IntAll(to["port"]),
			}
			node := &udpNode{by, addr, time.Now().Unix(), 0}
			udptaskmap.Store(key, node)
			udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
		}
	}
}

// isTaskTimeCycle reports whether pt falls within the last dupdays days.
func isTaskTimeCycle(pt int64) bool {
	year, month, day := time.Now().Date()
	predur_pt := time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays) * 24 * time.Hour).Unix()
	log.Println(predur_pt)
	return pt >= predur_pt
}

// mergeDataFields fills empty fields of source from info and records what was merged.
func mergeDataFields(source *Info, info *Info) (*Info, []int64, bool) {
	merge_recordMap := make(map[string]interface{})
	mergeArr := make([]int64, 0)
	// whether any field was replaced; the original values are recorded in merge_recordMap
	is_replace := false
	// 1. area / city
	if source.area == "" || source.area == "全国" { // empty or nationwide
		if info.area != "全国" && info.area != "" {
			merge_recordMap["area"] = source.area
			merge_recordMap["city"] = source.city
			source.area = info.area
			source.city = info.city
			mergeArr = append(mergeArr, 1)
			is_replace = true
		}
	} else { // not empty: if it came from the site configuration, record and clear that
		if source.is_site { // city was filled from site info
			merge_recordMap["site_area"] = source.area
			merge_recordMap["site_city"] = source.city
			mergeArr = append(mergeArr, 0)
			is_replace = true
			source.is_site = false
		}
	}
	// 2. project name
	if source.projectname == "" && info.projectname != "" {
		merge_recordMap["projectname"] = source.projectname
		source.projectname = info.projectname
		mergeArr = append(mergeArr, 2)
		is_replace = true
	}
	// 3. project code
	if source.projectcode == "" && info.projectcode != "" {
		merge_recordMap["projectcode"] = source.projectcode
		source.projectcode = info.projectcode
		mergeArr = append(mergeArr, 3)
		is_replace = true
	}
	// 4. buyer
	if source.buyer == "" && info.buyer != "" {
		merge_recordMap["buyer"] = source.buyer
		source.buyer = info.buyer
		mergeArr = append(mergeArr, 4)
		is_replace = true
	}
	// 5. budget
	if source.budget == 0 && info.budget != 0 {
		merge_recordMap["budget"] = source.budget
		source.budget = info.budget
		mergeArr = append(mergeArr, 5)
		is_replace = true
	}
	// 6. winner
	if source.winner == "" && info.winner != "" {
		merge_recordMap["winner"] = source.winner
		source.winner = info.winner
		mergeArr = append(mergeArr, 6)
		is_replace = true
	}
	// 7. bid amount
	if source.bidamount == 0 && info.bidamount != 0 {
		merge_recordMap["bidamount"] = source.bidamount
		source.bidamount = info.bidamount
		mergeArr = append(mergeArr, 7)
		is_replace = true
	}
	// 8. bid opening time
	if source.bidopentime == 0 && info.bidopentime != 0 {
		merge_recordMap["bidopentime"] = source.bidopentime
		source.bidopentime = info.bidopentime
		mergeArr = append(mergeArr, 8)
		is_replace = true
	}
	// 9. contract number
	if source.contractnumber == "" && info.contractnumber != "" {
		merge_recordMap["contractnumber"] = source.contractnumber
		source.contractnumber = info.contractnumber
		mergeArr = append(mergeArr, 9)
		is_replace = true
	}
	// 10. publish time
	if source.publishtime == 0 && info.publishtime != 0 {
		merge_recordMap["publishtime"] = source.publishtime
		source.publishtime = info.publishtime
		mergeArr = append(mergeArr, 10)
		is_replace = true
	}
	// 11. agency
	if source.agency == "" && info.agency != "" {
		merge_recordMap["agency"] = source.agency
		source.agency = info.agency
		mergeArr = append(mergeArr, 11)
		is_replace = true
	}
	if is_replace { // something was replaced
		// increment the total merge count
		source.mergemap["total_num"] = util.Int64All(source.mergemap["total_num"]) + 1
		merge_recordMap["num"] = util.Int64All(source.mergemap["total_num"])
		// record which document id supplied the non-empty values
		key := info.id
		source.mergemap[key] = merge_recordMap
	}
	// TODO: further optimization
	return source, mergeArr, is_replace
}

// basicDataScore reports whether v should be preferred over info as the surviving record.
func basicDataScore(v *Info, info *Info) bool {
	/*
		Site priority rules:
		1. level: 国家 (national) > 省级 (provincial) > 市级 (municipal) > 县区 (county)
		2. sitetype: 政府采购 (government procurement) > 公共资源 (public resources) >
		   官方网站|政府门户 (official site / government portal) >
		   社会公共招标平台|企业招标平台 (public / enterprise bidding platforms)
		3. same sitetype: compare weight
		4. otherwise: compare field completeness scores
	*/
	v_score, info_score := -1, -1
	dict_v := SiteMap[v.site]
	dict_info := SiteMap[info.site]
	// compare level first
	if dict_v != nil {
		switch util.ObjToString(dict_v["level"]) {
		case "国家":
			v_score = 4
		case "省级":
			v_score = 3
		case "市级":
			v_score = 2
		case "县区":
			v_score = 1
		case "":
		default:
			v_score = 0
		}
	}
	if dict_info != nil {
		switch util.ObjToString(dict_info["level"]) {
		case "国家":
			info_score = 4
		case "省级":
			info_score = 3
		case "市级":
			info_score = 2
		case "县区":
			info_score = 1
		case "":
		default:
			info_score = 0
		}
	}
	if v_score > info_score {
		return true
	}
	if v_score < info_score {
		return false
	}
	// compare sitetype
	if dict_v != nil {
		switch util.ObjToString(dict_v["sitetype"]) {
		case "政府采购":
			v_score = 4
		case "公共资源":
			v_score = 3
		case "官方网站", "政府门户":
			v_score = 2
		case "社会公共招标平台", "企业招标平台":
			v_score = 1
		case "":
		default:
			v_score = 0
		}
	}
	if dict_info != nil {
		switch util.ObjToString(dict_info["sitetype"]) {
		case "政府采购":
			info_score = 4
		case "公共资源":
			info_score = 3
		case "官方网站", "政府门户":
			info_score = 2
		case "社会公共招标平台", "企业招标平台":
			info_score = 1
		case "":
		default:
			info_score = 0
		}
	}
	if v_score > info_score {
		return true
	}
	if v_score < info_score {
		return false
	}
	if v_score == info_score { // same sitetype: compare weight
		v_weight := util.IntAll(dict_v["weight"])
		info_weight := util.IntAll(dict_info["weight"])
		if v_weight > info_weight {
			return true
		}
		if info_weight > v_weight {
			return false
		}
	}
	// field completeness scoring
	m, n := 0, 0
	if v.projectname != "" { m++ }
	if v.buyer != "" { m++ }
	if v.projectcode != "" || v.contractnumber != "" { m++ }
	if v.budget != 0 { m++ }
	if v.bidamount != 0 { m++ }
	if v.winner != "" { m++ }
	if v.bidopentime != 0 { m++ }
	if v.bidopenaddress != "" { m++ }
	if v.agency != "" { m = m + 2 }
	if v.city != "" { m = m + 2 }
	if info.projectname != "" { n++ }
	if info.buyer != "" { n++ }
	if info.projectcode != "" || info.contractnumber != "" { n++ }
	if info.budget != 0 { n++ }
	if info.bidamount != 0 { n++ }
	if info.winner != "" { n++ }
	if info.bidopentime != 0 { n++ }
	if info.bidopenaddress != "" { n++ }
	if info.agency != "" { n = n + 2 }
	if info.city != "" { n = n + 2 }
	if m > n {
		return true
	} else if m == n {
		return v.publishtime >= info.publishtime
	}
	return false
}

// invalidData reports whether all four key fields (buyer, project name, project code, contract number) are empty.
func invalidData(d1 string, d2 string, d3 string, d4 string) bool {
	var n int
	if d1 != "" { n++ }
	if d2 != "" { n++ }
	if d3 != "" { n++ }
	if d4 != "" { n++ }
	return n == 0
}

// movedata copies records older than dupdays days into the backup collection and
// deletes records older than 2*dupdays days from the extract collection.
func movedata() {
	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)
	year, month, day := time.Now().Date()
	q := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays) * 24 * time.Hour).Unix(),
		},
	}
	log.Println(q)
	it := sess.DB(mgo.DbName).C(extract).Find(&q).Iter()
	index := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
		mgo.Save(extract_back, tmp)
		tmp = map[string]interface{}{}
		if index%1000 == 0 {
			log.Println("index", index)
		}
	}
	log.Println("save to", extract_back, " ok index", index)
	qv := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays) * 24 * time.Hour * 2).Unix(),
		},
	}
	delnum := mgo.Delete(extract, qv)
	log.Println("remove from ", extract, delnum)
}
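
/*
Illustrative sketch of the configuration this program reads. The field names are
taken from the keys accessed in init() and main(); the values are placeholders,
not actual deployment settings:

	{
	  "udpport": ":7001",
	  "dupdays": 5,
	  "threads": 4,
	  "isMerger": true,
	  "isSort": false,
	  "lowHeavy": false,
	  "timingTask": true,
	  "timingSpanDay": 1,
	  "timingPubScope": 730,
	  "specialwords": "^_$",
	  "specialtitle_0": "^_$",
	  "specialtitle_1": "^_$",
	  "specialtitle_2": "^_$",
	  "nextNode": [
	    {"addr": "127.0.0.1", "port": 7002, "stype": "normalTask"}
	  ],
	  "mongodb": {
	    "addr": "127.0.0.1:27017",
	    "db": "qfw",
	    "pool": 10,
	    "extract": "extract",
	    "extract_back": "extract_back",
	    "site": {"dbname": "qfw", "coll": "site"}
	  }
	}
*/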
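
// exampleTriggerTask is a minimal sketch (not called anywhere) of how an upstream
// node could trigger a dedup run over UDP, using the same json/net/mu calls as the
// broadcast code above. The id range values are the test ids from mainT; the target
// address, port, and key are placeholders chosen for illustration only.
func exampleTriggerTask() {
	by, _ := json.Marshal(map[string]interface{}{
		"gtid":  "5ece4b1b9e628c59915eb257",
		"lteid": "5ed55b6d9e628c599161977c",
		"stype": "normalTask",
		"key":   "example-task",
	})
	addr := &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 7001} // placeholder target
	udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
}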