historyRepeat.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. package main
  2. import (
  3. "encoding/json"
  4. "github.com/cron"
  5. "gopkg.in/mgo.v2/bson"
  6. "log"
  7. mu "mfw/util"
  8. "net"
  9. "os"
  10. qu "qfw/common/src/qfw/util"
  11. "qfw/util"
  12. "strconv"
  13. "sync"
  14. "time"
  15. )
  16. //历史判重
  17. func historyRepeat() {
  18. defer util.Catch()
  19. for {
  20. start := time.Now().Unix()
  21. if gtid == "" {
  22. log.Println("请传gtid,否则无法运行")
  23. os.Exit(0)
  24. return
  25. }
  26. if lteid != "" && !IsFull { //先进行数据迁移
  27. log.Println("开启一次迁移任务", gtid, lteid)
  28. moveHistoryData(gtid, lteid)
  29. gtid = lteid //替换数据
  30. }
  31. //查询表最后一个id
  32. task_sess := task_mgo.GetMgoConn()
  33. defer task_mgo.DestoryMongoConn(task_sess)
  34. q := map[string]interface{}{}
  35. it_last := task_sess.DB(task_mgo.DbName).C(task_collName).Find(&q).Sort("-_id").Iter()
  36. isRepeatStatus := false
  37. for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
  38. is_repeat_status := util.IntAll(tmp["is_repeat_status"])
  39. if is_repeat_status == 1 {
  40. lteid = util.ObjToString(tmp["lteid"])
  41. log.Println("查询的最后一个已标记的任务lteid:", lteid)
  42. isRepeatStatus = true
  43. tmp = make(map[string]interface{})
  44. break
  45. } else {
  46. tmp = make(map[string]interface{})
  47. }
  48. }
  49. if !isRepeatStatus {
  50. log.Println("查询不到有标记的lteid数据")
  51. log.Println("睡眠5分钟 gtid:", gtid, "lteid:", lteid)
  52. time.Sleep(5 * time.Minute)
  53. continue
  54. }
  55. log.Println("查询完毕-找到有标记的lteid-先睡眠5分钟", gtid, lteid)
  56. if isUpdateSite {
  57. initSite()
  58. }
  59. time.Sleep(5 * time.Minute)
  60. sess := data_mgo.GetMgoConn() //连接器
  61. defer data_mgo.DestoryMongoConn(sess)
  62. between_time := time.Now().Unix() - (86400 * timingPubScope) //两年周期
  63. //开始判重
  64. q = map[string]interface{}{
  65. "_id": map[string]interface{}{
  66. "$gt": StringTOBsonId(gtid),
  67. "$lte": StringTOBsonId(lteid),
  68. },
  69. }
  70. log.Println("历史判重查询条件:", q, "时间:", between_time)
  71. it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  72. num, oknum, outnum, deterTime := int64(0), int64(0), int64(0), int64(0) //计数
  73. pendAllArr := [][]map[string]interface{}{} //待处理数组
  74. dayArr := []map[string]interface{}{}
  75. for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
  76. if num%10000 == 0 {
  77. log.Println("正序遍历:", num)
  78. }
  79. //取-符合-发布时间X年内的数据
  80. if util.IntAll(tmp["dataging"]) == 1 {
  81. pubtime := util.Int64All(tmp["publishtime"])
  82. if pubtime > 0 && pubtime >= between_time {
  83. oknum++
  84. if deterTime == 0 {
  85. log.Println("找到第一条符合条件的数据")
  86. deterTime = util.Int64All(tmp["publishtime"])
  87. dayArr = append(dayArr, tmp)
  88. } else {
  89. if pubtime-deterTime > timingSpanDay*86400 {
  90. //新数组重新构建,当前组数据加到全部组数据
  91. pendAllArr = append(pendAllArr, dayArr)
  92. dayArr = []map[string]interface{}{}
  93. deterTime = util.Int64All(tmp["publishtime"])
  94. dayArr = append(dayArr, tmp)
  95. } else {
  96. dayArr = append(dayArr, tmp)
  97. }
  98. }
  99. } else {
  100. outnum++
  101. //不在两年内的也清标记
  102. Update.updatePool <- []map[string]interface{}{ //重复数据打标签
  103. map[string]interface{}{
  104. "_id": tmp["_id"],
  105. },
  106. map[string]interface{}{
  107. "$set": map[string]interface{}{
  108. "dataging": 0,
  109. "history_updatetime": util.Int64All(time.Now().Unix()),
  110. },
  111. },
  112. }
  113. }
  114. }
  115. tmp = make(map[string]interface{})
  116. }
  117. if len(dayArr) > 0 {
  118. pendAllArr = append(pendAllArr, dayArr)
  119. dayArr = []map[string]interface{}{}
  120. }
  121. log.Println("查询数量:", num, "符合条件:", oknum, "未在两年内:", outnum)
  122. if len(pendAllArr) <= 0 {
  123. log.Println("没找到dataging==1的数据")
  124. }
  125. //测试分组数量是否正确
  126. testNum := 0
  127. for k, v := range pendAllArr {
  128. log.Println("第", k, "组--", "数量:", len(v))
  129. testNum = testNum + len(v)
  130. }
  131. log.Println("本地构建分组完成:", len(pendAllArr), "组", "测试-总计数量:", testNum)
  132. n, repeateN := 0, 0
  133. log.Println("线程数:", threadNum)
  134. pool := make(chan bool, threadNum)
  135. wg := &sync.WaitGroup{}
  136. for k, v := range pendAllArr { //每组结束更新一波数据
  137. pool <- true
  138. wg.Add(1)
  139. go func(k int, v []map[string]interface{}) {
  140. defer func() {
  141. <-pool
  142. wg.Done()
  143. }()
  144. log.Println("构建第", k, "组---(数据池)")
  145. //当前组的第一个发布时间
  146. first_pt := util.Int64All(v[len(v)-1]["publishtime"])
  147. curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
  148. log.Println("开始遍历判重第", k, "组 共计数量:", len(v))
  149. n = n + len(v)
  150. log.Println("统计目前总数量:", n, "重复数量:", repeateN)
  151. for _, tmp := range v {
  152. info := NewInfo(tmp)
  153. b, source, reason := curTM.check(info)
  154. if b { //有重复,更新
  155. repeateN++
  156. if judgeIsReplaceInfo(source.href, info.href) {
  157. datalock.Lock()
  158. temp_source_id := source.id
  159. temp_info_id := info.id
  160. temp_source := info
  161. temp_source.id = temp_source_id
  162. curTM.replacePoolData(temp_source)
  163. //替换抽取表数据
  164. is_log, is_exists, ext_s_data, ext_i_data := confrimHistoryExtractData(temp_source_id, temp_info_id)
  165. is_bid, bid_s_data, bid_i_data := confrimBiddingData(temp_source_id, temp_info_id)
  166. if is_log && is_bid {
  167. data_mgo.Save(extract_log, map[string]interface{}{
  168. "_id": StringTOBsonId(temp_info_id),
  169. "replace_id": temp_source_id,
  170. "is_history": 1,
  171. })
  172. ext_s_data["repeat"] = 0
  173. ext_s_data["dataging"] = 0
  174. ext_i_data["repeat"] = 1
  175. ext_i_data["repeat_id"] = temp_source_id
  176. ext_i_data["repeat_reason"] = reason
  177. ext_i_data["dataging"] = 0
  178. ext_i_data["history_updatetime"] = qu.Int64All(time.Now().Unix())
  179. if is_exists {
  180. data_mgo.DeleteById(extract, temp_source_id)
  181. data_mgo.Save(extract, ext_s_data)
  182. } else {
  183. data_mgo.DeleteById(extract_back, temp_source_id)
  184. data_mgo.Save(extract_back, ext_s_data)
  185. }
  186. data_mgo.DeleteById(extract, temp_info_id)
  187. data_mgo.Save(extract, ext_i_data)
  188. task_mgo.DeleteById(task_bidding, temp_source_id)
  189. task_mgo.Save(task_bidding, bid_s_data)
  190. task_mgo.DeleteById(task_bidding, temp_info_id)
  191. task_mgo.Save(task_bidding, bid_i_data)
  192. //通道填充数据
  193. msg := "id=" + temp_source_id
  194. _ = MP.Publish(msg)
  195. } else {
  196. log.Println("替换~相关表~未查询到数据~", temp_source_id, "~", temp_info_id)
  197. }
  198. datalock.Unlock()
  199. } else {
  200. Update.updatePool <- []map[string]interface{}{ //重复数据打标签
  201. map[string]interface{}{
  202. "_id": tmp["_id"],
  203. },
  204. map[string]interface{}{
  205. "$set": map[string]interface{}{
  206. "repeat": 1,
  207. "repeat_reason": reason,
  208. "repeat_id": source.id,
  209. "dataging": 0,
  210. "history_updatetime": util.Int64All(time.Now().Unix()),
  211. },
  212. },
  213. }
  214. }
  215. } else {
  216. Update.updatePool <- []map[string]interface{}{ //重复数据打标签
  217. map[string]interface{}{
  218. "_id": tmp["_id"],
  219. },
  220. map[string]interface{}{
  221. "$set": map[string]interface{}{
  222. "dataging": 0, //符合条件的都为dataging==0
  223. "history_updatetime": util.Int64All(time.Now().Unix()),
  224. },
  225. },
  226. }
  227. }
  228. }
  229. }(k, v)
  230. }
  231. wg.Wait()
  232. log.Println("this timeTask over.", n, "repeateN:", repeateN, gtid, lteid)
  233. time.Sleep(30 * time.Second)
  234. //任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
  235. if gtid != lteid {
  236. for _, to := range nextNode {
  237. next_sid := util.BsonIdToSId(gtid)
  238. next_eid := util.BsonIdToSId(lteid)
  239. key := next_sid + "-" + next_eid + "-" + util.ObjToString(to["stype"])
  240. by, _ := json.Marshal(map[string]interface{}{
  241. "gtid": next_sid,
  242. "lteid": next_eid,
  243. "stype": util.ObjToString(to["stype"]),
  244. "key": key,
  245. })
  246. addr := &net.UDPAddr{
  247. IP: net.ParseIP(to["addr"].(string)),
  248. Port: util.IntAll(to["port"]),
  249. }
  250. node := &udpNode{by, addr, time.Now().Unix(), 0}
  251. udptaskmap.Store(key, node)
  252. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  253. }
  254. }
  255. end := time.Now().Unix()
  256. log.Println(gtid, lteid)
  257. if end-start < 60*5 {
  258. log.Println("睡眠.............")
  259. time.Sleep(5 * time.Minute)
  260. }
  261. log.Println("继续下一段的历史判重")
  262. }
  263. }
  // judgeIsCurIds reports whether curid lies in the id segment bounded by gtid
