increaseRepeat.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package main
  2. import (
  3. "encoding/json"
  4. "log"
  5. mu "mfw/util"
  6. "net"
  7. "qfw/common/src/qfw/util"
  8. qu "qfw/util"
  9. "sync"
  10. "time"
  11. )
  12. //开始增量判重程序
  13. func increaseRepeat(mapInfo map[string]interface{}) {
  14. defer qu.Catch()
  15. //区间id
  16. q := map[string]interface{}{
  17. "_id": map[string]interface{}{
  18. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  19. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  20. },
  21. }
  22. log.Println("~~~~~~")
  23. log.Println("开始增量数据判重~查询条件:",data_mgo.DbName, extract, q)
  24. sess := data_mgo.GetMgoConn()
  25. defer data_mgo.DestoryMongoConn(sess)
  26. it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  27. total, isok ,repeatN:= 0,0,0
  28. dataAllDict := make(map[string][]map[string]interface{},0)
  29. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  30. if total%1000 == 0 {
  31. log.Println("index: ", total, isok)
  32. }
  33. if util.IntAll(tmp["repeat"]) == 1 {
  34. repeatN++
  35. tmp = make(map[string]interface{})
  36. continue
  37. }
  38. if util.IntAll(tmp["dataging"]) == 1 && !IsFull{
  39. tmp = make(map[string]interface{})
  40. continue
  41. }
  42. //数据分组-按照类别分组
  43. isok++
  44. subtype := qu.ObjToString(tmp["subtype"])
  45. if subtype=="招标"||subtype=="邀标"||subtype=="询价"||
  46. subtype=="竞谈"||subtype=="竞价" {
  47. subtype = "招标"
  48. }
  49. dataArr := dataAllDict[subtype]
  50. if dataArr==nil {
  51. dataArr = []map[string]interface{}{}
  52. }
  53. dataArr = append(dataArr,tmp)
  54. dataAllDict[subtype] = dataArr
  55. tmp = make(map[string]interface{})
  56. }
  57. log.Println("类别组:",len(dataAllDict),"组","~","总计:",total,"~","需判重:",isok)
  58. pool := make(chan bool, threadNum)
  59. wg := &sync.WaitGroup{}
  60. for _,dataArr := range dataAllDict {
  61. pool <- true
  62. wg.Add(1)
  63. go func(dataArr []map[string]interface{}) {
  64. defer func() {
  65. <-pool
  66. wg.Done()
  67. }()
  68. num := 0
  69. for _,tmp := range dataArr{
  70. info := NewInfo(tmp)
  71. b,source,reason := DM.check(info)
  72. if b {
  73. num++
  74. var updateID = map[string]interface{}{} //记录更新判重的
  75. updateID["_id"] = StringTOBsonId(info.id)
  76. repeat_ids:=source.repeat_ids
  77. repeat_ids = append(repeat_ids,info.id)
  78. source.repeat_ids = repeat_ids
  79. DM.replacePoolData(source)//替换数据池-更新
  80. Update.updatePool <- []map[string]interface{}{//原始数据打标签
  81. map[string]interface{}{
  82. "_id": StringTOBsonId(source.id),
  83. },
  84. map[string]interface{}{
  85. "$set": map[string]interface{}{
  86. "repeat_ids": repeat_ids,
  87. },
  88. },
  89. }
  90. Update.updatePool <- []map[string]interface{}{//重复数据打标签
  91. updateID,
  92. map[string]interface{}{
  93. "$set": map[string]interface{}{
  94. "repeat": 1,
  95. "repeat_reason": reason,
  96. "repeat_id": source.id,
  97. "dataging": 0,
  98. "updatetime_repeat" :util.Int64All(time.Now().Unix()),
  99. },
  100. },
  101. }
  102. }
  103. }
  104. numberlock.Lock()
  105. repeatN+=num
  106. numberlock.Unlock()
  107. }(dataArr)
  108. }
  109. wg.Wait()
  110. log.Println("this cur task over.", total, "repeateN:", repeatN)
  111. //更新Ocr的标记
  112. updateOcrFileData(mapInfo["lteid"].(string))
  113. time.Sleep(15 * time.Second)
  114. //任务完成,开始发送广播通知下面节点
  115. log.Println("判重任务完成发送udp")
  116. for _, to := range nextNode {
  117. sid, _ := mapInfo["gtid"].(string)
  118. eid, _ := mapInfo["lteid"].(string)
  119. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  120. by, _ := json.Marshal(map[string]interface{}{
  121. "gtid": sid,
  122. "lteid": eid,
  123. "stype": util.ObjToString(to["stype"]),
  124. "key": key,
  125. })
  126. addr := &net.UDPAddr{
  127. IP: net.ParseIP(to["addr"].(string)),
  128. Port: util.IntAll(to["port"]),
  129. }
  130. node := &udpNode{by, addr, time.Now().Unix(), 0}
  131. udptaskmap.Store(key, node)
  132. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  133. }
  134. }
  135. //更新ocr表
  136. func updateOcrFileData(cur_lteid string) {
  137. //更新ocr 分类表-判重的状态
  138. log.Println("开始更新Ocr表-标记",cur_lteid)
  139. task_sess := task_mgo.GetMgoConn()
  140. defer task_mgo.DestoryMongoConn(task_sess)
  141. q_task:=map[string]interface{}{}
  142. it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q_task).Sort("-_id").Iter()
  143. isUpdateOcr:=false
  144. updateOcrFile:=[][]map[string]interface{}{}
  145. for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
  146. cur_id := BsonTOStringId(tmp["_id"])
  147. lteid:=util.ObjToString(tmp["lteid"])
  148. if (lteid==cur_lteid) { //需要更新
  149. log.Println("找到该lteid数据",cur_lteid,cur_id)
  150. isUpdateOcr = true
  151. updateOcrFile = append(updateOcrFile, []map[string]interface{}{//重复数据打标签
  152. map[string]interface{}{
  153. "_id": tmp["_id"],
  154. },
  155. map[string]interface{}{
  156. "$set": map[string]interface{}{
  157. "is_repeat_status": 1,
  158. "is_repeat_time" : util.Int64All(time.Now().Unix()),
  159. },
  160. },
  161. })
  162. tmp = make(map[string]interface{})
  163. break
  164. }else {
  165. tmp = make(map[string]interface{})
  166. }
  167. }
  168. if !isUpdateOcr {
  169. log.Println("出现异常问题,查询不到ocr的lteid",cur_lteid)
  170. }else {
  171. if len(updateOcrFile) > 0 {
  172. task_mgo.UpSertBulk(task_collName, updateOcrFile...)
  173. }
  174. }
  175. }