// fullDataRepeat.go (3.0 KB)

  1. package main
  2. import (
  3. "fmt"
  4. qu "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  5. "log"
  6. "sync"
  7. "time"
  8. )
  9. var timeLayout = "2006-01-02"
  10. //var timeLayout = "2006-01-02 15:04:05"
  11. // 划分时间段落
  12. func initModelArr() []map[string]interface{} {
  13. modelArr := make([]map[string]interface{}, 0)
  14. start := time.Date(2021, 12, 15, 0, 0, 0, 0, time.Local).Unix()
  15. end := time.Date(2022, 1, 1, 0, 0, 0, 0, time.Local).Unix()
  16. gte_time := start
  17. lt_time := start + 86400
  18. log.Println("开始构建数据池...一周...")
  19. FullDM = TimedTaskDatamap(dupdays, start, 1)
  20. log.Println("......")
  21. log.Println("开启...全量判重...", start, "~", end)
  22. for {
  23. modelArr = append(modelArr, map[string]interface{}{
  24. "publishtime": map[string]interface{}{
  25. "$gte": gte_time,
  26. "$lt": lt_time,
  27. },
  28. })
  29. gte_time = lt_time
  30. lt_time = gte_time + 86400
  31. if lt_time > end {
  32. break
  33. }
  34. }
  35. return modelArr
  36. }
  37. // 全量数据处理
  38. func fullDataRepeat() {
  39. modelArr := initModelArr()
  40. for _, query := range modelArr {
  41. pt := *qu.ObjToMap(query["publishtime"])
  42. time_str := time.Unix(qu.Int64All(pt["$gte"]), 0).Format(timeLayout)
  43. dealWithfullData(query, time_str)
  44. }
  45. }
  46. // 多线程~处理数据
  47. func dealWithfullData(query map[string]interface{}, time_str string) {
  48. log.Println("开始处理~", time_str, "~", query)
  49. sess := data_mgo.GetMgoConn()
  50. defer data_mgo.DestoryMongoConn(sess)
  51. it := sess.DB(data_mgo.DbName).C(extract).Find(&query).Sort("publishtime").Iter()
  52. total, isok, repeatN := 0, 0, 0
  53. dataAllDict := make(map[string][]map[string]interface{}, 0)
  54. for tmp := make(map[string]interface{}); it.Next(&tmp); total++ {
  55. if qu.IntAll(tmp["repeat"]) == 1 || qu.IntAll(tmp["repeat"]) == -1 ||
  56. qu.ObjToString(tmp["subtype"]) == "拟建" || qu.ObjToString(tmp["subtype"]) == "产权" ||
  57. qu.ObjToString(tmp["spidercode"]) == "sdxzbiddingsjzypc" {
  58. tmp = make(map[string]interface{})
  59. continue
  60. }
  61. isok++
  62. subtype := qu.ObjToString(tmp["subtype"])
  63. if subtype == "招标" || subtype == "邀标" || subtype == "询价" ||
  64. subtype == "竞谈" || subtype == "竞价" {
  65. subtype = "招标"
  66. }
  67. dataArr := dataAllDict[subtype]
  68. if dataArr == nil {
  69. dataArr = []map[string]interface{}{}
  70. }
  71. dataArr = append(dataArr, tmp)
  72. dataAllDict[subtype] = dataArr
  73. tmp = make(map[string]interface{})
  74. }
  75. pool := make(chan bool, threadNum)
  76. wg := &sync.WaitGroup{}
  77. for _, dataArr := range dataAllDict {
  78. fmt.Print("...")
  79. pool <- true
  80. wg.Add(1)
  81. go func(dataArr []map[string]interface{}) {
  82. defer func() {
  83. <-pool
  84. wg.Done()
  85. }()
  86. num := 0
  87. for _, tmp := range dataArr {
  88. info := NewInfo(tmp)
  89. b, source, reason := FullDM.check(info)
  90. if b {
  91. num++
  92. AddGroupPool.pool <- map[string]interface{}{
  93. "_id": StringTOBsonId(info.id),
  94. "repeat_id": source.id,
  95. "reason": reason,
  96. "update_time": qu.Int64All(time.Now().Unix()),
  97. }
  98. }
  99. }
  100. numlock.Lock()
  101. repeatN += num
  102. numlock.Unlock()
  103. }(dataArr)
  104. }
  105. wg.Wait()
  106. log.Println("处理结束~", time_str, "总计需判重~", isok, "~重复量", repeatN)
  107. }