123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194 |
- package main
- import (
- "encoding/json"
- "log"
- mu "mfw/util"
- "net"
- "qfw/common/src/qfw/util"
- qu "qfw/util"
- "sync"
- "time"
- )
- //开始增量判重程序
- func increaseRepeat(mapInfo map[string]interface{}) {
- defer qu.Catch()
- //区间id
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- log.Println("~~~~~~")
- log.Println("开始增量数据判重~查询条件:",data_mgo.DbName, extract, q)
- sess := data_mgo.GetMgoConn()
- defer data_mgo.DestoryMongoConn(sess)
- it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
- total, isok ,repeatN:= 0,0,0
- dataAllDict := make(map[string][]map[string]interface{},0)
- for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
- if total%1000 == 0 {
- log.Println("current index : ", total, isok)
- }
- if util.IntAll(tmp["repeat"]) == 1 {
- repeatN++
- tmp = make(map[string]interface{})
- continue
- }
- if util.IntAll(tmp["dataging"]) == 1 && !IsFull{
- tmp = make(map[string]interface{})
- continue
- }
- //数据分组-按照类别分组
- isok++
- subtype := qu.ObjToString(tmp["subtype"])
- if subtype=="招标"||subtype=="邀标"||subtype=="询价"||
- subtype=="竞谈"||subtype=="竞价" {
- subtype = "招标"
- }
- dataArr := dataAllDict[subtype]
- if dataArr==nil {
- dataArr = []map[string]interface{}{}
- }
- dataArr = append(dataArr,tmp)
- dataAllDict[subtype] = dataArr
- tmp = make(map[string]interface{})
- }
- log.Println("类别组:",len(dataAllDict),"组","~","总计:",total,"~","需判重:",isok)
- pool := make(chan bool, threadNum)
- wg := &sync.WaitGroup{}
- for _,dataArr := range dataAllDict {
- log.Println("处理中...","当前重复量~", repeatN)
- pool <- true
- wg.Add(1)
- go func(dataArr []map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- num := 0
- for _,tmp := range dataArr{
- info := NewInfo(tmp)
- b,source,reason := DM.check(info)
- if b {
- //判断信息是否为-指定剑鱼发布数据
- if jyfb_data[info.spidercode]!="" { //伪判重标记
- Update.updatePool <- []map[string]interface{}{//原始数据打标签
- map[string]interface{}{
- "_id": StringTOBsonId(info.id),
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "repeat_jyfb": 1,
- },
- },
- }
- } else { //真实重复~~~
- num++
- var updateID = map[string]interface{}{} //记录更新判重的
- updateID["_id"] = StringTOBsonId(info.id)
- repeat_ids:=source.repeat_ids
- repeat_ids = append(repeat_ids,info.id)
- source.repeat_ids = repeat_ids
- DM.replacePoolData(source)//替换数据池-更新
- Update.updatePool <- []map[string]interface{}{//原始数据打标签
- map[string]interface{}{
- "_id": StringTOBsonId(source.id),
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "repeat_ids": repeat_ids,
- },
- },
- }
- Update.updatePool <- []map[string]interface{}{//重复数据打标签
- updateID,
- map[string]interface{}{
- "$set": map[string]interface{}{
- "repeat": 1,
- "repeat_reason": reason,
- "repeat_id": source.id,
- "dataging": 0,
- "updatetime_repeat" :util.Int64All(time.Now().Unix()),
- },
- },
- }
- }
- }
- }
- numberlock.Lock()
- repeatN+=num
- numberlock.Unlock()
- }(dataArr)
- }
- wg.Wait()
- log.Println("this cur task over.", total, "repeateN:", repeatN)
- //更新Ocr的标记
- updateOcrFileData(mapInfo["lteid"].(string))
- time.Sleep(15 * time.Second)
- //任务完成,开始发送广播通知下面节点
- log.Println("判重任务完成发送udp")
- for _, to := range nextNode {
- sid, _ := mapInfo["gtid"].(string)
- eid, _ := mapInfo["lteid"].(string)
- key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": sid,
- "lteid": eid,
- "stype": util.ObjToString(to["stype"]),
- "key": key,
- })
- addr := &net.UDPAddr{
- IP: net.ParseIP(to["addr"].(string)),
- Port: util.IntAll(to["port"]),
- }
- node := &udpNode{by, addr, time.Now().Unix(), 0}
- udptaskmap.Store(key, node)
- udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
- }
- }
- //更新ocr表
- func updateOcrFileData(cur_lteid string) {
- //更新ocr 分类表-判重的状态
- log.Println("开始更新Ocr表-标记",cur_lteid)
- task_sess := task_mgo.GetMgoConn()
- defer task_mgo.DestoryMongoConn(task_sess)
- q_task:=map[string]interface{}{}
- it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q_task).Sort("-_id").Iter()
- isUpdateOcr:=false
- updateOcrFile:=[][]map[string]interface{}{}
- for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
- cur_id := BsonTOStringId(tmp["_id"])
- lteid:=util.ObjToString(tmp["lteid"])
- if (lteid==cur_lteid) { //需要更新
- log.Println("找到该lteid数据",cur_lteid,cur_id)
- isUpdateOcr = true
- updateOcrFile = append(updateOcrFile, []map[string]interface{}{//重复数据打标签
- map[string]interface{}{
- "_id": tmp["_id"],
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "is_repeat_status": 1,
- "is_repeat_time" : util.Int64All(time.Now().Unix()),
- },
- },
- })
- tmp = make(map[string]interface{})
- break
- }else {
- tmp = make(map[string]interface{})
- }
- }
- if !isUpdateOcr {
- log.Println("出现异常问题,查询不到ocr的lteid",cur_lteid)
- }else {
- if len(updateOcrFile) > 0 {
- task_mgo.UpSertBulk(task_collName, updateOcrFile...)
- }
- }
- }
|