// NOTE(review): this file is a whitespace-mangled paste — a JSON configuration
// blob followed by Go source whose newlines were collapsed. It does not compile
// as a .go file as-is. Below, only formatting and comments are changed; every
// code token and runtime string is reproduced verbatim.

// Embedded JSON configuration: UDP listen port, dedup window ("dupdays"),
// Mongo settings (data pool + task queue), alert-mail endpoint, downstream UDP
// nodes, thread/merge flags, and the CJK regex patterns used for title and
// keyword matching during duplicate detection. Kept verbatim (strict JSON).
{ "udpport": ":19097", "dupdays": 7, "mongodb": { "addr": "172.17.4.85:27080", "pool": 10, "db": "qfw", "extract": "result_20200715", "extract_back": "result_20200714", "site": { "dbname": "qfw", "coll": "site" } }, "task_mongodb": { "task_addrName": "172.17.4.187:27081", "task_dbName": "qfw", "task_collName": "ocr_flie_over", "pool": 5 }, "jkmail": { "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn", "api": "http://10.171.112.160:19281/_send/_mail" }, "nextNode": [ { "addr": "172.17.4.194", "port": 1782, "stype": "project", "memo": "合并项目" }, { "addr": "127.0.0.1", "port": 1783, "stype": "bidding", "memo": "创建招标数据索引new" } ], "threads": 1, "isMerger": false, "lowHeavy":true, "timingTask":false, "timingSpanDay": 5, "timingPubScope": 720, "specialwords": "(重招|重新招标|勘察|施工|监理|总承包|土石方|可研)", "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?", "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)", "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]", "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研" }

// Fragment of an initialization function — its `func` header lies before this
// view, so the enclosing definition is incomplete here. It wires up the global
// Mongo connection pool used by the migration/dedup tasks below.
mgo = &MongodbSim{
	MongodbAddr: "172.17.4.187:27083",
	DbName:      "qfw",
	Size:        10,
}
mgo.InitPool()
return

// moveTimeoutData installs the migration job: cron spec "0 0 0 * * ?"
// (6-field form with a seconds column) fires daily at midnight and runs
// moveOnceTimeOut.
func moveTimeoutData() {
	log.Println("部署迁移定时任务")
	c := cron.New()
	c.AddFunc("0 0 0 * * ?", func() {
		moveOnceTimeOut()
	})
	c.Start()
}

// moveOnceTimeOut performs one migration pass: every document in collection
// "result_20200714" whose "comeintime" is older than the cutoff (two years
// back, anchored to the current hour, local time) is saved into
// "result_20200713" and then deleted from the source collection. Progress is
// logged every 10000 documents.
func moveOnceTimeOut() {
	log.Println("执行一次迁移超时数据")
	sess := mgo.GetMgoConn()
	defer mgo.DestoryMongoConn(sess)
	now := time.Now()
	// Cutoff timestamp: same month/day/hour, two years back.
	move_time := time.Date(now.Year()-2, now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local).Unix()
	q := map[string]interface{}{
		"comeintime": map[string]interface{}{
			"$lt": move_time,
		},
	}
	log.Println(q)
	it := sess.DB(mgo.DbName).C("result_20200714").Find(&q).Iter()
	index := 0
	for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
		if index%10000 == 0 {
			log.Println("index", index)
		}
		del_id := BsonTOStringId(tmp["_id"])
		mgo.Save("result_20200713", tmp)
		mgo.DeleteById("result_20200714", del_id)
		tmp = map[string]interface{}{}
		// NOTE(review): no closing brace for this for-loop appears in the
		// flattened text before the function's closing brace on the next
		// source line — it looks lost in the paste; confirm against the
		// original file.
		log.Println("save and delete", " ok index", index)
}

