fullRepeat.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package main
  2. import (
  3. "log"
  4. "qfw/common/src/qfw/util"
  5. qu "qfw/util"
  6. "sync"
  7. "time"
  8. )
  9. //开始全量判重程序
  10. func fullRepeat(sid,eid string) {
  11. defer qu.Catch()
  12. //区间id-是否分段
  13. if IsFull && sec_gtid!="" && sec_lteid!=""{
  14. sid = sec_gtid
  15. eid = sec_lteid
  16. }
  17. q := map[string]interface{}{
  18. "_id": map[string]interface{}{
  19. "$gt": StringTOBsonId(sid),
  20. "$lte": StringTOBsonId(eid),
  21. },
  22. }
  23. log.Println("开始全量数据判重~查询条件:",data_mgo.DbName, extract, q)
  24. sess := data_mgo.GetMgoConn()
  25. defer data_mgo.DestoryMongoConn(sess)
  26. it := sess.DB(data_mgo.DbName).C(extract).Find(&q).Sort("publishtime").Iter()
  27. n, isok ,repeatN:= 0,0,0
  28. dataAllDict := make(map[string][]map[string]interface{},0)
  29. for tmp := make(map[string]interface{}); it.Next(&tmp); n++ {
  30. if n%1000 == 0 {
  31. log.Println("index: ", n, isok)
  32. }
  33. if util.IntAll(tmp["repeat"]) == 1 {
  34. repeatN++
  35. tmp = make(map[string]interface{})
  36. continue
  37. }
  38. if util.IntAll(tmp["dataging"]) == 1 && !IsFull{
  39. tmp = make(map[string]interface{})
  40. continue
  41. }
  42. //优化空间-相同天-划分一组(在分类别)
  43. isok++
  44. //数据分组-按照类别分组
  45. subtype := qu.ObjToString(tmp["subtype"])
  46. if subtype=="招标"||subtype=="邀标"||subtype=="询价"||
  47. subtype=="竞谈"||subtype=="竞价" {
  48. subtype = "招标"
  49. }
  50. dataArr := dataAllDict[subtype]
  51. if dataArr==nil {
  52. dataArr = []map[string]interface{}{}
  53. }
  54. dataArr = append(dataArr,tmp)
  55. dataAllDict[subtype] = dataArr
  56. tmp = make(map[string]interface{})
  57. }
  58. log.Println("类别组划分完毕:",len(dataAllDict),"组","~","需要判重:",isok,"条")
  59. pool := make(chan bool, threadNum)
  60. wg := &sync.WaitGroup{}
  61. for _,dataArr := range dataAllDict {
  62. pool <- true
  63. wg.Add(1)
  64. go func(dataArr []map[string]interface{}) {
  65. defer func() {
  66. <-pool
  67. wg.Done()
  68. }()
  69. num := 0
  70. for _,tmp := range dataArr{
  71. info := NewInfo(tmp)
  72. b, source, _ := DM.check(info)
  73. if b {
  74. num++
  75. var updateID = map[string]interface{}{} //记录更新判重的
  76. updateID["_id"] = StringTOBsonId(info.id)
  77. repeat_ids:=source.repeat_ids
  78. repeat_ids = append(repeat_ids,info.id)
  79. source.repeat_ids = repeat_ids
  80. DM.replacePoolData(source)//替换数据池-更新
  81. //Update.updatePool <- []map[string]interface{}{//重复数据打标签
  82. // updateID,
  83. // map[string]interface{}{
  84. // "$set": map[string]interface{}{
  85. // "repeat": 1,
  86. // "repeat_reason": reason,
  87. // "repeat_id": source.id,
  88. // "dataging": 0,
  89. // "updatetime_repeat" :util.Int64All(time.Now().Unix()),
  90. // },
  91. // },
  92. //}
  93. }
  94. }
  95. numberlock.Lock()
  96. repeatN+=num
  97. numberlock.Unlock()
  98. }(dataArr)
  99. }
  100. wg.Wait()
  101. log.Println("this full data is over.", n, "repeateN:", repeatN)
  102. time.Sleep(15 * time.Second)
  103. }