increaseRepeat.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. package main
  2. import (
  3. "encoding/json"
  4. "log"
  5. mu "mfw/util"
  6. "net"
  7. "qfw/common/src/qfw/util"
  8. qu "qfw/util"
  9. "sync"
  10. "time"
  11. )
  12. //开始增量判重程序
  13. func increaseRepeat(mapInfo map[string]interface{}) {
  14. defer qu.Catch()
  15. //区间id
  16. q := map[string]interface{}{
  17. "_id": map[string]interface{}{
  18. "$gt": StringTOBsonId(mapInfo["gtid"].(string)),
  19. "$lte": StringTOBsonId(mapInfo["lteid"].(string)),
  20. },
  21. }
  22. log.Println("~~~~~~")
  23. log.Println("开始增量数据判重~查询条件:",data_mgo.DbName, extract, q)
  24. sess := data_mgo.GetMgoConn()
  25. defer data_mgo.DestoryMongoConn(sess)
  26. it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  27. total, isok ,repeatN:= 0,0,0
  28. dataAllDict := make(map[string][]map[string]interface{},0)
  29. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  30. if total%1000 == 0 {
  31. log.Println("current index : ", total, isok)
  32. }
  33. if util.IntAll(tmp["repeat"]) == 1 {
  34. repeatN++
  35. tmp = make(map[string]interface{})
  36. continue
  37. }
  38. if util.IntAll(tmp["dataging"]) == 1 && !IsFull{
  39. tmp = make(map[string]interface{})
  40. continue
  41. }
  42. //数据分组-按照类别分组
  43. isok++
  44. subtype := qu.ObjToString(tmp["subtype"])
  45. if subtype=="招标"||subtype=="邀标"||subtype=="询价"||
  46. subtype=="竞谈"||subtype=="竞价" {
  47. subtype = "招标"
  48. }
  49. dataArr := dataAllDict[subtype]
  50. if dataArr==nil {
  51. dataArr = []map[string]interface{}{}
  52. }
  53. dataArr = append(dataArr,tmp)
  54. dataAllDict[subtype] = dataArr
  55. tmp = make(map[string]interface{})
  56. }
  57. log.Println("类别组:",len(dataAllDict),"组","~","总计:",total,"~","需判重:",isok)
  58. pool := make(chan bool, threadNum)
  59. wg := &sync.WaitGroup{}
  60. for _,dataArr := range dataAllDict {
  61. log.Println("处理中...","当前重复量~", repeatN)
  62. pool <- true
  63. wg.Add(1)
  64. go func(dataArr []map[string]interface{}) {
  65. defer func() {
  66. <-pool
  67. wg.Done()
  68. }()
  69. num := 0
  70. for _,tmp := range dataArr{
  71. info := NewInfo(tmp)
  72. b,source,reason := DM.check(info)
  73. if b {
  74. //判断信息是否为-指定剑鱼发布数据
  75. if jyfb_data[info.spidercode]!="" { //伪判重标记
  76. Update.updatePool <- []map[string]interface{}{//原始数据打标签
  77. map[string]interface{}{
  78. "_id": StringTOBsonId(info.id),
  79. },
  80. map[string]interface{}{
  81. "$set": map[string]interface{}{
  82. "repeat_jyfb": 1,
  83. },
  84. },
  85. }
  86. } else { //真实重复~~~
  87. num++
  88. var updateID = map[string]interface{}{} //记录更新判重的
  89. updateID["_id"] = StringTOBsonId(info.id)
  90. repeat_ids:=source.repeat_ids
  91. repeat_ids = append(repeat_ids,info.id)
  92. source.repeat_ids = repeat_ids
  93. DM.replacePoolData(source)//替换数据池-更新
  94. Update.updatePool <- []map[string]interface{}{//原始数据打标签
  95. map[string]interface{}{
  96. "_id": StringTOBsonId(source.id),
  97. },
  98. map[string]interface{}{
  99. "$set": map[string]interface{}{
  100. "repeat_ids": repeat_ids,
  101. },
  102. },
  103. }
  104. Update.updatePool <- []map[string]interface{}{//重复数据打标签
  105. updateID,
  106. map[string]interface{}{
  107. "$set": map[string]interface{}{
  108. "repeat": 1,
  109. "repeat_reason": reason,
  110. "repeat_id": source.id,
  111. "dataging": 0,
  112. "updatetime_repeat" :util.Int64All(time.Now().Unix()),
  113. },
  114. },
  115. }
  116. }
  117. }
  118. }
  119. numberlock.Lock()
  120. repeatN+=num
  121. numberlock.Unlock()
  122. }(dataArr)
  123. }
  124. wg.Wait()
  125. log.Println("this cur task over.", total, "repeateN:", repeatN)
  126. //更新Ocr的标记
  127. updateOcrFileData(mapInfo["lteid"].(string))
  128. time.Sleep(15 * time.Second)
  129. //任务完成,开始发送广播通知下面节点
  130. log.Println("判重任务完成发送udp")
  131. for _, to := range nextNode {
  132. sid, _ := mapInfo["gtid"].(string)
  133. eid, _ := mapInfo["lteid"].(string)
  134. key := sid + "-" + eid + "-" + util.ObjToString(to["stype"])
  135. by, _ := json.Marshal(map[string]interface{}{
  136. "gtid": sid,
  137. "lteid": eid,
  138. "stype": util.ObjToString(to["stype"]),
  139. "key": key,
  140. })
  141. addr := &net.UDPAddr{
  142. IP: net.ParseIP(to["addr"].(string)),
  143. Port: util.IntAll(to["port"]),
  144. }
  145. node := &udpNode{by, addr, time.Now().Unix(), 0}
  146. udptaskmap.Store(key, node)
  147. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  148. }
  149. }
  150. //更新ocr表
  151. func updateOcrFileData(cur_lteid string) {
  152. //更新ocr 分类表-判重的状态
  153. log.Println("开始更新Ocr表-标记",cur_lteid)
  154. task_sess := task_mgo.GetMgoConn()
  155. defer task_mgo.DestoryMongoConn(task_sess)
  156. q_task:=map[string]interface{}{}
  157. it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q_task).Sort("-_id").Iter()
  158. isUpdateOcr:=false
  159. updateOcrFile:=[][]map[string]interface{}{}
  160. for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
  161. cur_id := BsonTOStringId(tmp["_id"])
  162. lteid:=util.ObjToString(tmp["lteid"])
  163. if (lteid==cur_lteid) { //需要更新
  164. log.Println("找到该lteid数据",cur_lteid,cur_id)
  165. isUpdateOcr = true
  166. updateOcrFile = append(updateOcrFile, []map[string]interface{}{//重复数据打标签
  167. map[string]interface{}{
  168. "_id": tmp["_id"],
  169. },
  170. map[string]interface{}{
  171. "$set": map[string]interface{}{
  172. "is_repeat_status": 1,
  173. "is_repeat_time" : util.Int64All(time.Now().Unix()),
  174. },
  175. },
  176. })
  177. tmp = make(map[string]interface{})
  178. break
  179. }else {
  180. tmp = make(map[string]interface{})
  181. }
  182. }
  183. if !isUpdateOcr {
  184. log.Println("出现异常问题,查询不到ocr的lteid",cur_lteid)
  185. }else {
  186. if len(updateOcrFile) > 0 {
  187. task_mgo.UpSertBulk(task_collName, updateOcrFile...)
  188. }
  189. }
  190. }