// and lteid. Only the first 8 hex digits of each id are compared; for 24-char
// ObjectId hex strings these are the creation time in seconds. Both bounds are
// inclusive at one-second granularity, so the result can differ from an exact
// `$gt`/`$lte` _id query for ids created in the same second as a bound.
//
// An id shorter than 8 characters, or whose prefix is not valid hex, makes the
// function return false. Short ids no longer panic, and invalid hex is no
// longer silently treated as time 0.
func judgeIsCurIds(gtid string, lteid string, curid string) bool {
	seconds := func(id string) (int64, bool) {
		if len(id) < 8 {
			return 0, false
		}
		v, err := strconv.ParseInt(id[:8], 16, 64)
		return v, err == nil
	}
	gtTime, okGt := seconds(gtid)
	lteTime, okLte := seconds(lteid)
	curTime, okCur := seconds(curid)
	if !okGt || !okLte || !okCur {
		return false
	}
	return curTime >= gtTime && curTime <= lteTime
}
  274. //迁移上一段数据
  275. func moveHistoryData(startid string, endid string) {
  276. sess := data_mgo.GetMgoConn()
  277. defer data_mgo.DestoryMongoConn(sess)
  278. year, month, day := time.Now().Date()
  279. q := map[string]interface{}{
  280. "_id": map[string]interface{}{
  281. "$gt": StringTOBsonId(startid),
  282. "$lte": StringTOBsonId(endid),
  283. },
  284. }
  285. log.Println(q)
  286. it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Iter()
  287. index := 0
  288. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  289. data_mgo.Save(extract_back, tmp)
  290. tmp = map[string]interface{}{}
  291. if index%1000 == 0 {
  292. log.Println("index", index)
  293. }
  294. }
  295. log.Println("save to", extract_back, " ok index", index)
  296. qv := map[string]interface{}{
  297. "comeintime": map[string]interface{}{
  298. "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+1) * 24 * time.Hour * 2).Unix(),
  299. },
  300. }
  301. delnum := data_mgo.Delete(extract, qv)
  302. log.Println("remove from ", extract, delnum)
  303. }
  304. //暂时弃用
  305. func moveTimeoutData() {
  306. log.Println("部署迁移定时任务")
  307. c := cron.New()
  308. c.AddFunc("0 0 0 * * ?", func() { moveOnceTimeOut() })
  309. c.Start()
  310. }
  311. func moveOnceTimeOut() {
  312. log.Println("执行一次迁移超时数据")
  313. sess := data_mgo.GetMgoConn()
  314. defer data_mgo.DestoryMongoConn(sess)
  315. now := time.Now()
  316. move_time := time.Date(now.Year()-2, now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
  317. task_id := util.BsonIdToSId(bson.NewObjectIdWithTime(move_time))
  318. q := map[string]interface{}{
  319. "_id": map[string]interface{}{
  320. "$lt": StringTOBsonId(task_id),
  321. },
  322. }
  323. it := sess.DB(data_mgo.DbName).C("result_20200714").Find(&q).Iter()
  324. index := 0
  325. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  326. if index%10000 == 0 {
  327. log.Println("index", index)
  328. }
  329. del_id := BsonTOStringId(tmp["_id"])
  330. data_mgo.Save("result_20200713", tmp)
  331. data_mgo.DeleteById("result_20200714", del_id)
  332. tmp = map[string]interface{}{}
  333. }
  334. log.Println("save and delete", " ok index", index)
  335. }