historyRepeat.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331
  1. package main
  2. import (
  3. "encoding/json"
  4. "github.com/cron"
  5. "gopkg.in/mgo.v2/bson"
  6. "log"
  7. mu "mfw/util"
  8. "net"
  9. "os"
  10. qutil "qfw/util"
  11. "strconv"
  12. "sync"
  13. "time"
  14. )
  15. func getHistoryInfoId(keystatus string) bool {
  16. //查询表最后一个id
  17. qfw_sess := qfw_mgo.GetMgoConn()
  18. defer qfw_mgo.DestoryMongoConn(qfw_sess)
  19. q := map[string]interface{}{}
  20. it_last := qfw_sess.DB(qfw_mgo.DbName).C(qfw_coll).Find(&q).Sort("-_id").Iter()
  21. isRepeatStatus := false
  22. for tmp := make(map[string]interface{}); it_last.Next(&tmp); {
  23. is_repeat_status := qutil.IntAll(tmp[keystatus])
  24. if is_repeat_status == 1 {
  25. lteid = qutil.ObjToString(tmp["lteid"])
  26. log.Println("查询的最后一个已标记的任务lteid:", lteid)
  27. isRepeatStatus = true
  28. tmp = make(map[string]interface{})
  29. break
  30. } else {
  31. tmp = make(map[string]interface{})
  32. }
  33. }
  34. return isRepeatStatus
  35. }
  36. // 历史判重
  37. func historyRepeat() {
  38. defer qutil.Catch()
  39. for {
  40. start := time.Now().Unix()
  41. if gtid == "" {
  42. log.Println("请传gtid,否则无法运行")
  43. os.Exit(0)
  44. return
  45. }
  46. if lteid != "" && !IsFull { //先进行数据迁移
  47. log.Println("开启一次迁移任务", gtid, lteid)
  48. moveHistoryData(gtid, lteid)
  49. gtid = lteid //替换数据
  50. }
  51. //查询历史数据id
  52. isRepeatStatus := false
  53. if !update_ai {
  54. isRepeatStatus = getHistoryInfoId("repeat_status")
  55. } else {
  56. isRepeatStatus = getHistoryInfoId("repeat_status_ai")
  57. }
  58. if !isRepeatStatus {
  59. log.Println("查询不到有标记的lteid数据......睡眠......")
  60. time.Sleep(30 * time.Second)
  61. continue
  62. }
  63. log.Println("查询找到有标记的lteid......睡眠......", gtid, lteid)
  64. if isUpdateSite {
  65. initSite()
  66. }
  67. time.Sleep(30 * time.Second)
  68. sess := data_mgo.GetMgoConn() //连接器
  69. defer data_mgo.DestoryMongoConn(sess)
  70. between_time := time.Now().Unix() - (86400 * timingPubScope) //两年周期
  71. ////临时-补齐差额
  72. //log.Println("临时···66c58769b25c3e1deb79107c,66f617bdb25c3e1deb3ee999")
  73. //gtid = "66c58769b25c3e1deb79107c"
  74. //lteid = "66f617bdb25c3e1deb3ee999"
  75. //开始判重
  76. q := map[string]interface{}{
  77. "_id": map[string]interface{}{
  78. "$gt": StringTOBsonId(gtid),
  79. "$lte": StringTOBsonId(lteid),
  80. },
  81. }
  82. log.Println("历史判重查询条件:", q, "时间:", between_time)
  83. it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  84. num, oknum, outnum, deterTime := int64(0), int64(0), int64(0), int64(0) //计数
  85. pendAllArr := [][]map[string]interface{}{} //待处理数组
  86. dayArr := []map[string]interface{}{}
  87. for tmp := make(map[string]interface{}); it.Next(&tmp); num++ {
  88. if num%10000 == 0 {
  89. log.Println("正序遍历:", num, "~", oknum)
  90. }
  91. delete(tmp, "field_source")
  92. delete(tmp, "regions_log")
  93. delete(tmp, "kvtext")
  94. //取-符合-发布时间X年内的数据
  95. if qutil.IntAll(tmp["dataging"]) == 1 {
  96. pubtime := qutil.Int64All(tmp["publishtime"])
  97. if pubtime > 0 && pubtime >= between_time && qutil.ObjToString(tmp["subtype"]) != "拟建" && qutil.ObjToString(tmp["subtype"]) != "产权" &&
  98. qutil.ObjToString(tmp["spidercode"]) != "sdxzbiddingsjzypc" {
  99. oknum++
  100. if deterTime == 0 {
  101. log.Println("找到第一条符合条件的数据")
  102. deterTime = qutil.Int64All(tmp["publishtime"])
  103. dayArr = append(dayArr, tmp)
  104. } else {
  105. if pubtime-deterTime > timingSpanDay*86400 {
  106. //新数组重新构建,当前组数据加到全部组数据
  107. pendAllArr = append(pendAllArr, dayArr)
  108. dayArr = []map[string]interface{}{}
  109. deterTime = qutil.Int64All(tmp["publishtime"])
  110. dayArr = append(dayArr, tmp)
  111. } else {
  112. dayArr = append(dayArr, tmp)
  113. }
  114. }
  115. } else {
  116. outnum++
  117. //不在两年内的也清标记
  118. Update.updatePool <- []map[string]interface{}{ //重复数据打标签
  119. map[string]interface{}{
  120. "_id": tmp["_id"],
  121. },
  122. map[string]interface{}{
  123. "$set": map[string]interface{}{
  124. "dataging": 0,
  125. "history_updatetime": qutil.Int64All(time.Now().Unix()),
  126. },
  127. },
  128. }
  129. }
  130. }
  131. tmp = make(map[string]interface{})
  132. }
  133. if len(dayArr) > 0 {
  134. pendAllArr = append(pendAllArr, dayArr)
  135. dayArr = []map[string]interface{}{}
  136. }
  137. log.Println("查询数量:", num, "符合条件:", oknum, "未在两年内:", outnum)
  138. if len(pendAllArr) <= 0 {
  139. log.Println("没找到dataging==1的数据")
  140. }
  141. //测试分组数量是否正确
  142. testNum := 0
  143. for k, v := range pendAllArr {
  144. log.Println("第", k, "组--", "数量:", len(v))
  145. testNum = testNum + len(v)
  146. }
  147. log.Println("本地构建分组完成:", len(pendAllArr), "组", "测试-总计数量:", testNum)
  148. n, repeateN := 0, 0
  149. log.Println("线程数:", threadNum)
  150. pool := make(chan bool, threadNum)
  151. wg := &sync.WaitGroup{}
  152. for k, v := range pendAllArr { //每组结束更新一波数据
  153. pool <- true
  154. wg.Add(1)
  155. go func(k int, v []map[string]interface{}) {
  156. defer func() {
  157. <-pool
  158. wg.Done()
  159. }()
  160. log.Println("构建第", k, "组---(数据池)")
  161. //当前组的第一个发布时间
  162. first_pt := qutil.Int64All(v[len(v)-1]["publishtime"])
  163. curTM := TimedTaskDatamap(dupdays+int(timingSpanDay)+1, first_pt+86400, int(k))
  164. log.Println("开始遍历判重第", k, "组 共计数量:", len(v))
  165. n = n + len(v)
  166. log.Println("统计目前总数量:", n, "重复数量:", repeateN)
  167. for _, tmp := range v {
  168. info := NewInfo(tmp)
  169. b, source, reason := curTM.check(info)
  170. if b { //有重复,更新
  171. repeateN++
  172. Update.updatePool <- []map[string]interface{}{ //重复数据打标签
  173. map[string]interface{}{
  174. "_id": tmp["_id"],
  175. },
  176. map[string]interface{}{
  177. "$set": map[string]interface{}{
  178. "repeat": 1,
  179. "repeat_reason": reason,
  180. "repeat_id": source.id,
  181. "dataging": 0,
  182. "history_updatetime": qutil.Int64All(time.Now().Unix()),
  183. },
  184. },
  185. }
  186. } else {
  187. Update.updatePool <- []map[string]interface{}{ //重复数据打标签
  188. map[string]interface{}{
  189. "_id": tmp["_id"],
  190. },
  191. map[string]interface{}{
  192. "$set": map[string]interface{}{
  193. "dataging": 0, //符合条件的都为dataging==0
  194. "history_updatetime": qutil.Int64All(time.Now().Unix()),
  195. },
  196. },
  197. }
  198. }
  199. }
  200. }(k, v)
  201. }
  202. wg.Wait()
  203. log.Println("this timeTask over.", n, "repeateN:", repeateN, gtid, lteid)
  204. time.Sleep(30 * time.Second)
  205. //任务完成,开始发送广播通知下面节点 发udp 去升索引待定 + 合并
  206. if gtid != lteid {
  207. for _, to := range nextNode {
  208. next_sid := qutil.BsonIdToSId(gtid)
  209. next_eid := qutil.BsonIdToSId(lteid)
  210. key := next_sid + "-" + next_eid + "-" + qutil.ObjToString(to["stype"])
  211. by, _ := json.Marshal(map[string]interface{}{
  212. "gtid": next_sid,
  213. "lteid": next_eid,
  214. "stype": qutil.ObjToString(to["stype"]),
  215. "key": key,
  216. })
  217. addr := &net.UDPAddr{
  218. IP: net.ParseIP(to["addr"].(string)),
  219. Port: qutil.IntAll(to["port"]),
  220. }
  221. node := &udpNode{by, addr, time.Now().Unix(), 0}
  222. udptaskmap.Store(key, node)
  223. udpclient.WriteUdp(by, mu.OP_TYPE_DATA, addr)
  224. }
  225. }
  226. end := time.Now().Unix()
  227. log.Println(gtid, lteid)
  228. if end-start < 60*5 {
  229. log.Println("睡眠.............")
  230. time.Sleep(5 * time.Minute)
  231. }
  232. log.Println("继续下一段的历史判重")
  233. }
  234. }
  // judgeIsCurIds reports whether curid falls inside the id segment bounded by
