123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- package main
- import (
- "encoding/json"
- "fmt"
- qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
- "log"
- "net"
- "sync"
- "time"
- )
- // 开始增量判重程序
- func increaseRepeat(mapInfo map[string]interface{}) {
- defer qu.Catch()
- //区间id
- q := map[string]interface{}{
- "_id": map[string]interface{}{
- "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
- "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
- },
- }
- log.Println("~~~~~~")
- log.Println("开始增量数据判重~查询条件:", data_mgo.DbName, extract, q)
- sess := data_mgo.GetMgoConn()
- defer data_mgo.DestoryMongoConn(sess)
- it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
- total, isok, repeatN := 0, 0, 0
- dataAllDict := make(map[string][]map[string]interface{}, 0)
- for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
- if total%1000 == 0 {
- log.Println("current index : ", total, isok)
- }
- if qu.IntAll(tmp["repeat"]) == 1 {
- repeatN++
- tmp = make(map[string]interface{})
- continue
- }
- if qu.IntAll(tmp["dataging"]) == 1 && !IsFull {
- tmp = make(map[string]interface{})
- continue
- }
- if qu.ObjToString(tmp["subtype"]) == "拟建" || qu.ObjToString(tmp["subtype"]) == "产权" {
- tmp = make(map[string]interface{})
- continue
- }
- if qu.ObjToString(tmp["spidercode"]) == "sdxzbiddingsjzypc" {
- tmp = make(map[string]interface{})
- continue
- }
- //数据分组-按照类别分组
- isok++
- subtype := qu.ObjToString(tmp["subtype"])
- if subtype == "招标" || subtype == "邀标" || subtype == "询价" ||
- subtype == "竞谈" || subtype == "竞价" {
- subtype = "招标"
- }
- dataArr := dataAllDict[subtype]
- if dataArr == nil {
- dataArr = []map[string]interface{}{}
- }
- dataArr = append(dataArr, tmp)
- dataAllDict[subtype] = dataArr
- tmp = make(map[string]interface{})
- }
- log.Println("类别组:", len(dataAllDict), "组", "~", "总计:", total, "~", "需判重:", isok)
- pool := make(chan bool, threadNum)
- wg := &sync.WaitGroup{}
- for _, dataArr := range dataAllDict {
- fmt.Print("...")
- pool <- true
- wg.Add(1)
- go func(dataArr []map[string]interface{}) {
- defer func() {
- <-pool
- wg.Done()
- }()
- num := 0
- for _, tmp := range dataArr {
- info := NewInfo(tmp)
- b, source, reason := DM.check(info)
- if b {
- //判断信息是否为-指定剑鱼发布数据
- if jyfb_data[info.spidercode] != "" { //伪判重标记
- Update.updatePool <- []map[string]interface{}{ //原始数据打标签
- map[string]interface{}{
- "_id": tmp["_id"],
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "repeat_jyfb": 1,
- },
- },
- }
- } else {
- num++
- //判断是否为~替换数据~模式
- if judgeIsReplaceInfo(source.href, info.href) && !IsFull {
- datalock.Lock()
- temp_source_id := source.id
- temp_info_id := info.id
- temp_source := info
- temp_source.id = temp_source_id
- DM.replacePoolData(temp_source)
- //替换抽取表数据
- is_log, ext_s_data, ext_i_data := confrimExtractData(temp_source_id, temp_info_id)
- is_bid, bid_s_data, bid_i_data := confrimBiddingData(temp_source_id, temp_info_id)
- if is_log && is_bid {
- data_mgo.Save(extract_log, map[string]interface{}{
- "_id": tmp["_id"],
- "replace_id": temp_source_id,
- "is_history": 0,
- })
- ext_s_data["repeat"] = 0
- ext_i_data["repeat"] = 1
- ext_i_data["repeat_id"] = temp_source_id
- ext_i_data["repeat_reason"] = reason
- data_mgo.DeleteById(extract, temp_source_id)
- data_mgo.Save(extract, ext_s_data)
- is_del := data_mgo.DeleteById(extract_back, temp_source_id)
- if is_del > 0 {
- data_mgo.Save(extract_back, ext_s_data)
- }
- data_mgo.DeleteById(extract, temp_info_id)
- data_mgo.Save(extract, ext_i_data)
- task_mgo.DeleteById(task_bidding, temp_source_id)
- task_mgo.Save(task_bidding, bid_s_data)
- task_mgo.DeleteById(task_bidding, temp_info_id)
- task_mgo.Save(task_bidding, bid_i_data)
- //通道填充数据
- msg := "id=" + temp_source_id
- _ = nspdata_1.Publish(msg)
- _ = nspdata_2.Publish(msg)
- } else {
- log.Println("替换~相关表~未查询到数据~", temp_source_id, "~", temp_info_id)
- }
- datalock.Unlock()
- } else {
- //更新池~更新
- Update.updatePool <- []map[string]interface{}{ //重复数据打标签
- map[string]interface{}{
- "_id": tmp["_id"],
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "repeat": 1,
- "repeat_reason": reason,
- "repeat_id": source.id,
- },
- },
- }
- }
- }
- }
- }
- numlock.Lock()
- repeatN += num
- numlock.Unlock()
- }(dataArr)
- }
- wg.Wait()
- log.Println("当前~判重~结束~", total, "重复~", repeatN)
- //更新流程记录表
- updateProcessUdpIdsInfo(qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"]))
- time.Sleep(10 * time.Second)
- log.Println("判重任务完成...发送下节点udp...")
- for _, to := range nextNode {
- sid, _ := mapInfo["gtid"].(string)
- eid, _ := mapInfo["lteid"].(string)
- key := sid + "-" + eid + "-" + qu.ObjToString(to["stype"])
- by, _ := json.Marshal(map[string]interface{}{
- "gtid": sid,
- "lteid": eid,
- "stype": qu.ObjToString(to["stype"]),
- "key": key,
- })
- addr := &net.UDPAddr{
- IP: net.ParseIP(to["addr"].(string)),
- Port: qu.IntAll(to["port"]),
- }
- node := &udpNode{by, addr, time.Now().Unix(), 0}
- udptaskmap.Store(key, node)
- udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
- }
- }
- // 更新流程记录id段落
- func updateProcessUdpIdsInfo(sid string, eid string) {
- //判重有合并操作~所以要联合查询
- query := map[string]interface{}{
- "gtid": map[string]interface{}{
- "$gte": sid,
- },
- "lteid": map[string]interface{}{
- "$lte": eid,
- },
- }
- datas, _ := task_mgo.Find(task_coll, query, nil, nil)
- if len(datas) > 0 {
- log.Println("开始更新流程段落记录~~", len(datas), "段")
- for _, v := range datas {
- up_id := BsonTOStringId(v["_id"])
- if up_id != "" {
- update := map[string]interface{}{
- "$set": map[string]interface{}{
- "dataprocess": 6,
- "repeat_status": 1,
- "updatetime": time.Now().Unix(),
- },
- }
- task_mgo.UpdateById(task_coll, up_id, update)
- log.Println("流程段落记录~~更新完毕~", update)
- }
- }
- } else {
- log.Println("未查询到记录id段落~", query)
- }
- }
- // 更新ocr表~弃用
- func updateOcrFileData(cur_lteid string) {
- //更新ocr 分类表-判重的状态
- log.Println("开始更新Ocr表-标记", cur_lteid)
- task_sess := task_mgo.GetMgoConn()
- defer task_mgo.DestoryMongoConn(task_sess)
- q_task := map[string]interface{}{}
- it_last := task_sess.DB(task_mgo.DbName).C(task_coll).Find(&q_task).Sort("-_id").Iter()
- isUpdateOcr := false
- updateOcrFile := [][]map[string]interface{}{}
- for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
- cur_id := BsonTOStringId(tmp["_id"])
- lte_id := qu.ObjToString(tmp["lteid"])
- if lte_id == cur_lteid { //需要更新
- log.Println("找到该lteid数据", cur_lteid, cur_id)
- isUpdateOcr = true
- updateOcrFile = append(updateOcrFile, []map[string]interface{}{ //重复数据打标签
- map[string]interface{}{
- "_id": tmp["_id"],
- },
- map[string]interface{}{
- "$set": map[string]interface{}{
- "is_repeat_status": 1,
- "is_repeat_time": qu.Int64All(time.Now().Unix()),
- },
- },
- })
- tmp = make(map[string]interface{})
- break
- } else {
- tmp = make(map[string]interface{})
- }
- }
- if !isUpdateOcr {
- log.Println("出现异常问题,查询不到ocr的lteid", cur_lteid)
- } else {
- if len(updateOcrFile) > 0 {
- task_mgo.UpSertBulk(task_coll, updateOcrFile...)
- }
- }
- }
|