package main import qu "jygit.jydev.jianyu360.cn/data_processing/common_utils" func increaseFlowRepeat(msgInfo *MsgInfo) { msg_info := msgInfo.Data tmp := *qu.ObjToMap(msg_info["ext"]) if tmp != nil { //特殊类不判重 if qu.IntAll(tmp["repeat"]) == 1 || qu.ObjToString(tmp["spidercode"]) == "sdxzbiddingsjzypc" || (qu.IntAll(tmp["dataging"]) == 1 && !IsFull) || (qu.ObjToString(tmp["subtype"]) == "拟建" || qu.ObjToString(tmp["subtype"]) == "产权") { } else { info := NewInfo(tmp) b, source, reason := DM.check(info) if b { //判断信息是否为-指定剑鱼发布数据 if jyfb_data[info.spidercode] != "" { //伪判重标记 tmp["repeat_jyfb"] = 1 msg_info["ext"] = tmp msgInfo.Data = msg_info } else { //判断是否为~替换数据~模式 if judgeIsReplaceInfo(source.href, info.href) && !IsFull { datalock.Lock() temp_source_id := source.id temp_info_id := info.id temp_source := info temp_source.id = temp_source_id DM.replacePoolData(temp_source) //替换数据池数据 //替换抽取表数据 is_log, ext_s_data, ext_i_data := confrimExtractData(temp_source_id, temp_info_id) if is_log { data_mgo.Save(extract_log, map[string]interface{}{ "_id": tmp["_id"], "replace_id": temp_source_id, "is_history": 0, }) ext_s_data["repeat"] = 0 ext_i_data["repeat"] = 1 ext_i_data["repeat_id"] = temp_source_id ext_i_data["repeat_reason"] = reason data_mgo.DeleteById(extract, temp_source_id) data_mgo.Save(extract, ext_s_data) is_del := data_mgo.DeleteById(extract_back, temp_source_id) if is_del > 0 { data_mgo.Save(extract_back, ext_s_data) } data_mgo.DeleteById(extract, temp_info_id) data_mgo.Save(extract, ext_i_data) //替换数据特殊消息... msgInfo.Extend = Extend{ Repeat: MsgRepeat{ SId: temp_source_id, RId: temp_info_id, }, } } datalock.Unlock() } //写入具体数据...返回消息... tmp["repeat"] = 1 tmp["repeat_reason"] = reason tmp["repeat_id"] = source.id msg_info["ext"] = tmp msgInfo.Data = msg_info } } } } }