// gtid and lteid. Only the leading 8 hex digits of each id are compared (the
// ObjectId creation timestamp in seconds), and both bounds are inclusive at
// that granularity. An id created in the same second as gtid therefore counts
// as inside, even though gtid is used as an exclusive ($gt) bound in queries.
// Ids shorter than 8 characters, or whose prefix is not plain hex, are
// reported as not inside. They do not panic and are not parsed as 0.
func judgeIsCurIds(gtid string, lteid string, curid string) bool {
	idTime := func(id string) (int64, bool) {
		if len(id) < 8 {
			return 0, false
		}
		// ParseUint rejects sign prefixes; 8 hex digits always fit in 32 bits.
		sec, err := strconv.ParseUint(id[:8], 16, 32)
		return int64(sec), err == nil
	}
	gtTime, okGt := idTime(gtid)
	lteTime, okLte := idTime(lteid)
	curTime, okCur := idTime(curid)
	if !okGt || !okLte || !okCur {
		return false
	}
	return curTime >= gtTime && curTime <= lteTime
}
  245. // 迁移上一段数据
  246. func moveHistoryData(startid string, endid string) {
  247. sess := data_mgo.GetMgoConn()
  248. defer data_mgo.DestoryMongoConn(sess)
  249. year, month, day := time.Now().Date()
  250. q := map[string]interface{}{
  251. "_id": map[string]interface{}{
  252. "$gt": StringTOBsonId(startid),
  253. "$lte": StringTOBsonId(endid),
  254. },
  255. }
  256. log.Println(q)
  257. it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Iter()
  258. index := 0
  259. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  260. data_mgo.Save(extract_back, tmp)
  261. tmp = map[string]interface{}{}
  262. if index%1000 == 0 {
  263. log.Println("index", index)
  264. }
  265. }
  266. log.Println("save to", extract_back, " ok index", index)
  267. qv := map[string]interface{}{
  268. "comeintime": map[string]interface{}{
  269. "$lt": time.Date(year, month, day, 0, 0, 0, 0, time.Local).Add(-time.Duration(dupdays+1) * 24 * time.Hour * 2).Unix(),
  270. },
  271. }
  272. delnum := data_mgo.Delete(extract, qv)
  273. log.Println("remove from ", extract, delnum)
  274. }
  275. // 暂时弃用
  276. func moveTimeoutData() {
  277. log.Println("部署迁移定时任务")
  278. c := cron.New()
  279. c.AddFunc("0 0 0 * * ?", func() { moveOnceTimeOut() })
  280. c.Start()
  281. }
  282. func moveOnceTimeOut() {
  283. log.Println("执行一次迁移超时数据")
  284. sess := data_mgo.GetMgoConn()
  285. defer data_mgo.DestoryMongoConn(sess)
  286. now := time.Now()
  287. move_time := time.Date(now.Year()-2, now.Month(), now.Day(), 0, 0, 0, 0, time.Local)
  288. task_id := qutil.BsonIdToSId(bson.NewObjectIdWithTime(move_time))
  289. q := map[string]interface{}{
  290. "_id": map[string]interface{}{
  291. "$lt": StringTOBsonId(task_id),
  292. },
  293. }
  294. it := sess.DB(data_mgo.DbName).C("result_20200714").Find(&q).Iter()
  295. index := 0
  296. for tmp := make(map[string]interface{}); it.Next(&tmp); index++ {
  297. if index%10000 == 0 {
  298. log.Println("index", index)
  299. }
  300. del_id := BsonTOStringId(tmp["_id"])
  301. data_mgo.Save("result_20200713", tmp)
  302. data_mgo.DeleteById("result_20200714", del_id)
  303. tmp = map[string]interface{}{}
  304. }
  305. log.Println("save and delete", " ok index", index)
  306. }