flow_historyRepeat.go 7.4 KB

package main

import (
	"encoding/json"
	"log"
	"net"
	"os"
	"sync"
	"time"

	qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
)
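
// NOTE: this file relies on package-level state defined elsewhere in the
// package (assumed, not shown here): gtid, lteid, IsFull, isUpdateSite,
// threadNum, timingPubScope, timingSpanDay, dupdays, nextNode, udptaskmap,
// udpclient, jn, Update, data_mgo, task_mgo, extract, task_bidding, plus the
// helpers moveHistoryData, initSite, StringTOBsonId, BsonTOStringId,
// TimedTaskDatamap, NewInfo, MsgInfo, and udpNode.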
// historyFlowRepeat: history deduplication supporting the streaming structure...
func historyFlowRepeat() {
	defer qu.Catch()
	// On startup, query from the starting id... up to the present... shifted back one hour...
	for {
		if gtid == "" {
			log.Println("gtid must be provided, otherwise the task cannot run")
			os.Exit(0)
			return
		}
		if lteid != "" && !IsFull { // run the data migration first
			log.Println("starting one migration task", gtid, lteid)
			moveHistoryData(gtid, lteid)
			gtid = lteid // advance the cursor to the migrated range
		}
		// query the last id of the table...
		isRepeatStatus := false
		lteid = FindOneLteid()
		if lteid > gtid {
			isRepeatStatus = true
		}
		if !isRepeatStatus {
			log.Println("no newer lteid found... sleeping...")
			time.Sleep(30 * time.Second)
			continue
		}
		log.Println("found the latest lteid...", gtid, lteid)
		if isUpdateSite {
			initSite()
		}
		sess := data_mgo.GetMgoConn() // mongo connection
		defer data_mgo.DestoryMongoConn(sess)
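		// NOTE: this defer sits inside an endless for loop, so the session is
		// only released when historyFlowRepeat itself returns; sessions therefore
		// accumulate across iterations. Releasing the connection at the end of
		// each iteration (after wg.Wait) would avoid that.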
		between_time := time.Now().Unix() - (86400 * timingPubScope) // publish-time window: now minus timingPubScope days
		// start deduplication
		q := map[string]interface{}{
			"_id": map[string]interface{}{
				"$gt":  StringTOBsonId(gtid),
				"$lte": StringTOBsonId(lteid),
			},
		}
		log.Println("history dedup query:", q, "time:", between_time)
		it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
		num, oknum, outnum, deterTime := int64(0), int64(0), int64(0), int64(0) // counters
		pendAllArr := [][]map[string]interface{}{}                              // all groups pending dedup
		dayArr := []map[string]interface{}{}                                    // current group
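		// The scan below walks the id range in ascending publishtime order and
		// buckets records flagged dataging==1 (and inside the publish-time
		// window) into groups: a new group starts whenever the publish time
		// moves more than timingSpanDay days past the first record of the
		// current group. Each group is later deduplicated independently by a
		// worker goroutine.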
		for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
			if num%10000 == 0 {
				log.Println("forward scan:", num)
			}
			// keep only records whose publish time falls within the configured window
			if qu.IntAll(tmp["dataging"]) == 1 {
				pubtime := qu.Int64All(tmp["publishtime"])
				if pubtime > 0 && pubtime >= between_time && qu.ObjToString(tmp["subtype"]) != "拟建" && qu.ObjToString(tmp["subtype"]) != "产权" &&
					qu.ObjToString(tmp["spidercode"]) != "sdxzbiddingsjzypc" {
					oknum++
					if deterTime == 0 {
						log.Println("found the first qualifying record")
						deterTime = qu.Int64All(tmp["publishtime"])
						dayArr = append(dayArr, tmp)
					} else {
						if pubtime-deterTime > timingSpanDay*86400 {
							// start a new group; append the finished group to the full set
							pendAllArr = append(pendAllArr, dayArr)
							dayArr = []map[string]interface{}{}
							deterTime = qu.Int64All(tmp["publishtime"])
							dayArr = append(dayArr, tmp)
						} else {
							dayArr = append(dayArr, tmp)
						}
					}
				} else {
					outnum++
					update := map[string]interface{}{
						"dataging":           0,
						"history_updatetime": qu.Int64All(time.Now().Unix()),
					}
					// records not within two years also get the flag cleared
					Update.updatePool <- []map[string]interface{}{ // queue the update
						map[string]interface{}{
							"_id": tmp["_id"],
						},
						map[string]interface{}{
							"$set": update,
						},
					}
					// notify downstream that this record needs an update...
					sendFlowRepeatInfo(update, BsonTOStringId(tmp["_id"]))
				}
			}
			tmp = make(map[string]interface{})
		}
		if len(dayArr) > 0 {
			pendAllArr = append(pendAllArr, dayArr)
			dayArr = []map[string]interface{}{}
		}
		log.Println("scanned:", num, "qualified:", oknum, "not within two years:", outnum)
		if len(pendAllArr) <= 0 {
			log.Println("no data with dataging==1 found")
		}
		// sanity-check that the grouped counts add up
		testNum := 0
		for k, v := range pendAllArr {
			log.Println("group", k, "--", "size:", len(v))
			testNum = testNum + len(v)
		}
		log.Println("local grouping finished:", len(pendAllArr), "groups", "check - total count:", testNum)
		n, repeateN := 0, 0
		log.Println("thread count:", threadNum)
		pool := make(chan bool, threadNum)
		wg := &sync.WaitGroup{}
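		// The buffered channel acts as a counting semaphore: each group
		// acquires a slot before its goroutine starts and releases it in the
		// deferred function, so at most threadNum groups are deduplicated
		// concurrently.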
		for k, v := range pendAllArr { // each group flushes its updates as it finishes
			pool <- true
			wg.Add(1)
			go func(k int, v []map[string]interface{}) {
				defer func() {
					<-pool
					wg.Done()
				}()
				log.Println("building group", k, "--- (data pool)")
				// publish time of the group's last (latest) record, since v is sorted ascending
				first_pt := qu.Int64All(v[len(v)-1]["publishtime"])
				curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
				log.Println("start dedup for group", k, "total count:", len(v))
				n = n + len(v) // NOTE: n and repeateN are written from several goroutines without synchronization
				log.Println("running total so far:", n, "duplicates:", repeateN)
				for _, tmp := range v {
					info := NewInfo(tmp)
					b, source, reason := curTM.check(info)
					if b { // duplicate found: mark it
						repeateN++
						update := map[string]interface{}{
							"repeat":              1,
							"repeat_reason":       reason,
							"repeat_id":           source.id,
							"dataging":            0,
							"history_updatetime":  qu.Int64All(time.Now().Unix()),
						}
						Update.updatePool <- []map[string]interface{}{ // tag the duplicate
							map[string]interface{}{
								"_id": tmp["_id"],
							},
							map[string]interface{}{
								"$set": update,
							},
						}
						// notify downstream that this record needs an update...
						sendFlowRepeatInfo(update, BsonTOStringId(tmp["_id"]))
					} else {
						update := map[string]interface{}{
							"dataging":            0, // every processed record ends up with dataging==0
							"history_updatetime":  qu.Int64All(time.Now().Unix()),
						}
						Update.updatePool <- []map[string]interface{}{ // clear the flag
							map[string]interface{}{
								"_id": tmp["_id"],
							},
							map[string]interface{}{
								"$set": update,
							},
						}
						// notify downstream that this record needs an update...
						sendFlowRepeatInfo(update, BsonTOStringId(tmp["_id"]))
					}
				}
			}(k, v)
		}
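		// NOTE: the updates to n and repeateN above are a data race, since every
		// worker goroutine increments them directly. A minimal sketch of one way
		// to make them safe (assuming the counters are changed to int64 and
		// "sync/atomic" is added to the imports):
		//
		//	atomic.AddInt64(&n, int64(len(v)))   // instead of n = n + len(v)
		//	atomic.AddInt64(&repeateN, 1)        // instead of repeateN++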
		wg.Wait()
		log.Println("this timeTask over.", n, "repeateN:", repeateN, gtid, lteid)
		time.Sleep(10 * time.Second)
		// send a UDP message to hand off to the second-stage flow...
		if gtid != lteid {
			for _, to := range nextNode {
				next_sid := qu.BsonIdToSId(gtid)
				next_eid := qu.BsonIdToSId(lteid)
				key := next_sid + "-" + next_eid + "-" + qu.ObjToString(to["stype"])
				by, _ := json.Marshal(map[string]interface{}{
					"gtid":  next_sid,
					"lteid": next_eid,
					"stype": qu.ObjToString(to["stype"]),
					"key":   key,
				})
				addr := &net.UDPAddr{
					IP:   net.ParseIP(to["addr"].(string)),
					Port: qu.IntAll(to["port"]),
				}
				node := &udpNode{by, addr, time.Now().Unix(), 0}
				udptaskmap.Store(key, node)
				udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
			}
		}
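		// For reference, the payload sent to each downstream node marshals to
		// JSON of roughly this shape (values are illustrative):
		//
		//	{"gtid":"<start id>","key":"<start>-<end>-<stype>","lteid":"<end id>","stype":"<stype>"}
		//
		// and the pending message is also stored in udptaskmap under key so it
		// can be tracked after sending.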
		log.Println("continuing with the next segment of history dedup")
	}
}
// FindOneLteid: query the latest ids... walking back... 100 records?
// If fewer than 100 newer documents exist, the package-level lteid is
// returned unchanged.
func FindOneLteid() string {
	task_sess := task_mgo.GetMgoConn()
	defer task_mgo.DestoryMongoConn(task_sess)
	q, total := map[string]interface{}{}, 0
	it_last := task_sess.DB(task_mgo.DbName).C(task_bidding).Find(&q).Sort("-_id").Iter()
	for tmp := make(map[string]interface{}); it_last.Next(&tmp); total++ {
		// walk back 100 documents from the newest and take that document's lteid
		if total >= 100 {
			lteid = qu.ObjToString(tmp["lteid"])
			break
		}
		tmp = make(map[string]interface{})
	}
	return lteid
}
// sendFlowRepeatInfo publishes an update message for the streaming structure...
func sendFlowRepeatInfo(update map[string]interface{}, tmpid string) {
	msgInfo := MsgInfo{}
	msgInfo.Id = tmpid
	msgInfo.Data = update
	bs, err := json.Marshal(msgInfo)
	if err == nil {
		jn.PubReqZip("repeat_task", bs, time.Second)
	} else {
		log.Println("failed to send stream data...", tmpid)
		// the tmpid needs to be recorded ...
	}
}
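
// A minimal sketch of the "record the failed tmpid" idea hinted at above,
// assuming an append-only local file is acceptable (the file name and helper
// are hypothetical, not part of this code base):
//
//	func recordFailedRepeatMsg(tmpid string) {
//		// append the id so the message can be re-published later
//		f, err := os.OpenFile("repeat_task_failed.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
//		if err != nil {
//			log.Println("recordFailedRepeatMsg open:", err)
//			return
//		}
//		defer f.Close()
//		if _, err := f.WriteString(tmpid + "\n"); err != nil {
//			log.Println("recordFailedRepeatMsg write:", err)
//		}
//	}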