// Second embedded JSON configuration — appears to be another deployment of the
// same service (different UDP port, mail API host and downstream node list).
// Kept verbatim (strict JSON).
{ "udpport": ":1785", "dupdays": 7, "mongodb": { "addr": "172.17.4.85:27080", "pool": 10, "db": "qfw", "extract": "result_20200715", "extract_back": "result_20200714", "site": { "dbname": "qfw", "coll": "site" } }, "jkmail": { "to": "zhengkun@topnet.net.cn,zhangjinkun@topnet.net.cn", "api": "http://172.17.145.179:19281/_send/_mail" }, "nextNode": [ { "addr": "127.0.0.1", "port": 1783, "stype": "bidding", "memo": "创建招标数据索引new" } ], "threads": 1, "isMerger": false, "lowHeavy":true, "timingTask":false, "timingSpanDay": 5, "timingPubScope": 720, "specialwords": "(重招|重新招标|勘察|施工|监理|总承包|土石方|可研)", "specialtitle_0": "(包|标段|标包)[((]?[0-9a-zA-Z一二三四五六七八九十零123456789][))]?", "specialtitle_1": "[0-9a-zA-Z一二三四五六七八九十零123456789](次|包|标段|标包|批|期)", "specialtitle_2": "项目[((][0-9a-zA-Z一二三四五六七八九十零123456789][))]", "beifen": "[((]?[0-9一二三四五六七八九十零123456789再][))]?[子分]?[次批标包]|重招|重新招标|勘察|设计|施工|监理|总承包|土石方|可研" }

