main.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/spf13/viper"
  5. "go.uber.org/zap"
  6. "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  7. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  9. "strings"
  10. "sync"
  11. )
  // Package-level configuration and MongoDB handles shared by the functions in this file.
  12. var (
  13. GF GlobalConf // Filled by InitConfig via viper.Unmarshal.
  14. Mgo *mongodb.MongodbSim // Reads the company-directory MongoDB; also the connection used for updates. Built by InitMgo from GF.Mongo.
  15. MgoB *mongodb.MongodbSim // NOTE(review): the original comment was a copy of Mgo's; InitMgo points this handle at database "qfw" on the GF.Mongo host.
  16. MgoM *mongodb.MongodbSim // "86 marked" table. Built by InitMgo from GF.MongoM.
  17. )
  18. func InitConfig() (err error) {
  19. viper.SetConfigFile("config.toml") // 指定配置文件路径
  20. viper.SetConfigName("config") // 配置文件名称(无扩展名)
  21. viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
  22. viper.AddConfigPath("./")
  23. viper.AddConfigPath("./conf/") // 还可以在工作目录中查找配置
  24. viper.AddConfigPath("../conf/") // 还可以在工作目录中查找配置
  25. err = viper.ReadInConfig() // 查找并读取配置文件
  26. if err != nil { // 处理读取配置文件的错误
  27. return
  28. }
  29. err = viper.Unmarshal(&GF)
  30. return err
  31. }
  32. func InitLog() {
  33. err := log.InitLog(
  34. //log.Path("./logs/log.out"),
  35. log.Path(""),
  36. log.Level("info"),
  37. log.Compress(true),
  38. log.MaxSize(10),
  39. log.MaxBackups(10),
  40. log.MaxAge(7),
  41. log.Format("json"),
  42. )
  43. if err != nil {
  44. fmt.Printf("InitLog failed: %v\n", err)
  45. }
  46. }
  47. func InitMgo() {
  48. Mgo = &mongodb.MongodbSim{
  49. MongodbAddr: GF.Mongo.Host,
  50. DbName: GF.Mongo.DB,
  51. Size: GF.Mongo.Size,
  52. UserName: GF.Mongo.Username,
  53. Password: GF.Mongo.Password,
  54. Direct: GF.Mongo.Direct,
  55. }
  56. Mgo.InitPool()
  57. MgoB = &mongodb.MongodbSim{
  58. MongodbAddr: GF.Mongo.Host,
  59. DbName: "qfw",
  60. Size: GF.Mongo.Size,
  61. UserName: GF.Mongo.Username,
  62. Password: GF.Mongo.Password,
  63. Direct: GF.Mongo.Direct,
  64. }
  65. MgoB.InitPool()
  66. MgoM = &mongodb.MongodbSim{
  67. MongodbAddr: GF.MongoM.Host,
  68. DbName: GF.MongoM.DB,
  69. Size: GF.MongoM.Size,
  70. UserName: GF.MongoM.Username,
  71. Password: GF.MongoM.Password,
  72. Direct: GF.MongoM.Direct,
  73. }
  74. MgoM.InitPool()
  75. }
  76. func main() {
  77. err := InitConfig()
  78. if err != nil {
  79. panic(err)
  80. }
  81. InitLog()
  82. InitMgo()
  83. deal()
  84. fmt.Println("over")
  85. }
  86. func deal() {
  87. sess := Mgo.GetMgoConn()
  88. defer Mgo.DestoryMongoConn(sess)
  89. count := 0
  90. query := sess.DB(GF.Mongo.DB).C("bidding").Find(nil).Select(nil).Iter()
  91. ch := make(chan bool, 15)
  92. wg := &sync.WaitGroup{}
  93. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  94. if count%1000 == 0 {
  95. fmt.Println("current", count)
  96. }
  97. ch <- true
  98. wg.Add(1)
  99. go func(tmp map[string]interface{}) {
  100. defer func() {
  101. <-ch
  102. wg.Done()
  103. }()
  104. //
  105. biddingId := ""
  106. if id, ok := tmp["id"]; ok {
  107. marked, _ := MgoB.FindById("bidding", util.ObjToString(id), nil)
  108. if len(*marked) == 0 {
  109. return
  110. }
  111. biddingId = util.ObjToString(id)
  112. } else {
  113. marked, _ := MgoB.FindById("bidding", mongodb.BsonIdToSId(tmp["_id"]), nil)
  114. if len(*marked) == 0 {
  115. return
  116. }
  117. biddingId = mongodb.BsonIdToSId(tmp["_id"])
  118. }
  119. delete(tmp, "_id")
  120. tmp["_id"] = mongodb.StringTOBsonId(biddingId)
  121. fields := strings.Split(GF.Env.NoFields, ",")
  122. for _, v := range fields {
  123. delete(tmp, v)
  124. }
  125. err := Mgo.InsertOrUpdate("qfw_high", "wcc_bidding", tmp)
  126. if err != nil {
  127. log.Info("deal", zap.String("失败", biddingId))
  128. }
  129. }(tmp)
  130. tmp = map[string]interface{}{}
  131. }
  132. wg.Wait()
  133. fmt.Println("数据处理完毕")
  134. }