package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net"
	"sync"
	"time"

	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
)

// increaseRepeat runs one incremental de-duplication pass over the documents
// of the extract collection whose _id lies in (mapInfo["gtid"], mapInfo["lteid"]].
//
// The pass has four stages:
//  1. Load the id range sorted by publishtime and group the candidate
//     documents by subtype (the tender-like subtypes share one group).
//  2. Check every group concurrently (at most threadNum workers) with DM.check
//     and either tag the duplicate or, in replace mode, swap it with its source.
//  3. Mark the matching process records as de-duplicated.
//  4. Notify every node in nextNode over UDP.
//
// mapInfo must carry string values under "gtid" and "lteid"; otherwise the
// pass is skipped with a log line.
func increaseRepeat(mapInfo map[string]interface{}) {
	defer qu.Catch()

	// Validate the id range once instead of relying on panicking assertions.
	sid, okStart := mapInfo["gtid"].(string)
	eid, okEnd := mapInfo["lteid"].(string)
	if !okStart || !okEnd {
		log.Println("increaseRepeat: gtid/lteid missing or not a string:", mapInfo["gtid"], mapInfo["lteid"])
		return
	}

	// Id range of this pass.
	q := map[string]interface{}{
		"_id": map[string]interface{}{
			"$gt":  StringTOBsonId(sid),
			"$lte": StringTOBsonId(eid),
		},
	}
	log.Println("~~~~~~")
	log.Println("开始增量数据判重~查询条件:", data_mgo.DbName, extract, q)
	sess := data_mgo.GetMgoConn()
	defer data_mgo.DestoryMongoConn(sess)
	it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()

	total, isok, repeatN := 0, 0, 0
	dataAllDict := make(map[string][]map[string]interface{})
	for ; ; total++ {
		// A fresh map per document so no keys leak over from the previous one.
		tmp := make(map[string]interface{})
		if !it.Next(&tmp) {
			break
		}
		if total%1000 == 0 {
			log.Println("current index : ", total, isok)
		}
		// Already flagged as a duplicate: count it and move on.
		if qu.IntAll(tmp["repeat"]) == 1 {
			repeatN++
			continue
		}
		subtype := qu.ObjToString(tmp["subtype"])
		switch {
		case qu.IntAll(tmp["dataging"]) == 1 && !IsFull:
			continue
		case subtype == "拟建" || subtype == "产权":
			continue
		case qu.ObjToString(tmp["spidercode"]) == "sdxzbiddingsjzypc":
			continue
		}

		// Group by category; the tender-like subtypes are checked together.
		isok++
		switch subtype {
		case "招标", "邀标", "询价", "竞谈", "竞价":
			subtype = "招标"
		}
		dataAllDict[subtype] = append(dataAllDict[subtype], tmp)
	}
	log.Println("类别组:", len(dataAllDict), "组", "~", "总计:", total, "~", "需判重:", isok)

	pool := make(chan bool, threadNum) // bounds the number of concurrent workers
	wg := &sync.WaitGroup{}
	for _, dataArr := range dataAllDict {
		fmt.Print("...")
		pool <- true
		wg.Add(1)
		go func(group []map[string]interface{}) {
			defer func() {
				<-pool
				wg.Done()
			}()
			// recover only works in the panicking goroutine, so the outer
			// deferred qu.Catch cannot protect this worker.
			defer qu.Catch()

			num := 0
			for _, tmp := range group {
				info := NewInfo(tmp)
				dup, source, reason := DM.check(info)
				if !dup {
					continue
				}
				// Designated Jianyu-published data only gets a pseudo-duplicate tag.
				if jyfb_data[info.spidercode] != "" {
					Update.updatePool <- []map[string]interface{}{
						{
							"_id": tmp["_id"],
						},
						{
							"$set": map[string]interface{}{
								"repeat_jyfb": 1,
							},
						},
					}
					continue
				}
				num++
				// Not in replace mode: tag the document as a duplicate of source.
				if !judgeIsReplaceInfo(source.href, info.href) || IsFull {
					Update.updatePool <- []map[string]interface{}{
						{
							"_id": tmp["_id"],
						},
						{
							"$set": map[string]interface{}{
								"repeat":        1,
								"repeat_reason": reason,
								"repeat_id":     source.id,
							},
						},
					}
					continue
				}

				// Replace mode: swap the stored source with the new document.
				// Wrapped in a function so the lock is released on every exit
				// path, including a panic in one of the storage calls.
				func() {
					datalock.Lock()
					defer datalock.Unlock()

					sourceID := source.id
					infoID := info.id
					// NOTE(review): if NewInfo returns a pointer, this also
					// rewrites info.id — confirm that is intended.
					replacement := info
					replacement.id = sourceID
					DM.replacePoolData(replacement)

					// Both the extract rows and the bidding rows must be found.
					extOK, srcExt, dupExt := confrimExtractData(sourceID, infoID)
					bidOK, srcBid, dupBid := confrimBiddingData(sourceID, infoID)
					if !extOK || !bidOK {
						log.Println("替换~相关表~未查询到数据~", sourceID, "~", infoID)
						return
					}

					data_mgo.Save(extract_log, map[string]interface{}{
						"_id":        tmp["_id"],
						"replace_id": sourceID,
						"is_history": 0,
					})
					srcExt["repeat"] = 0
					dupExt["repeat"] = 1
					dupExt["repeat_id"] = sourceID
					dupExt["repeat_reason"] = reason

					data_mgo.DeleteById(extract, sourceID)
					data_mgo.Save(extract, srcExt)
					// Only re-save into the backup table when a row was there.
					if deleted := data_mgo.DeleteById(extract_back, sourceID); deleted > 0 {
						data_mgo.Save(extract_back, srcExt)
					}
					data_mgo.DeleteById(extract, infoID)
					data_mgo.Save(extract, dupExt)

					task_mgo.DeleteById(task_bidding, sourceID)
					task_mgo.Save(task_bidding, srcBid)
					task_mgo.DeleteById(task_bidding, infoID)
					task_mgo.Save(task_bidding, dupBid)

					// Publish the replaced id; the results are deliberately
					// ignored, as in the original best-effort notification.
					msg := "id=" + sourceID
					_ = nspdata_1.Publish(msg)
					_ = nspdata_2.Publish(msg)
				}()
			}

			numlock.Lock()
			repeatN += num
			numlock.Unlock()
		}(dataArr)
	}
	wg.Wait()
	log.Println("当前~判重~结束~", total, "重复~", repeatN)

	// Update the process record table.
	updateProcessUdpIdsInfo(sid, eid)
	time.Sleep(10 * time.Second)

	log.Println("判重任务完成...发送下节点udp...")
	for _, to := range nextNode {
		stype := qu.ObjToString(to["stype"])
		key := sid + "-" + eid + "-" + stype
		host, ok := to["addr"].(string)
		if !ok {
			// A malformed node must not stop the remaining nodes from being notified.
			log.Println("increaseRepeat: next node addr is not a string, skipping:", to["addr"])
			continue
		}
		by, err := json.Marshal(map[string]interface{}{
			"gtid":  sid,
			"lteid": eid,
			"stype": stype,
			"key":   key,
		})
		if err != nil {
			log.Println("increaseRepeat: encoding udp payload for", key, ":", err)
			continue
		}
		addr := &net.UDPAddr{
			IP:   net.ParseIP(host),
			Port: qu.IntAll(to["port"]),
		}
		node := &udpNode{by, addr, time.Now().Unix(), 0}
		udptaskmap.Store(key, node)
		udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
	}
}

