/* --- Config blob #1 (JSON, apparently a separate config file pasted into this dump).
   Declares the UDP listen port, dedup window (dupdays), source/backup Mongo collections,
   mail-alert endpoint, downstream UDP nodes, and the title/keyword regexes used for dedup. --- */
{ "udpport": ":19097", "dupdays": 7, "mongodb": { "addr": "172.17.4.85:27080", "pool": 10, "db": "qfw", "extract": "result_20200715", "extract_back": "result_20200714", "site": { "dbname": "qfw", "coll": "site" } }, "task_mongodb": { "task_addrName": "172.17.4.187:27081", "task_dbName": "qfw", "task_collName": "ocr_flie_over", "pool": 5 }, "jkmail": { "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn", "api": "http://10.171.112.160:19281/_send/_mail" }, "nextNode": [ { "addr": "172.17.4.194", "port": 1782, "stype": "project", "memo": "合并项目" }, { "addr": "127.0.0.1", "port": 1783, "stype": "bidding", "memo": "创建招标数据索引new" } ], "threads": 1, "isMerger": false, "lowHeavy":true, "timingTask":false, "timingSpanDay": 5, "timingPubScope": 720, "specialwords": "(重招|重新招标|勘察|施工|监理|总承包|土石方|可研)", "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?", "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)", "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]", "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研" }
/* Tail of an init function whose beginning lies outside this view: builds the global
   Mongo connection pool used by the functions below.
   NOTE(review): the address here (172.17.4.187:27083) differs from both addresses in the
   config blobs (172.17.4.85:27080 / 172.17.4.187:27081) — confirm which is authoritative. */
mgo = &MongodbSim{
	MongodbAddr: "172.17.4.187:27083",
	DbName: "qfw",
	Size: 10,
}
mgo.InitPool()
return

/* moveTimeoutData installs a cron job (6-field spec: second minute hour dom month dow)
   that runs moveOnceTimeOut once per day at midnight. */
func moveTimeoutData() {
	log.Println("部署迁移定时任务")
	c := cron.New()
	c.AddFunc("0 0 0 * * ?", func() {
		moveOnceTimeOut()
	})
	c.Start()
}

/* moveOnceTimeOut migrates "timed-out" documents: every record in result_20200714 whose
   comeintime is older than two years (relative to now, truncated to the hour) is copied
   into result_20200713 and then deleted from result_20200714 by _id.
   NOTE(review): collection names are hard-coded and do not match the config blob's
   "extract"/"extract_back" values — confirm they are intentional.
   NOTE(review): the iterator's source collection is mutated (DeleteById) while iterating;
   presumably safe with this driver, but verify.
   ("DestoryMongoConn" is the project API's own spelling, left as-is.) */
func moveOnceTimeOut() {
	log.Println("执行一次迁移超时数据")
	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)
	now:=time.Now()
	// cutoff = same calendar date/hour, two years back
	move_time := time.Date(now.Year()-2, now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local).Unix()
	q := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$lt": move_time,
		},
	}
	log.Println(q)
	it := sess.DB(mgo.DbName).C("result_20200714").Find(&q).Iter()
	index := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
		if index%10000 == 0 {
			log.Println("index", index)
		}
		del_id:=BsonTOStringId(tmp["_id"])
		mgo.Save("result_20200713", tmp)
		mgo.DeleteById("result_20200714",del_id)
		// reset the reuse buffer so the next Next() does not see stale keys
		tmp = map[string]interface{}{}
	}
	// final progress log + closing brace of moveOnceTimeOut (body starts on the previous dump line)
	log.Println("save and delete", " ok index", index)
}
/* --- Config blob #2 (JSON, a second pasted config file): same schema as blob #1 but a
   different UDP port, mail API host, and a single downstream node. --- */
{ "udpport": ":1785", "dupdays": 7, "mongodb": { "addr": "172.17.4.85:27080", "pool": 10, "db": "qfw", "extract": "result_20200715", "extract_back": "result_20200714", "site": { "dbname": "qfw", "coll": "site" } }, "jkmail": { "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn", "api": "http://172.17.145.179:19281/_send/_mail" }, "nextNode": [ { "addr": "127.0.0.1", "port": 1783, "stype": "bidding", "memo": "创建招标数据索引new" } ], "threads": 1, "isMerger": false, "lowHeavy":true, "timingTask":false, "timingSpanDay": 5, "timingPubScope": 720, "specialwords": "(重招|重新招标|勘察|施工|监理|总承包|土石方|可研)", "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?", "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)", "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]", "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研" }
/* repairHistory re-runs duplicate detection over a fixed _id window of the extract
   collection. Pass 1 (this dump line and the next two): iterate records sorted by
   publishtime; records with sourcewebsite==1 are tagged repeat=1 directly; records with
   dataging==1 inside the publish-time window are bucketed into groups whose publishtime
   spans at most timingSpanDay days; everything else just gets its dataging flag cleared.
   Pass 2 (later lines): each group is deduped concurrently. Updates are batched and
   flushed via mgo.UpSertBulk. */
