flow_increaseRepeat.go 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package main
  2. import qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  3. func increaseFlowRepeat(msgInfo *MsgInfo) {
  4. msg_info := msgInfo.Data
  5. tmp := *qu.ObjToMap(msg_info["ext"])
  6. if tmp != nil {
  7. //特殊类不判重
  8. if qu.IntAll(tmp["repeat"]) == 1 || qu.ObjToString(tmp["spidercode"]) == "sdxzbiddingsjzypc" ||
  9. (qu.IntAll(tmp["dataging"]) == 1 && !IsFull) ||
  10. (qu.ObjToString(tmp["subtype"]) == "拟建" || qu.ObjToString(tmp["subtype"]) == "产权") {
  11. } else {
  12. info := NewInfo(tmp)
  13. b, source, reason := DM.check(info)
  14. if b { //判断信息是否为-指定剑鱼发布数据
  15. if jyfb_data[info.spidercode] != "" { //伪判重标记
  16. tmp["repeat_jyfb"] = 1
  17. msg_info["ext"] = tmp
  18. msgInfo.Data = msg_info
  19. } else { //判断是否为~替换数据~模式
  20. if judgeIsReplaceInfo(source.href, info.href) && !IsFull {
  21. datalock.Lock()
  22. temp_source_id := source.id
  23. temp_info_id := info.id
  24. temp_source := info
  25. temp_source.id = temp_source_id
  26. DM.replacePoolData(temp_source) //替换数据池数据
  27. //替换抽取表数据
  28. is_log, ext_s_data, ext_i_data := confrimExtractData(temp_source_id, temp_info_id)
  29. if is_log {
  30. data_mgo.Save(extract_log, map[string]interface{}{
  31. "_id": tmp["_id"],
  32. "replace_id": temp_source_id,
  33. "is_history": 0,
  34. })
  35. ext_s_data["repeat"] = 0
  36. ext_i_data["repeat"] = 1
  37. ext_i_data["repeat_id"] = temp_source_id
  38. ext_i_data["repeat_reason"] = reason
  39. data_mgo.DeleteById(extract, temp_source_id)
  40. data_mgo.Save(extract, ext_s_data)
  41. is_del := data_mgo.DeleteById(extract_back, temp_source_id)
  42. if is_del > 0 {
  43. data_mgo.Save(extract_back, ext_s_data)
  44. }
  45. data_mgo.DeleteById(extract, temp_info_id)
  46. data_mgo.Save(extract, ext_i_data)
  47. //替换数据特殊消息...
  48. msgInfo.Extend = Extend{
  49. Repeat: MsgRepeat{
  50. SId: temp_source_id,
  51. RId: temp_info_id,
  52. },
  53. }
  54. }
  55. datalock.Unlock()
  56. }
  57. //写入具体数据...返回消息...
  58. tmp["repeat"] = 1
  59. tmp["repeat_reason"] = reason
  60. tmp["repeat_id"] = source.id
  61. msg_info["ext"] = tmp
  62. msgInfo.Data = msg_info
  63. }
  64. }
  65. }
  66. }
  67. }