// updateProcessUdpIdsInfo marks the process records covered by the id range
// [sid, eid] as de-duplicated by setting dataprocess, repeat_status and
// updatetime on each matching record in task_coll.
func updateProcessUdpIdsInfo(sid string, eid string) {
	// De-duplication may merge segments, so match on both ends of the range.
	query := map[string]interface{}{
		"gtid": map[string]interface{}{
			"$gte": sid,
		},
		"lteid": map[string]interface{}{
			"$lte": eid,
		},
	}
	// NOTE(review): the second result of Find is discarded, as in the original.
	datas, _ := task_mgo.Find(task_coll, query, nil, nil)
	if len(datas) == 0 {
		log.Println("未查询到记录id段落~", query)
		return
	}

	log.Println("开始更新流程段落记录~~", len(datas), "段")
	for _, v := range datas {
		upID := BsonTOStringId(v["_id"])
		if upID == "" {
			continue
		}
		update := map[string]interface{}{
			"$set": map[string]interface{}{
				"dataprocess":   6,
				"repeat_status": 1,
				"updatetime":    time.Now().Unix(),
			},
		}
		task_mgo.UpdateById(task_coll, upID, update)
		log.Println("流程段落记录~~更新完毕~", update)
	}
}

// updateOcrFileData flags the newest record in task_coll whose lteid equals
// cur_lteid with is_repeat_status and is_repeat_time.
//
// Deprecated: the original source marks this function as abandoned.
func updateOcrFileData(cur_lteid string) {
	// Update the de-duplication status in the OCR classification table.
	log.Println("开始更新Ocr表-标记", cur_lteid)
	taskSess := task_mgo.GetMgoConn()
	defer task_mgo.DestoryMongoConn(taskSess)

	qTask := map[string]interface{}{}
	it := taskSess.DB(task_mgo.DbName).C(task_coll).Find(&qTask).Sort("-_id").Iter()

	updateOcrFile := [][]map[string]interface{}{}
	for {
		// A fresh map per record so no keys leak over from the previous one.
		tmp := make(map[string]interface{})
		if !it.Next(&tmp) {
			break
		}
		if qu.ObjToString(tmp["lteid"]) != cur_lteid {
			continue
		}
		// First (newest) match wins; stop scanning.
		log.Println("找到该lteid数据", cur_lteid, BsonTOStringId(tmp["_id"]))
		updateOcrFile = append(updateOcrFile, []map[string]interface{}{
			{
				"_id": tmp["_id"],
			},
			{
				"$set": map[string]interface{}{
					"is_repeat_status": 1,
					"is_repeat_time":   qu.Int64All(time.Now().Unix()),
				},
			},
		})
		break
	}

	if len(updateOcrFile) == 0 {
		log.Println("出现异常问题,查询不到ocr的lteid", cur_lteid)
		return
	}
	task_mgo.UpSertBulk(task_coll, updateOcrFile...)
}