// historyTaskDay is the historical-dedup worker loop. Each pass: migrate the
// previously finished id segment, pick the newest task id as the new upper
// bound (lteid), query extract docs with _id in (gtid, lteid], bucket them by
// publish time, dedup each bucket concurrently, then broadcast UDP
// notifications to downstream nodes and continue with the next segment.
// NOTE(review): the `defer ...DestoryMongoConn(...)` calls below sit inside an
// infinite for-loop; deferred calls only run at function return, so pooled
// connections accumulate on every pass — likely a connection leak. Flagged
// only; behavior left unchanged.
func historyTaskDay() {
	defer util.Catch()
	for {
		start := time.Now().Unix()
		if gtid == "" {
			log.Println("请传gtid,否则无法运行")
			os.Exit(0)
			return
		}
		if lteid != "" {
			// migrate the previous segment first
			log.Println("开启一次迁移任务", gtid, lteid)
			moveHistoryData(gtid, lteid)
			gtid = lteid // advance the lower bound to the old upper bound
		}
		// look up the newest task id in the task table
		task_sess := task_mgo.GetMgoConn()
		defer task_mgo.DestoryMongoConn(task_sess)
		q := map[string]interface{}{
			"isused": true,
		}
		between_time := time.Now().Unix() - (86400 * timingPubScope) // publish-time window (timingPubScope days)
		it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
		for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
			lteid = util.ObjToString(tmp["gtid"])
			log.Println("查询的最后一个任务Id:", lteid)
			break
		}
		log.Println("查询完毕-先睡眠5分钟", gtid, lteid)
		time.Sleep(5 * time.Minute)
		sess := mgo.GetMgoConn() // data connection
		defer mgo.DestoryMongoConn(sess)
		// dedup query: ids in (gtid, lteid]
		q = map[string]interface{}{
			"_id": map[string]interface{}{
				"$gt":  StringTOBsonId(gtid),
				"$lte": StringTOBsonId(lteid),
			},
		}
		log.Println("历史判重查询条件:", q, "时间:", between_time)
		it :=
sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
		num, oknum, outnum, deterTime := int64(0), int64(0), int64(0), int64(0) // counters
		updateExtract := [][]map[string]interface{}{} // batched Mongo update ops
		pendAllArr := [][]map[string]interface{}{}    // all groups awaiting dedup
		dayArr := []map[string]interface{}{}
		for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
			if num%10000 == 0 {
				log.Println("正序遍历:", num)
			}
			source := util.ObjToMap(tmp["jsondata"])
			if util.IntAll((*source)["sourcewebsite"]) == 1 {
				outnum++
				updatelock.Lock()
				updateExtract = append(updateExtract, []map[string]interface{}{
					map[string]interface{}{
						"_id": tmp["_id"],
					},
					map[string]interface{}{
						"$set": map[string]interface{}{
							"repeat":        1,
							"dataging":      0,
							"repeat_reason": "sourcewebsite为1 重复",
						},
					},
				})
				if len(updateExtract) >= 200 {
					log.Println("sourcewebsite,批量更新")
					mgo.UpSertBulk(extract, updateExtract...)
					updateExtract = [][]map[string]interface{}{}
				}
				updatelock.Unlock()
				tmp = make(map[string]interface{})
				continue
			}
			// keep only docs whose publish time falls inside the window
			updatelock.Lock()
			if util.IntAll(tmp["dataging"]) == 1 {
				pubtime := util.Int64All(tmp["publishtime"])
				if pubtime > 0 && pubtime >= between_time {
					oknum++
					if deterTime == 0 {
						log.Println("找到第一条符合条件的数据")
						deterTime = util.Int64All(tmp["publishtime"])
						dayArr = append(dayArr, tmp)
					} else {
						if pubtime-deterTime > timingSpanDay*86400 {
							// gap exceeds the span: close the current group and start a new one
							pendAllArr = append(pendAllArr, dayArr)
							dayArr = []map[string]interface{}{}
							deterTime = util.Int64All(tmp["publishtime"])
							dayArr = append(dayArr, tmp)
						} else {
							dayArr = append(dayArr, tmp)
						}
					}
				} else {
					outnum++
					// outside the window: still clear the dataging flag
					updateExtract = append(updateExtract, []map[string]interface{}{
						map[string]interface{}{
							"_id": tmp["_id"],
						},
						map[string]interface{}{
							"$set": map[string]interface{}{
								"dataging": 0,
							},
						},
					})
					if len(updateExtract) >= 200 {
						log.Println("不在周期内符合dataging==1,批量更新")
						mgo.UpSertBulk(extract, updateExtract...)
						updateExtract = [][]map[string]interface{}{}
					}
				}
			}
			updatelock.Unlock()
			tmp = make(map[string]interface{})
		}
		// flush remaining batched updates
		updatelock.Lock()
		if len(updateExtract) > 0 {
			log.Println("分组后,最后更新不进行判重的数据:", len(updateExtract), oknum+outnum)
			mgo.UpSertBulk(extract, updateExtract...)
			updateExtract = [][]map[string]interface{}{}
		}
		updatelock.Unlock()
		if len(dayArr) > 0 {
			pendAllArr = append(pendAllArr, dayArr)
			dayArr = []map[string]interface{}{}
		}
		log.Println("查询数量:", num, "符合条件:", oknum)
		if len(pendAllArr) <= 0 {
			log.Println("没找到dataging==1的数据")
		}
		// sanity-check the grouping counts
		testNum := 0
		for k, v := range pendAllArr {
			log.Println("第", k, "组--", "数量:", len(v))
			testNum = testNum + len(v)
		}
		log.Println("本地构建分组完成:", len(pendAllArr), "组", "测试-总计数量:", testNum)
		n, repeateN := 0, 0
		log.Println("线程数:", threadNum)
		pool := make(chan bool, threadNum)
		wg := &sync.WaitGroup{}
		for k, v := range pendAllArr {
			// each group flushes its own updates when it finishes
			pool <- true
			wg.Add(1)
			go func(k int, v []map[string]interface{}) {
				defer func() {
					<-pool
					wg.Done()
				}()
				// per-group buffers — goroutines do not share them
				groupUpdateExtract := [][]map[string]interface{}{} //
				groupOtherExtract := [][]map[string]interface{}{}
				// build this group's data pool
				log.Println("构建第", k, "组---(数据池)")
				// first publish time of the group (taken from the last element)
				first_pt := util.Int64All(v[len(v)-1]["publishtime"])
				curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
				log.Println("开始遍历判重第", k, "组 共计数量:", len(v))
				// NOTE(review): n and repeateN are written from multiple
				// goroutines without holding updatelock — data race.
				n = n + len(v)
				log.Println("统计目前总数量:", n, "重复数量:", repeateN)
				for _, tmp := range v {
					info := NewInfo(tmp)
					b, source, reason := curTM.check(info)
					if b {
						// duplicate found: build update ops for extract + bidding data
						repeateN++
						// tag the duplicate
						repeat_ids := source.repeat_ids
						repeat_ids = append(repeat_ids, info.id)
						source.repeat_ids = repeat_ids
						// refresh the entry in the data pool
						DM.replacePoolData(source)
						updatelock.Lock()
						// update the source doc — collection 14 or 15;
						// is the source doc within the current id segment?
						if judgeIsCurIds(gtid, lteid, source.id) {
							groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{ // tag the duplicate
								map[string]interface{}{
									"_id": StringTOBsonId(source.id),
								},
								map[string]interface{}{
									"$set": map[string]interface{}{
										"repeat_ids": repeat_ids,
									},
								},
							})
						} else {
							groupOtherExtract = append(groupOtherExtract, []map[string]interface{}{ // tag the duplicate
								map[string]interface{}{
									"_id": StringTOBsonId(source.id),
								},
								map[string]interface{}{
									"$set": map[string]interface{}{
										"repeat_ids": repeat_ids,
									},
								},
							})
						}
						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
							map[string]interface{}{
								"_id": tmp["_id"],
							},
							map[string]interface{}{
								"$set": map[string]interface{}{
									"repeat":        1,
									"repeat_reason": reason,
									"repeat_id":     source.id,
									"dataging":      0,
								},
							},
						})
						if len(groupUpdateExtract) >= 500 {
							mgo.UpSertBulk(extract, groupUpdateExtract...)
							groupUpdateExtract = [][]map[string]interface{}{}
						}
						if len(groupOtherExtract) >= 500 {
							mgo.UpSertBulk(extract_back, groupOtherExtract...)
							groupOtherExtract = [][]map[string]interface{}{}
						}
						updatelock.Unlock()
					} else {
						updatelock.Lock()
						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
							map[string]interface{}{
								"_id": tmp["_id"],
							},
							map[string]interface{}{
								"$set": map[string]interface{}{
									"dataging": 0, // processed docs all end with dataging == 0
								},
							},
						})
						if len(groupUpdateExtract) >= 500 {
							mgo.UpSertBulk(extract, groupUpdateExtract...)
							groupUpdateExtract = [][]map[string]interface{}{}
						}
						updatelock.Unlock()
					}
				}
				// group finished: flush leftovers
				updatelock.Lock()
				if len(groupUpdateExtract) > 0 {
					mgo.UpSertBulk(extract, groupUpdateExtract...)
				}
				if len(groupOtherExtract) > 0 {
					mgo.UpSertBulk(extract_back, groupOtherExtract...)
				}
				updatelock.Unlock()
			}(k, v)
		}
		wg.Wait()
		// segment done: broadcast UDP notifications to downstream nodes (indexing / merge)
		if n >= repeateN && gtid != lteid {
			for _, to := range nextNode {
				next_sid := util.BsonIdToSId(gtid)
				next_eid := util.BsonIdToSId(lteid)
				key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
				by, _ := json.Marshal(map[string]interface{}{
					"gtid":  next_sid,
					"lteid": next_eid,
					"stype": util.ObjToString(to["stype"]),
					"key":   key,
				})
				addr := &net.UDPAddr{
					IP:   net.ParseIP(to["addr"].(string)),
					Port: util.IntAll(to["port"]),
				}
				node := &udpNode{by, addr, time.Now().Unix(), 0}
				udptaskmap.Store(key, node)
				udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
			}
		}
		end := time.Now().Unix()
		log.Println("this timeTask over.", n, "repeateN:", repeateN, gtid, lteid)
		log.Println(gtid, lteid)
		if end-start < 60*5 {
			log.Println("睡眠.............")
			time.Sleep(5 * time.Minute)
		}
		log.Println("继续下一段的历史判重")
	}
}

