main.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package main
  2. import (
  3. "github.com/spf13/viper"
  4. //"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  5. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  6. "log"
  7. )
  8. var (
  9. GF GlobalConf
  10. MgoFm *mongodb.MongodbSim // 84
  11. MgoTo *mongodb.MongodbSim // 166
  12. MgoB *mongodb.MongodbSim //bidding 标讯数据库
  13. )
  14. func InitConfig() (err error) {
  15. viper.SetConfigFile("config.toml") // 指定配置文件路径
  16. viper.SetConfigName("config") // 配置文件名称(无扩展名)
  17. viper.SetConfigType("toml") // 如果配置文件的名称中没有扩展名,则需要配置此项
  18. viper.AddConfigPath("./")
  19. viper.AddConfigPath("./conf/") // 还可以在工作目录中查找配置
  20. viper.AddConfigPath("../conf/") // 还可以在工作目录中查找配置
  21. err = viper.ReadInConfig() // 查找并读取配置文件
  22. if err != nil { // 处理读取配置文件的错误
  23. return
  24. }
  25. err = viper.Unmarshal(&GF)
  26. return err
  27. }
  28. //func InitLog() {
  29. // err := log.InitLog(
  30. // //log.Path("./logs/log.out"),
  31. // log.Path(""),
  32. // log.Level("info"),
  33. // log.Compress(true),
  34. // log.MaxSize(10),
  35. // log.MaxBackups(10),
  36. // log.MaxAge(7),
  37. // log.Format("json"),
  38. // )
  39. // if err != nil {
  40. // fmt.Printf("InitLog failed: %v\n", err)
  41. // }
  42. //}
  43. func InitMgo() {
  44. MgoB = &mongodb.MongodbSim{
  45. MongodbAddr: GF.MongoB.Host,
  46. DbName: GF.MongoB.DB,
  47. Size: GF.MongoB.Size,
  48. UserName: GF.MongoB.Username,
  49. Password: GF.MongoB.Password,
  50. Direct: GF.MongoB.Direct,
  51. }
  52. MgoB.InitPool()
  53. MgoFm = &mongodb.MongodbSim{
  54. MongodbAddr: GF.MongoF.Host,
  55. DbName: GF.MongoF.DB,
  56. Size: GF.MongoF.Size,
  57. UserName: GF.MongoF.Username,
  58. Password: GF.MongoF.Password,
  59. Direct: GF.MongoF.Direct,
  60. }
  61. MgoFm.InitPool()
  62. MgoTo = &mongodb.MongodbSim{
  63. MongodbAddr: GF.MongoT.Host,
  64. DbName: GF.MongoT.DB,
  65. Size: GF.MongoT.Size,
  66. UserName: GF.MongoT.Username,
  67. Password: GF.MongoT.Password,
  68. Direct: GF.MongoT.Direct,
  69. }
  70. MgoTo.InitPool()
  71. }
  72. func main() {
  73. InitConfig()
  74. //InitLog()
  75. InitMgo()
  76. //f := GF
  77. //fmt.Println(f)
  78. dealData()
  79. log.Println("数据处理完毕")
  80. }
  81. func dealData() {
  82. sess := MgoFm.GetMgoConn()
  83. defer MgoFm.DestoryMongoConn(sess)
  84. where := map[string]interface{}{}
  85. if GF.Env.Appid != "" {
  86. where["appid"] = GF.Env.Appid
  87. }
  88. if GF.Env.Createtime > 0 {
  89. where["createtime"] = map[string]interface{}{
  90. "$gt": GF.Env.Createtime,
  91. }
  92. }
  93. query := sess.DB(GF.MongoF.DB).C(GF.MongoF.Coll).Find(where).Select(nil).Iter()
  94. count := 0
  95. //fields := []string{"area", "city", "projectname", "projectcode", "budget", "bidamount", "buyer", "s_winner"}
  96. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  97. if count%100 == 0 {
  98. log.Println("current ---", count)
  99. }
  100. biddingId := mongodb.BsonIdToSId(tmp["_id"])
  101. biddingH, _ := MgoB.FindById("bidding", biddingId, nil)
  102. if biddingH != nil && len(*biddingH) > 0 {
  103. tmp["is_high"] = true
  104. for _, field := range GF.Env.Fields {
  105. if val, ok := (*biddingH)[field]; ok && val != nil {
  106. tmp[field] = val
  107. }
  108. }
  109. MgoTo.SaveByOriID(GF.MongoT.Coll, tmp)
  110. } else {
  111. tmp["is_high"] = false
  112. MgoTo.SaveByOriID(GF.MongoT.Coll, tmp)
  113. }
  114. }
  115. }