historyRepeat.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. package main
  2. import (
  3. "encoding/json"
  4. "github.com/robfig/cron/v3"
  5. "gopkg.in/mgo.v2/bson"
  6. qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. mu "jygit.jydev.jianyu360.cn/data_processing/common_utils/udp"
  8. "log"
  9. "net"
  10. "os"
  11. "strconv"
  12. "sync"
  13. "time"
  14. )
  15. // 历史判重
  16. func historyRepeat() {
  17. defer qu.Catch()
  18. for {
  19. start := time.Now().Unix()
  20. if gtid == "" {
  21. log.Println("请传gtid,否则无法运行")
  22. os.Exit(0)
  23. return
  24. }
  25. if lteid != "" && !IsFull { //先进行数据迁移
  26. log.Println("开启一次迁移任务", gtid, lteid)
  27. moveHistoryData(gtid, lteid)
  28. gtid = lteid //替换数据
  29. }
  30. //查询表最后一个id
  31. task_sess := task_mgo.GetMgoConn()
  32. defer task_mgo.DestoryMongoConn(task_sess)
  33. q := map[string]interface{}{}
  34. it_last := task_sess.DB(task_mgo.DbName).C(task_coll).Find(&q).Sort("-_id").Iter()
  35. isRepeatStatus := false
  36. for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
  37. is_repeat_status := qu.IntAll(tmp["repeat_status"])
  38. if is_repeat_status == 1 {
  39. lteid = qu.ObjToString(tmp["lteid"])
  40. log.Println("查询的最后一个已标记的任务lteid:", lteid)
  41. isRepeatStatus = true
  42. tmp = make(map[string]interface{})
  43. break
  44. } else {
  45. tmp = make(map[string]interface{})
  46. }
  47. }
  48. if !isRepeatStatus {
  49. log.Println("查询不到有标记的lteid数据......睡眠......")
  50. time.Sleep(30 * time.Second)
  51. continue
  52. }
  53. log.Println("查询找到有标记的lteid......睡眠......", gtid, lteid)
  54. if isUpdateSite {
  55. initSite()
  56. }
  57. time.Sleep(30 * time.Second)
  58. sess := data_mgo.GetMgoConn() //连接器
  59. defer data_mgo.DestoryMongoConn(sess)
  60. between_time := time.Now().Unix() - (86400 * timingPubScope) //两年周期
  61. //开始判重
  62. q = map[string]interface{}{
  63. "_id": map[string]interface{}{
  64. "$gt": StringTOBsonId(gtid),
  65. "$lte": StringTOBsonId(lteid),
  66. },
  67. }
  68. log.Println("历史判重查询条件:", q, "时间:", between_time)
  69. it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  70. num, oknum, outnum, deterTime := int64(0), int64(0), int64(0), int64(0) //计数
  71. pendAllArr := [][]map[string]interface{}{} //待处理数组
  72. dayArr := []map[string]interface{}{}
  73. for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
  74. if num%10000 == 0 {
  75. log.Println("正序遍历:", num)
  76. }
  77. //取-符合-发布时间X年内的数据
  78. if qu.IntAll(tmp["dataging"]) == 1 {
  79. pubtime := qu.Int64All(tmp["publishtime"])
  80. if pubtime > 0 && pubtime >= between_time && qu.ObjToString(tmp["subtype"]) != "拟建" && qu.ObjToString(tmp["subtype"]) != "产权" &&
  81. qu.ObjToString(tmp["spidercode"]) != "sdxzbiddingsjzypc" {
  82. oknum++
  83. if deterTime == 0 {
  84. log.Println("找到第一条符合条件的数据")
  85. deterTime = qu.Int64All(tmp["publishtime"])
  86. dayArr = append(dayArr, tmp)
  87. } else {
  88. if pubtime-deterTime > timingSpanDay*86400 {
  89. //新数组重新构建,当前组数据加到全部组数据
  90. pendAllArr = append(pendAllArr, dayArr)
  91. dayArr = []map[string]interface{}{}
  92. deterTime = qu.Int64All(tmp["publishtime"])
  93. dayArr = append(dayArr, tmp)
  94. } else {
  95. dayArr = append(dayArr, tmp)
  96. }
  97. }
  98. } else {
  99. outnum++
  100. //不在两年内的也清标记
  101. Update.updatePool <- []map[string]interface{}{ //重复数据打标签
  102. map[string]interface{}{
  103. "_id": tmp["_id"],
  104. },
  105. map[string]interface{}{
  106. "$set": map[string]interface{}{
  107. "dataging": 0,
  108. "history_updatetime": qu.Int64All(time.Now().Unix()),
  109. },
  110. },
  111. }
  112. }
  113. }
  114. tmp = make(map[string]interface{})
  115. }
  116. if len(dayArr) > 0 {
  117. pendAllArr = append(pendAllArr, dayArr)
  118. dayArr = []map[string]interface{}{}
  119. }
  120. log.Println("查询数量:", num, "符合条件:", oknum, "未在两年内:", outnum)
  121. if len(pendAllArr) <= 0 {
  122. log.Println("没找到dataging==1的数据")
  123. }
  124. //测试分组数量是否正确
  125. testNum := 0
  126. for k, v := range pendAllArr {
  127. log.Println("第", k, "组--", "数量:", len(v))
  128. testNum = testNum + len(v)
  129. }
  130. log.Println("本地构建分组完成:", len(pendAllArr), "组", "测试-总计数量:", testNum)
  131. n, repeateN := 0, 0
  132. log.Println("线程数:", threadNum)
  133. pool := make(chan bool, threadNum)
  134. wg := &sync.WaitGroup{}
  135. for k, v := range pendAllArr { //每组结束更新一波数据
  136. pool <- true
  137. wg.Add(1)
  138. go func(k int, v []map[string]interface{}) {
  139. defer func() {
  140. <-pool
  141. wg.Done()
  142. }()
  143. log.Println("构建第", k, "组---(数据池)")
  144. //当前组的第一个发布时间
  145. first_pt := qu.Int64All(v[len(v)-1]["publishtime"])
  146. curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
  147. log.Println("开始遍历判重第", k, "组 共计数量:", len(v))
  148. n = n + len(v)
  149. log.Println("统计目前总数量:", n, "重复数量:", repeateN)
  150. for _, tmp := range v {
  151. info := NewInfo(tmp)
  152. b, source, reason := curTM.check(info)
  153. if b { //有重复,更新
  154. repeateN++
  155. Update.updatePool <- []map[string]interface{}{ //重复数据打标签
  156. map[string]interface{}{
  157. "_id": tmp["_id"],
  158. },
  159. map[string]interface{}{
  160. "$set": map[string]interface{}{
  161. "repeat": 1,
  162. "repeat_reason": reason,
  163. "repeat_id": source.id,
  164. "dataging": 0,
  165. "history_updatetime": qu.Int64All(time.Now().Unix()),
  166. },
  167. },
  168. }
  169. } else {
  170. Update.updatePool <- []map[string]interface{}{ //重复数据打标签
  171. map[string]interface{}{
  172. "_id": tmp["_id"],
  173. },
  174. map[string]interface{}{
  175. "$set": map[string]interface{}{
  176. "dataging": 0, //符合条件的都为dataging==0
  177. "history_updatetime": qu.Int64All(time.Now().Unix()),
  178. },
  179. },
  180. }
  181. }
  182. }
  183. }(k, v)
  184. }
  185. wg.Wait()
  186. log.Println("this timeTask over.", n, "repeateN:", repeateN, gtid, lteid)
  187. time.Sleep(30 * time.Second)
  188. //任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
  189. if gtid != lteid {
  190. for _, to := range nextNode {
  191. next_sid := qu.BsonIdToSId(gtid)
  192. next_eid := qu.BsonIdToSId(lteid)
  193. key := next_sid + "-" + next_eid + "-" + qu.ObjToString(to["stype"])
  194. by, _ := json.Marshal(map[string]interface{}{
  195. "gtid": next_sid,
  196. "lteid": next_eid,
  197. "stype": qu.ObjToString(to["stype"]),
  198. "key": key,
  199. })
  200. addr := &net.UDPAddr{
  201. IP: net.ParseIP(to["addr"].(string)),
  202. Port: qu.IntAll(to["port"]),
  203. }
  204. node := &udpNode{by, addr, time.Now().Unix(), 0}
  205. udptaskmap.Store(key, node)
  206. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  207. }
  208. }
  209. end := time.Now().Unix()
  210. log.Println(gtid, lteid)
  211. if end-start < 60*5 {
  212. log.Println("睡眠.............")
  213. time.Sleep(5 * time.Minute)
  214. }
  215. log.Println("继续下一段的历史判重")
  216. }
  217. }
  218. // 判断是否在当前id段落
  219. func judgeIsCurIds(gtid string, lteid string, curid string) bool {
  220. gt_time, _ := strconv.ParseInt(gtid[:8], 16, 64)
  221. lte_time, _ := strconv.ParseInt(lteid[:8], 16, 64)
  222. cur_time, _ := strconv.ParseInt(curid[:8], 16, 64)
  223. if cur_time >= gt_time && cur_time <= lte_time {
  224. return true
  225. }
  226. return false
  227. }
  228. // 迁移上一段数据
  229. func moveHistoryData(startid string, endid string) {
  230. sess := data_mgo.GetMgoConn()
  231. defer data_mgo.DestoryMongoConn(sess)
  232. year, month, day := time.Now().Date()
  233. q := map[string]interface{}{
  234. "_id": map[string]interface{}{
  235. "$gt": StringTOBsonId(startid),
  236. "$lte": StringTOBsonId(endid),
  237. },
  238. }
  239. log.Println(q)
  240. it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Iter()
  241. index := 0
  242. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  243. data_mgo.Save(extract_back, tmp)
  244. tmp = map[string]interface{}{}
  245. if index%1000 == 0 {
  246. log.Println("index", index)
  247. }
  248. }
  249. log.Println("save to", extract_back, " ok index", index)
  250. qv := map[string]interface{}{
  251. "comeintime": map[string]interface{}{
  252. "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+1) * 24 * time.Hour * 2).Unix(),
  253. },
  254. }
  255. delnum := data_mgo.Delete(extract, qv)
  256. log.Println("remove from ", extract, delnum)
  257. }
  258. // 暂时弃用
  259. func moveTimeoutData() {
  260. log.Println("部署迁移定时任务")
  261. c := cron.New()
  262. c.AddFunc("0 0 0 * * ?", func() { moveOnceTimeOut() })
  263. c.Start()
  264. }
  265. func moveOnceTimeOut() {
  266. log.Println("执行一次迁移超时数据")
  267. sess := data_mgo.GetMgoConn()
  268. defer data_mgo.DestoryMongoConn(sess)
  269. now := time.Now()
  270. move_time := time.Date(now.Year()-2, now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
  271. task_id := qu.BsonIdToSId(bson.NewObjectIdWithTime(move_time))
  272. q := map[string]interface{}{
  273. "_id": map[string]interface{}{
  274. "$lt": StringTOBsonId(task_id),
  275. },
  276. }
  277. it := sess.DB(data_mgo.DbName).C("result_20200714").Find(&q).Iter()
  278. index := 0
  279. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  280. if index%10000 == 0 {
  281. log.Println("index", index)
  282. }
  283. del_id := BsonTOStringId(tmp["_id"])
  284. data_mgo.Save("result_20200713", tmp)
  285. data_mgo.DeleteById("result_20200714", del_id)
  286. tmp = map[string]interface{}{}
  287. }
  288. log.Println("save and delete", " ok index", index)
  289. }