func repairHistory() {
	defer util.Catch()
	log.Println("执行修复程序")
	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)
	q:=map[string]interface{}{}
	between_time := time.Now().Unix() - (86400 * timingPubScope)// two-year period (timingPubScope is 720 days in the config)
	// begin duplicate checking: fixed _id window for this repair run
	q = map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt": StringTOBsonId("5f15bf800000000000000000"),
			"$lte": StringTOBsonId("5f2375b2a120e23754be1039"),
		},
	}
	log.Println("历史判重查询条件:",q,"时间:", between_time)
	it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
	num,oknum, deterTime:= int64(0),int64(0),int64(0) // counters: scanned, qualifying, current group's anchor publishtime
	updateExtract := [][]map[string]interface{}{}// batched mongo update statements
	pendAllArr:=[][]map[string]interface{}{}// all pending groups awaiting dedup
	dayArr := []map[string]interface{}{} // the group currently being filled
	for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
		if num%10000 == 0 {
			log.Println("正序遍历:", num)
		}
		source := util.ObjToMap(tmp["jsondata"])
		// sourcewebsite==1 records are considered duplicates outright
		if util.IntAll((*source)["sourcewebsite"]) == 1 {
			updateExtract = append(updateExtract, []map[string]interface{}{
				map[string]interface{}{
					"_id": tmp["_id"],
				},
				map[string]interface{}{
					// tag as duplicate and clear the dataging flag
					"$set": map[string]interface{}{
						"repeat": 1,
						"dataging": 0,
						"repeat_reason": "sourcewebsite为1 重复",
					},
				},
			})
			if len(updateExtract) > 50 {
				mgo.UpSertBulk(extract, updateExtract...)
				updateExtract = [][]map[string]interface{}{}
			}
			tmp = make(map[string]interface{})
			continue
		}
		// take records whose publish time falls within the X-year window
		if util.IntAll(tmp["dataging"]) == 1 {
			pubtime := util.Int64All(tmp["publishtime"])
			if pubtime > 0 && pubtime >= between_time {
				oknum++
				if deterTime==0 {
					log.Println("找到第一条符合条件的数据")
					deterTime = util.Int64All(tmp["publishtime"])
					dayArr = append(dayArr,tmp)
				}else {
					if pubtime-deterTime >timingSpanDay*86400 {
						// start a new group; append the finished group to the full set
						pendAllArr = append(pendAllArr,dayArr)
						dayArr = []map[string]interface{}{}
						deterTime = util.Int64All(tmp["publishtime"])
						dayArr = append(dayArr,tmp)
					}else {
						dayArr = append(dayArr,tmp)
					}
				}
			}else {
				// records outside the two-year window also get their flag cleared
				updateExtract = append(updateExtract, []map[string]interface{}{
					map[string]interface{}{
						"_id": tmp["_id"],
					},
					map[string]interface{}{
						"$set": map[string]interface{}{
							"dataging": 0,
						},
					},
				})
				if len(updateExtract) > 50 {
					mgo.UpSertBulk(extract, updateExtract...)
					updateExtract = [][]map[string]interface{}{}
				}
			}
		}
		tmp = make(map[string]interface{})
	}
	// flush the remaining batched flag updates
	if len(updateExtract) > 0 {
		mgo.UpSertBulk(extract, updateExtract...)
		updateExtract = [][]map[string]interface{}{}
	}
	// the last partially-filled group still counts
	if len(dayArr)>0 {
		pendAllArr = append(pendAllArr,dayArr)
		dayArr = []map[string]interface{}{}
	}
	log.Println("查询数量:",num,"符合条件:",oknum)
	if len(pendAllArr) <= 0 {
		log.Println("没找到dataging==1的数据")
	}
	// sanity-check that the per-group counts add up
	testNum:=0
	for k,v:=range pendAllArr {
		log.Println("第",k,"组--","数量:",len(v))
		testNum = testNum+len(v)
	}
	log.Println("本地构建分组完成:",len(pendAllArr),"组","测试-总计数量:",testNum)
	n, repeateN := 0, 0
	pool := make(chan bool, 2) // semaphore: at most 2 groups deduped concurrently
	wg := &sync.WaitGroup{}
	for k,v:=range pendAllArr { // flush a batch of updates at the end of each group
		pool <- true
		wg.Add(1)
		go func(k int, v []map[string]interface{}) {
			defer func() {
				<-pool
				wg.Done()
			}()
			// per-group scratch batch - groups do not interfere with each other
			groupUpdateExtract := [][]map[string]interface{}{}
			// build this group's data pool
			log.Println("构建第", k, "组---(数据池)")
			// anchor publish time of the current group (last element, groups are publishtime-sorted)
			first_pt := util.Int64All(v[len(v)-1]["publishtime"])
			curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
			log.Println("开始遍历判重第", k, "组 共计数量:", len(v))
			// NOTE(review): n and repeateN are mutated from multiple goroutines with no
			// lock/atomic - data race; the reported totals may be wrong. Confirm and fix.
			n = n + len(v)
			log.Println("统计目前总数量:", n, "重复数量:", repeateN)
			for _, tmp := range v {
				info := NewInfo(tmp)
				if !LowHeavy { // whether low-quality records take part in dedup
					if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
						log.Println("无效数据")
						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
							map[string]interface{}{
								"_id": tmp["_id"],
							},
							map[string]interface{}{
								"$set": map[string]interface{}{
									"repeat": -1, // invalid-data tag
									"dataging": 0,
								},
							},
						})
						if len(groupUpdateExtract) > 50 {
							mgo.UpSertBulk(extract, groupUpdateExtract...)
							groupUpdateExtract = [][]map[string]interface{}{}
						}
						// NOTE(review): this return exits the whole group goroutine on the first
						// invalid record, skipping the rest of the group AND dropping any
						// unflushed entries in groupUpdateExtract - was "continue" intended?
						return
					}
				}
				b, source, reason := curTM.check(info)
				if b { // duplicate found: build the update statement for the extract collection
					repeateN++
					// tag the duplicate record with reason and the id it duplicates
					groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
						map[string]interface{}{
							"_id": tmp["_id"],
						},
						map[string]interface{}{
							"$set": map[string]interface{}{
								"repeat": 1,
								"repeat_reason": reason,
								"repeat_id": source.id,
								"dataging": 0,
							},
						},
					})
				} else {
					groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
						map[string]interface{}{
							"_id": tmp["_id"],
						},
						map[string]interface{}{
							"$set": map[string]interface{}{
								"dataging": 0, // every processed record ends with dataging==0
							},
						},
					})
				}
				if len(groupUpdateExtract) > 50 {
					mgo.UpSertBulk(extract, groupUpdateExtract...)
					groupUpdateExtract = [][]map[string]interface{}{}
				}
			}
			// end of group: flush the remaining updates
			if len(groupUpdateExtract) > 0 {
				mgo.UpSertBulk(extract, groupUpdateExtract...)
			}
		}(k, v)
	}
	wg.Wait()
	time.Sleep(30 * time.Second)
	// NOTE(review): gtid/lteid are not defined anywhere in this view - presumably
	// package-level variables; confirm they exist, otherwise this does not compile.
	log.Println("this repair over.", n, "repeateN:", repeateN,gtid,lteid)
	log.Println("修复结束")
}
// NOTE(review): dead code retained from an earlier revision of the invalid-data branch
// (no lock, threshold >= 200, no dataging clear). Kept for reference; consider deleting.
//if !LowHeavy { //是否进行低质量数据判重
//	if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
//		updateExtract = append(updateExtract, []map[string]interface{}{
//			map[string]interface{}{
//				"_id": tmp["_id"],
//			},
//			map[string]interface{}{
//				"$set": map[string]interface{}{
//					"repeat": -1, //无效数据标签
//				},
//			},
//		})
//		if len(updateExtract) >= 200 {
//			mgo.UpSertBulk(extract, updateExtract...)
//			updateExtract = [][]map[string]interface{}{}
//		}
//		return
//	}
//}
/* Fragment of another worker function (its func header is outside this view): the
   locked variant of the invalid-data branch; uses updatelock to guard a shared batch. */
