biddingall.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/spf13/viper"
  5. "go.uber.org/zap"
  6. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  9. "sync"
  10. )
  11. //biddingAllData 处理芜湖存量bidding数据
  12. func biddingAllData() {
  13. type Biddingall struct {
  14. Coll string
  15. Gtid string
  16. Lteid string
  17. }
  18. type RoutinesConf struct {
  19. Num int
  20. }
  21. type AllConf struct {
  22. All map[string]Biddingall
  23. Routines RoutinesConf
  24. }
  25. var all AllConf
  26. viper.SetConfigFile("biddingall.toml")
  27. viper.SetConfigName("biddingall") // 配置文件名称(无扩展名)
  28. viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
  29. viper.AddConfigPath("./")
  30. err := viper.ReadInConfig() // 查找并读取配置文件
  31. if err != nil { // 处理读取配置文件的错误
  32. fmt.Println("ReadInConfig err =>", err)
  33. return
  34. }
  35. err = viper.Unmarshal(&all)
  36. if err != nil {
  37. fmt.Println("biddingAllDataTask Unmarshal err =>", err)
  38. return
  39. }
  40. for k, conf := range all.All {
  41. go dealData(conf.Coll, conf.Gtid, conf.Lteid, k, all.Routines.Num)
  42. }
  43. }
  44. func dealData(coll, gtid, lteid, kword string, routines int) {
  45. ch := make(chan bool, routines)
  46. wg := &sync.WaitGroup{}
  47. q := map[string]interface{}{
  48. "_id": map[string]interface{}{
  49. "$gt": mongodb.StringTOBsonId(gtid),
  50. "$lte": mongodb.StringTOBsonId(lteid),
  51. },
  52. }
  53. biddingConn := Mgo.GetMgoConn()
  54. it := biddingConn.DB("qfw").C(coll).Find(&q).Select(nil).Iter()
  55. c1, index := 0, 0
  56. var indexLock sync.Mutex
  57. for tmp := make(map[string]interface{}); it.Next(tmp); c1++ {
  58. if c1%20000 == 0 {
  59. log.Info(kword, zap.Int("current:", c1))
  60. log.Info(kword, zap.Any("current:_id =>", tmp["_id"]))
  61. }
  62. ch <- true
  63. wg.Add(1)
  64. go func(tmp map[string]interface{}) {
  65. defer func() {
  66. <-ch
  67. wg.Done()
  68. }()
  69. if sensitive := util.ObjToString(tmp["sensitive"]); sensitive == "测试" { //bidding中有敏感词,不生索引
  70. tmp = make(map[string]interface{})
  71. return
  72. }
  73. // 针对存量数据,重复数据不进索引
  74. if util.IntAll(tmp["extracttype"]) == -1 {
  75. return
  76. }
  77. //1.采购单位
  78. buyer := util.ObjToString(tmp["buyer"])
  79. rests := tree.Match(buyer, true)
  80. if len(rests) > 0 {
  81. indexLock.Lock()
  82. index++
  83. indexLock.Unlock()
  84. Mgo.SaveByOriID("bidding_wuhu_all", tmp)
  85. return
  86. }
  87. //2.中标单位
  88. winner := util.ObjToString(tmp["winner"])
  89. rests = tree.Match(winner, true)
  90. if len(rests) > 0 {
  91. indexLock.Lock()
  92. index++
  93. indexLock.Unlock()
  94. Mgo.SaveByOriID("bidding_wuhu_all", tmp)
  95. return
  96. }
  97. //3.中标候选人
  98. winnerorder, ok := tmp["winnerorder"].([]map[string]interface{})
  99. if ok {
  100. for _, v := range winnerorder {
  101. res := tree.Match(util.ObjToString(v["entname"]), true)
  102. if len(res) > 0 {
  103. Mgo.SaveByOriID("bidding_wuhu_all", tmp)
  104. return
  105. }
  106. }
  107. }
  108. }(tmp)
  109. tmp = map[string]interface{}{}
  110. }
  111. wg.Wait()
  112. log.Info(fmt.Sprintf("%s over", kword), zap.Int("count", c1), zap.Int("index", index))
  113. }