package main import ( "encoding/json" "log" mu "mfw/util" "net" "qfw/common/src/qfw/util" qu "qfw/util" "sync" "time" ) //开始增量判重程序 func increaseRepeat(mapInfo map[string]interface{}) { defer qu.Catch() //区间id q := map[string]interface{}{ "_id": map[string]interface{}{ "$gt": StringTOBsonId(mapInfo["gtid"].(string)), "$lte": StringTOBsonId(mapInfo["lteid"].(string)), }, } log.Println("~~~~~~") log.Println("开始增量数据判重~查询条件:",data_mgo.DbName, extract, q) sess := data_mgo.GetMgoConn() defer data_mgo.DestoryMongoConn(sess) it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter() total, isok ,repeatN:= 0,0,0 dataAllDict := make(map[string][]map[string]interface{},0) for tmp := make(map[string]interface{}); it.Next(&tmp); total++ { if total%1000 == 0 { log.Println("current index : ", total, isok) } if util.IntAll(tmp["repeat"]) == 1 { repeatN++ tmp = make(map[string]interface{}) continue } if util.IntAll(tmp["dataging"]) == 1 && !IsFull{ tmp = make(map[string]interface{}) continue } //数据分组-按照类别分组 isok++ subtype := qu.ObjToString(tmp["subtype"]) if subtype=="招标"||subtype=="邀标"||subtype=="询价"|| subtype=="竞谈"||subtype=="竞价" { subtype = "招标" } dataArr := dataAllDict[subtype] if dataArr==nil { dataArr = []map[string]interface{}{} } dataArr = append(dataArr,tmp) dataAllDict[subtype] = dataArr tmp = make(map[string]interface{}) } log.Println("类别组:",len(dataAllDict),"组","~","总计:",total,"~","需判重:",isok) pool := make(chan bool, threadNum) wg := &sync.WaitGroup{} for _,dataArr := range dataAllDict { log.Println("处理中...","当前重复量~", repeatN) pool <- true wg.Add(1) go func(dataArr []map[string]interface{}) { defer func() { <-pool wg.Done() }() num := 0 for _,tmp := range dataArr{ info := NewInfo(tmp) b,source,reason := DM.check(info) if b { //判断信息是否为-指定剑鱼发布数据 if jyfb_data[info.spidercode]!="" { //伪判重标记 Update.updatePool <- []map[string]interface{}{//原始数据打标签 map[string]interface{}{ "_id": StringTOBsonId(info.id), }, map[string]interface{}{ "$set": map[string]interface{}{ "repeat_jyfb": 1, }, }, } } else { //真实重复~~~ num++ var updateID = map[string]interface{}{} //记录更新判重的 updateID["_id"] = StringTOBsonId(info.id) repeat_ids:=source.repeat_ids repeat_ids = append(repeat_ids,info.id) source.repeat_ids = repeat_ids DM.replacePoolData(source)//替换数据池-更新 Update.updatePool <- []map[string]interface{}{//原始数据打标签 map[string]interface{}{ "_id": StringTOBsonId(source.id), }, map[string]interface{}{ "$set": map[string]interface{}{ "repeat_ids": repeat_ids, }, }, } Update.updatePool <- []map[string]interface{}{//重复数据打标签 updateID, map[string]interface{}{ "$set": map[string]interface{}{ "repeat": 1, "repeat_reason": reason, "repeat_id": source.id, "dataging": 0, "updatetime_repeat" :util.Int64All(time.Now().Unix()), }, }, } } } } numberlock.Lock() repeatN+=num numberlock.Unlock() }(dataArr) } wg.Wait() log.Println("this cur task over.", total, "repeateN:", repeatN) //更新Ocr的标记 updateOcrFileData(mapInfo["lteid"].(string)) time.Sleep(15 * time.Second) //任务完成,开始发送广播通知下面节点 log.Println("判重任务完成发送udp") for _, to := range nextNode { sid, _ := mapInfo["gtid"].(string) eid, _ := mapInfo["lteid"].(string) key := sid + "-" + eid + "-" + util.ObjToString(to["stype"]) by, _ := json.Marshal(map[string]interface{}{ "gtid": sid, "lteid": eid, "stype": util.ObjToString(to["stype"]), "key": key, }) addr := &net.UDPAddr{ IP: net.ParseIP(to["addr"].(string)), Port: util.IntAll(to["port"]), } node := &udpNode{by, addr, time.Now().Unix(), 0} udptaskmap.Store(key, node) udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr) } } //更新ocr表 func updateOcrFileData(cur_lteid string) { //更新ocr 分类表-判重的状态 log.Println("开始更新Ocr表-标记",cur_lteid) task_sess := task_mgo.GetMgoConn() defer task_mgo.DestoryMongoConn(task_sess) q_task:=map[string]interface{}{} it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q_task).Sort("-_id").Iter() isUpdateOcr:=false updateOcrFile:=[][]map[string]interface{}{} for tmp := make(map[string]interface{}); it_last.Next(&tmp); { cur_id := BsonTOStringId(tmp["_id"]) lteid:=util.ObjToString(tmp["lteid"]) if (lteid==cur_lteid) { //需要更新 log.Println("找到该lteid数据",cur_lteid,cur_id) isUpdateOcr = true updateOcrFile = append(updateOcrFile, []map[string]interface{}{//重复数据打标签 map[string]interface{}{ "_id": tmp["_id"], }, map[string]interface{}{ "$set": map[string]interface{}{ "is_repeat_status": 1, "is_repeat_time" : util.Int64All(time.Now().Unix()), }, }, }) tmp = make(map[string]interface{}) break }else { tmp = make(map[string]interface{}) } } if !isUpdateOcr { log.Println("出现异常问题,查询不到ocr的lteid",cur_lteid) }else { if len(updateOcrFile) > 0 { task_mgo.UpSertBulk(task_collName, updateOcrFile...) } } }