if !LowHeavy { // whether low-quality records take part in dedup
	if invalidData(info.buyer, info.projectname, info.projectcode, info.contractnumber) {
		log.Println("无效数据")
		updatelock.Lock()
		groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
			map[string]interface{}{
				"_id": tmp["_id"],
			},
			map[string]interface{}{
				"$set": map[string]interface{}{
					"repeat": -1, // invalid-data tag
					"dataging": 0,
				},
			},
		})
		if len(groupUpdateExtract) > 200 {
			mgo.UpSertBulk(extract, groupUpdateExtract...)
			groupUpdateExtract = [][]map[string]interface{}{}
		}
		updatelock.Unlock()
		return
	}
}
// merge or not - low-quality data is never merged
if isMerger && !strings.Contains(reason,"低质量"){
	newData, update_map ,isReplace := mergeDataFields(source, info)
	if isReplace {// replace in the data pool
		fmt.Println("合并更新的id:",source.id)
		// data pool - replace the pooled record with the merged one
		DM.replacePoolData(newData)
		// mongo update - only the merged fields
		mgo.UpdateById(extract,source.id,update_map)
		// send udp to refresh the downstream index
		// NOTE(review): the UDP notification below is disabled - downstream indexes will
		// not learn about merged records; confirm this is intentional.
		//for _, to := range nextNode {
		//	key := source.id + "-" + source.id + "-" + util.ObjToString(to["stype"])
		//	by, _ := json.Marshal(map[string]interface{}{
		//		"gtid": source.id,
		//		"lteid": source.id,
		//		"stype": "biddingall",
		//		"key": key,
		//	})
		//	addr := &net.UDPAddr{
		//		IP: net.ParseIP(to["addr"].(string)),
		//		Port: util.IntAll(to["port"]),
		//	}
		//	node := &udpNode{by, addr, time.Now().Unix(), 0}
		//	udptaskmap.Store(key, node)
		//	udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
		//}
	}
}