// NOTE(review): what follows is a second, verbatim duplicate definition of
// historyTaskDay (the flattened text even lacks a newline between the closing
// brace above and this `func`). Two definitions of the same name in one
// package cannot compile — the duplicate should be removed once confirmed
// against the original file(s) this paste came from.
func historyTaskDay() {
	defer util.Catch()
	for {
		start := time.Now().Unix()
		if gtid == "" {
			log.Println("请传gtid,否则无法运行")
			os.Exit(0)
			return
		}
		if lteid != "" {
			// migrate the previous segment first
			log.Println("开启一次迁移任务", gtid, lteid)
			moveHistoryData(gtid, lteid)
			gtid = lteid // advance the lower bound to the old upper bound
		}
		// look up the newest task id in the task table
		task_sess := task_mgo.GetMgoConn()
		defer task_mgo.DestoryMongoConn(task_sess)
		q := map[string]interface{}{
			"isused": true,
		}
		between_time := time.Now().Unix() - (86400 * timingPubScope) // publish-time window (timingPubScope days)
		it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
		for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
			lteid = util.ObjToString(tmp["gtid"])
			log.Println("查询的最后一个任务Id:", lteid)
			break
		}
		log.Println("查询完毕-先睡眠5分钟", gtid, lteid)
		time.Sleep(5 * time.Minute)
		sess := mgo.GetMgoConn() // data connection
		defer mgo.DestoryMongoConn(sess)
		// dedup query: ids in (gtid, lteid]
		q = map[string]interface{}{
			"_id": map[string]interface{}{
				"$gt":  StringTOBsonId(gtid),
				"$lte": StringTOBsonId(lteid),
			},
		}
		log.Println("历史判重查询条件:", q, "时间:", between_time)
		it := sess.DB(mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
		num, oknum, outnum, deterTime := int64(0), int64(0), int64(0), int64(0) // counters
updateExtract := [][]map[string]interface{}{} // batched Mongo update ops
		pendAllArr := [][]map[string]interface{}{} // all groups awaiting dedup
		dayArr := []map[string]interface{}{}
		for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
			if num%10000 == 0 {
				log.Println("正序遍历:", num)
			}
			source := util.ObjToMap(tmp["jsondata"])
			if util.IntAll((*source)["sourcewebsite"]) == 1 {
				outnum++
				updatelock.Lock()
				updateExtract = append(updateExtract, []map[string]interface{}{
					map[string]interface{}{
						"_id": tmp["_id"],
					},
					map[string]interface{}{
						"$set": map[string]interface{}{
							"repeat":        1,
							"dataging":      0,
							"repeat_reason": "sourcewebsite为1 重复",
						},
					},
				})
				if len(updateExtract) >= 200 {
					log.Println("sourcewebsite,批量更新")
					mgo.UpSertBulk(extract, updateExtract...)
					updateExtract = [][]map[string]interface{}{}
				}
				updatelock.Unlock()
				tmp = make(map[string]interface{})
				continue
			}
			// keep only docs whose publish time falls inside the window
			updatelock.Lock()
			if util.IntAll(tmp["dataging"]) == 1 {
				pubtime := util.Int64All(tmp["publishtime"])
				if pubtime > 0 && pubtime >= between_time {
					oknum++
					if deterTime == 0 {
						log.Println("找到第一条符合条件的数据")
						deterTime = util.Int64All(tmp["publishtime"])
						dayArr = append(dayArr, tmp)
					} else {
						if pubtime-deterTime > timingSpanDay*86400 {
							// gap exceeds the span: close the current group and start a new one
							pendAllArr = append(pendAllArr, dayArr)
							dayArr = []map[string]interface{}{}
							deterTime = util.Int64All(tmp["publishtime"])
							dayArr = append(dayArr, tmp)
						} else {
							dayArr = append(dayArr, tmp)
						}
					}
				} else {
					outnum++
					// outside the window: still clear the dataging flag
					updateExtract = append(updateExtract, []map[string]interface{}{
						map[string]interface{}{
							"_id": tmp["_id"],
						},
						map[string]interface{}{
							"$set": map[string]interface{}{
								"dataging": 0,
							},
						},
					})
					if len(updateExtract) >= 200 {
						log.Println("不在周期内符合dataging==1,批量更新")
						mgo.UpSertBulk(extract, updateExtract...)
						updateExtract = [][]map[string]interface{}{}
					}
				}
			}
			updatelock.Unlock()
			tmp = make(map[string]interface{})
		}
		// flush remaining batched updates
		updatelock.Lock()
		if len(updateExtract) > 0 {
			log.Println("分组后,最后更新不进行判重的数据:", len(updateExtract), oknum+outnum)
			mgo.UpSertBulk(extract, updateExtract...)
			updateExtract = [][]map[string]interface{}{}
		}
		updatelock.Unlock()
		if len(dayArr) > 0 {
			pendAllArr = append(pendAllArr, dayArr)
			dayArr = []map[string]interface{}{}
		}
		log.Println("查询数量:", num, "符合条件:", oknum)
		if len(pendAllArr) <= 0 {
			log.Println("没找到dataging==1的数据")
		}
		// sanity-check the grouping counts
		testNum := 0
		for k, v := range pendAllArr {
			log.Println("第", k, "组--", "数量:", len(v))
			testNum = testNum + len(v)
		}
		log.Println("本地构建分组完成:", len(pendAllArr), "组", "测试-总计数量:", testNum)
		n, repeateN := 0, 0
		log.Println("线程数:", threadNum)
		pool := make(chan bool, threadNum)
		wg := &sync.WaitGroup{}
		for k, v := range pendAllArr {
			// each group flushes its own updates when it finishes
			pool <- true
			wg.Add(1)
			go func(k int, v []map[string]interface{}) {
				defer func() {
					<-pool
					wg.Done()
				}()
				// per-group buffers — goroutines do not share them
				groupUpdateExtract := [][]map[string]interface{}{} //
				groupOtherExtract := [][]map[string]interface{}{}
				// build this group's data pool
				log.Println("构建第", k, "组---(数据池)")
				// first publish time of the group (taken from the last element)
				first_pt := util.Int64All(v[len(v)-1]["publishtime"])
				curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
				log.Println("开始遍历判重第", k, "组 共计数量:", len(v))
				// NOTE(review): n and repeateN are written from multiple
				// goroutines without holding updatelock — data race.
				n = n + len(v)
				log.Println("统计目前总数量:", n, "重复数量:", repeateN)
				for _, tmp := range v {
					info := NewInfo(tmp)
					b, source, reason := curTM.check(info)
					if b {
						// duplicate found: build update ops for extract + bidding data
						repeateN++
						// tag the duplicate
						repeat_ids := source.repeat_ids
						repeat_ids = append(repeat_ids, info.id)
						source.repeat_ids = repeat_ids
						// refresh the entry in the data pool
						DM.replacePoolData(source)
						updatelock.Lock()
						// update the source doc — collection 14 or 15;
						// is the source doc within the current id segment?
						if judgeIsCurIds(gtid, lteid, source.id) {
							groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{ // tag the duplicate
								map[string]interface{}{
									"_id": StringTOBsonId(source.id),
								},
								map[string]interface{}{
									"$set": map[string]interface{}{
										"repeat_ids": repeat_ids,
									},
								},
							})
						} else {
							groupOtherExtract = append(groupOtherExtract, []map[string]interface{}{ // tag the duplicate
								map[string]interface{}{
									"_id": StringTOBsonId(source.id),
								},
								map[string]interface{}{
									"$set": map[string]interface{}{
										"repeat_ids": repeat_ids,
									},
								},
							})
						}
						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
							map[string]interface{}{
								"_id": tmp["_id"],
							},
							map[string]interface{}{
								"$set": map[string]interface{}{
									"repeat":        1,
									"repeat_reason": reason,
									"repeat_id":     source.id,
									"dataging":      0,
								},
							},
						})
						if len(groupUpdateExtract) >= 500 {
							mgo.UpSertBulk(extract, groupUpdateExtract...)
							groupUpdateExtract = [][]map[string]interface{}{}
						}
						if len(groupOtherExtract) >= 500 {
							mgo.UpSertBulk(extract_back, groupOtherExtract...)
							groupOtherExtract = [][]map[string]interface{}{}
						}
						updatelock.Unlock()
					} else {
						updatelock.Lock()
						groupUpdateExtract = append(groupUpdateExtract, []map[string]interface{}{
							map[string]interface{}{
								"_id": tmp["_id"],
							},
							map[string]interface{}{
								"$set": map[string]interface{}{
									"dataging": 0, // processed docs all end with dataging == 0
								},
							},
						})
						if len(groupUpdateExtract) >= 500 {
							mgo.UpSertBulk(extract, groupUpdateExtract...)
							groupUpdateExtract = [][]map[string]interface{}{}
						}
						updatelock.Unlock()
					}
				}
				// group finished: flush leftovers
				updatelock.Lock()
				if len(groupUpdateExtract) > 0 {
					mgo.UpSertBulk(extract, groupUpdateExtract...)
				}
				if len(groupOtherExtract) > 0 {
					mgo.UpSertBulk(extract_back, groupOtherExtract...)
				}
				updatelock.Unlock()
			}(k, v)
		}
		wg.Wait()
		// segment done: broadcast UDP notifications to downstream nodes (indexing / merge)
		if n >= repeateN && gtid != lteid {
			for _, to := range nextNode {
				next_sid := util.BsonIdToSId(gtid)
				next_eid := util.BsonIdToSId(lteid)
				key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
				by, _ := json.Marshal(map[string]interface{}{
					"gtid":  next_sid,
					"lteid": next_eid,
					"stype": util.ObjToString(to["stype"]),
					"key":   key,
				})
				addr := &net.UDPAddr{
					IP:   net.ParseIP(to["addr"].(string)),
					Port: util.IntAll(to["port"]),
				}
				node := &udpNode{by, addr, time.Now().Unix(), 0}
				udptaskmap.Store(key, node)
				udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
			}
		}
		end := time.Now().Unix()
		log.Println("this timeTask over.", n, "repeateN:", repeateN, gtid, lteid)
		log.Println(gtid, lteid)
		if end-start < 60*5 {
			log.Println("睡眠.............")
			time.Sleep(5 * time.Minute)
		}
		log.Println("继续下一段的历史判重")
	}
}