// increaseRepeat.go (7.8 KB)
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  6. mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  7. "log"
  8. "net"
  9. "sync"
  10. "time"
  11. )
  12. // 开始增量判重程序
  13. func increaseRepeat(mapInfo map[string]interface{}) {
  14. defer qu.Catch()
  15. //区间id
  16. q := map[string]interface{}{
  17. "_id": map[string]interface{}{
  18. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  19. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  20. },
  21. }
  22. log.Println("~~~~~~")
  23. log.Println("开始增量数据判重~查询条件:", data_mgo.DbName, extract, q)
  24. sess := data_mgo.GetMgoConn()
  25. defer data_mgo.DestoryMongoConn(sess)
  26. it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  27. total, isok, repeatN := 0, 0, 0
  28. dataAllDict := make(map[string][]map[string]interface{}, 0)
  29. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  30. if total%1000 == 0 {
  31. log.Println("current index : ", total, isok)
  32. }
  33. if qu.IntAll(tmp["repeat"]) == 1 {
  34. repeatN++
  35. tmp = make(map[string]interface{})
  36. continue
  37. }
  38. if qu.IntAll(tmp["dataging"]) == 1 && !IsFull {
  39. tmp = make(map[string]interface{})
  40. continue
  41. }
  42. if qu.ObjToString(tmp["subtype"]) == "拟建" || qu.ObjToString(tmp["subtype"]) == "产权" {
  43. tmp = make(map[string]interface{})
  44. continue
  45. }
  46. if qu.ObjToString(tmp["spidercode"]) == "sdxzbiddingsjzypc" {
  47. tmp = make(map[string]interface{})
  48. continue
  49. }
  50. //数据分组-按照类别分组
  51. isok++
  52. subtype := qu.ObjToString(tmp["subtype"])
  53. if subtype == "招标" || subtype == "邀标" || subtype == "询价" ||
  54. subtype == "竞谈" || subtype == "竞价" {
  55. subtype = "招标"
  56. }
  57. dataArr := dataAllDict[subtype]
  58. if dataArr == nil {
  59. dataArr = []map[string]interface{}{}
  60. }
  61. dataArr = append(dataArr, tmp)
  62. dataAllDict[subtype] = dataArr
  63. tmp = make(map[string]interface{})
  64. }
  65. log.Println("类别组:", len(dataAllDict), "组", "~", "总计:", total, "~", "需判重:", isok)
  66. pool := make(chan bool, threadNum)
  67. wg := &sync.WaitGroup{}
  68. for _, dataArr := range dataAllDict {
  69. fmt.Print("...")
  70. pool <- true
  71. wg.Add(1)
  72. go func(dataArr []map[string]interface{}) {
  73. defer func() {
  74. <-pool
  75. wg.Done()
  76. }()
  77. num := 0
  78. for _, tmp := range dataArr {
  79. info := NewInfo(tmp)
  80. b, source, reason := DM.check(info)
  81. if b {
  82. //判断信息是否为-指定剑鱼发布数据
  83. if jyfb_data[info.spidercode] != "" { //伪判重标记
  84. Update.updatePool <- []map[string]interface{}{ //原始数据打标签
  85. map[string]interface{}{
  86. "_id": tmp["_id"],
  87. },
  88. map[string]interface{}{
  89. "$set": map[string]interface{}{
  90. "repeat_jyfb": 1,
  91. },
  92. },
  93. }
  94. } else {
  95. num++
  96. //判断是否为~替换数据~模式
  97. if judgeIsReplaceInfo(source.href, info.href) && !IsFull {
  98. datalock.Lock()
  99. temp_source_id := source.id
  100. temp_info_id := info.id
  101. temp_source := info
  102. temp_source.id = temp_source_id
  103. DM.replacePoolData(temp_source)
  104. //替换抽取表数据
  105. is_log, ext_s_data, ext_i_data := confrimExtractData(temp_source_id, temp_info_id)
  106. is_bid, bid_s_data, bid_i_data := confrimBiddingData(temp_source_id, temp_info_id)
  107. if is_log && is_bid {
  108. data_mgo.Save(extract_log, map[string]interface{}{
  109. "_id": tmp["_id"],
  110. "replace_id": temp_source_id,
  111. "is_history": 0,
  112. })
  113. ext_s_data["repeat"] = 0
  114. ext_i_data["repeat"] = 1
  115. ext_i_data["repeat_id"] = temp_source_id
  116. ext_i_data["repeat_reason"] = reason
  117. data_mgo.DeleteById(extract, temp_source_id)
  118. data_mgo.Save(extract, ext_s_data)
  119. is_del := data_mgo.DeleteById(extract_back, temp_source_id)
  120. if is_del > 0 {
  121. data_mgo.Save(extract_back, ext_s_data)
  122. }
  123. data_mgo.DeleteById(extract, temp_info_id)
  124. data_mgo.Save(extract, ext_i_data)
  125. task_mgo.DeleteById(task_bidding, temp_source_id)
  126. task_mgo.Save(task_bidding, bid_s_data)
  127. task_mgo.DeleteById(task_bidding, temp_info_id)
  128. task_mgo.Save(task_bidding, bid_i_data)
  129. //通道填充数据
  130. msg := "id=" + temp_source_id
  131. _ = nspdata_1.Publish(msg)
  132. _ = nspdata_2.Publish(msg)
  133. } else {
  134. log.Println("替换~相关表~未查询到数据~", temp_source_id, "~", temp_info_id)
  135. }
  136. datalock.Unlock()
  137. } else {
  138. //更新池~更新
  139. Update.updatePool <- []map[string]interface{}{ //重复数据打标签
  140. map[string]interface{}{
  141. "_id": tmp["_id"],
  142. },
  143. map[string]interface{}{
  144. "$set": map[string]interface{}{
  145. "repeat": 1,
  146. "repeat_reason": reason,
  147. "repeat_id": source.id,
  148. },
  149. },
  150. }
  151. }
  152. }
  153. }
  154. }
  155. numlock.Lock()
  156. repeatN += num
  157. numlock.Unlock()
  158. }(dataArr)
  159. }
  160. wg.Wait()
  161. log.Println("当前~判重~结束~", total, "重复~", repeatN)
  162. //更新流程记录表
  163. updateProcessUdpIdsInfo(qu.ObjToString(mapInfo["gtid"]), qu.ObjToString(mapInfo["lteid"]))
  164. time.Sleep(10 * time.Second)
  165. log.Println("判重任务完成...发送下节点udp...")
  166. for _, to := range nextNode {
  167. sid, _ := mapInfo["gtid"].(string)
  168. eid, _ := mapInfo["lteid"].(string)
  169. key := sid + "-" + eid + "-" + qu.ObjToString(to["stype"])
  170. by, _ := json.Marshal(map[string]interface{}{
  171. "gtid": sid,
  172. "lteid": eid,
  173. "stype": qu.ObjToString(to["stype"]),
  174. "key": key,
  175. })
  176. addr := &net.UDPAddr{
  177. IP: net.ParseIP(to["addr"].(string)),
  178. Port: qu.IntAll(to["port"]),
  179. }
  180. node := &udpNode{by, addr, time.Now().Unix(), 0}
  181. udptaskmap.Store(key, node)
  182. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  183. }
  184. }
  185. // 更新流程记录id段落
  186. func updateProcessUdpIdsInfo(sid string, eid string) {
  187. //判重有合并操作~所以要联合查询
  188. query := map[string]interface{}{
  189. "gtid": map[string]interface{}{
  190. "$gte": sid,
  191. },
  192. "lteid": map[string]interface{}{
  193. "$lte": eid,
  194. },
  195. }
  196. datas, _ := task_mgo.Find(task_coll, query, nil, nil)
  197. if len(datas) > 0 {
  198. log.Println("开始更新流程段落记录~~", len(datas), "段")
  199. for _, v := range datas {
  200. up_id := BsonTOStringId(v["_id"])
  201. if up_id != "" {
  202. update := map[string]interface{}{
  203. "$set": map[string]interface{}{
  204. "dataprocess": 6,
  205. "repeat_status": 1,
  206. "updatetime": time.Now().Unix(),
  207. },
  208. }
  209. task_mgo.UpdateById(task_coll, up_id, update)
  210. log.Println("流程段落记录~~更新完毕~", update)
  211. }
  212. }
  213. } else {
  214. log.Println("未查询到记录id段落~", query)
  215. }
  216. }
  217. // 更新ocr表~弃用
  218. func updateOcrFileData(cur_lteid string) {
  219. //更新ocr 分类表-判重的状态
  220. log.Println("开始更新Ocr表-标记", cur_lteid)
  221. task_sess := task_mgo.GetMgoConn()
  222. defer task_mgo.DestoryMongoConn(task_sess)
  223. q_task := map[string]interface{}{}
  224. it_last := task_sess.DB(task_mgo.DbName).C(task_coll).Find(&q_task).Sort("-_id").Iter()
  225. isUpdateOcr := false
  226. updateOcrFile := [][]map[string]interface{}{}
  227. for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
  228. cur_id := BsonTOStringId(tmp["_id"])
  229. lte_id := qu.ObjToString(tmp["lteid"])
  230. if lte_id == cur_lteid { //需要更新
  231. log.Println("找到该lteid数据", cur_lteid, cur_id)
  232. isUpdateOcr = true
  233. updateOcrFile = append(updateOcrFile, []map[string]interface{}{ //重复数据打标签
  234. map[string]interface{}{
  235. "_id": tmp["_id"],
  236. },
  237. map[string]interface{}{
  238. "$set": map[string]interface{}{
  239. "is_repeat_status": 1,
  240. "is_repeat_time": qu.Int64All(time.Now().Unix()),
  241. },
  242. },
  243. })
  244. tmp = make(map[string]interface{})
  245. break
  246. } else {
  247. tmp = make(map[string]interface{})
  248. }
  249. }
  250. if !isUpdateOcr {
  251. log.Println("出现异常问题,查询不到ocr的lteid", cur_lteid)
  252. } else {
  253. if len(updateOcrFile) > 0 {
  254. task_mgo.UpSertBulk(task_coll, updateOcrFile...)
  255. }
  256. }
  257. }