main.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. package main
  2. import (
  3. "log"
  4. "strings"
  5. "time"
  6. "mongodb"
  7. common "qfw/util"
  8. "github.com/robfig/cron"
  9. )
  10. var (
  11. Mgo *mongodb.MongodbSim
  12. BzMgo *mongodb.MongodbSim
  13. cfg = new(Config)
  14. )
  15. func init() {
  16. common.ReadConfig(&cfg)
  17. Mgo = mongodb.NewMgo(cfg.Db.Address, cfg.Db.DbName, cfg.Db.DbSize)
  18. BzMgo = mongodb.NewMgo(cfg.Bz.Address, cfg.Bz.DbName, cfg.Bz.DbSize)
  19. }
  20. func runJob() {
  21. log.Println("中国联通数据迁移任务开始------")
  22. log.Println("Cfg: ", cfg)
  23. query, count, session := map[string]interface{}{"appid": "jyGQ1XQQsEAwNeSENOFR9D"}, 0, Mgo.GetMgoConn()
  24. defer func() {
  25. Mgo.DestoryMongoConn(session)
  26. }()
  27. iter := session.DB(cfg.Db.DbName).C(cfg.Db.ColName).Find(&query).Sort("_id").Iter()
  28. thisData := map[string]interface{}{}
  29. for {
  30. if !iter.Next(&thisData) {
  31. break
  32. }
  33. count++
  34. log.Println("第", count, "条")
  35. id := mongodb.BsonIdToSId(thisData["_id"])
  36. area := common.ObjToString(thisData["area"])
  37. buyer := common.ObjToString(thisData["buyer"])
  38. // s_winner := common.ObjToString(thisData["s_winner"])
  39. toptype := common.ObjToString(thisData["toptype"])
  40. thisData["createtime"] = time.Now().Unix()
  41. if toptype == "招标" || toptype == "预告" {
  42. if area != "全国" && area != "" && buyer != "" && !strings.Contains(buyer, "本级") && !strings.Contains(buyer, "本部") && !strings.Contains(buyer, "机关") {
  43. saveId := Mgo.Save("usermail", thisData)
  44. if saveId != "" {
  45. log.Println("数据保存usermail成功", id, saveId)
  46. delC := Mgo.Delete(cfg.Db.ColName, map[string]interface{}{"_id": mongodb.StringTOBsonId(id)})
  47. if delC > 0 {
  48. log.Println("数据从临时表删除成功", delC, id, saveId)
  49. } else {
  50. log.Println("数据从临时表删除失败!!!!", id, saveId)
  51. }
  52. } else {
  53. log.Println("数据保存usermail失败!!!!", id)
  54. }
  55. } else {
  56. baseInfoMap := map[string]interface{}{}
  57. baseInfoMap["id"] = id
  58. baseInfoMap["v_baseinfo"] = thisData
  59. baseInfoMap["b_isprchasing"] = true
  60. baseInfoMap["b_istagging"] = true
  61. baseInfoMap["i_createtime"] = time.Now().Unix()
  62. baseInfoMap["b_isgivegroup"] = false //是否分配给用户组
  63. baseInfoMap["b_istag"] = false //是否已标注
  64. baseInfoMap["b_isgiveuser"] = false //是否分配给用户
  65. baseInfoMap["b_check"] = false // 质检标记
  66. baseInfoMap["b_isEff"] = false // 标的物有效性
  67. saveId := BzMgo.Save(cfg.Bz.ColName, baseInfoMap)
  68. if saveId != "" {
  69. log.Println("数据保存数据标注表成功", id, saveId)
  70. //清洗同时也先存usermail推送
  71. Mgo.Save("usermail", thisData)
  72. //
  73. delC := Mgo.Delete(cfg.Db.ColName, map[string]interface{}{"_id": mongodb.StringTOBsonId(id)})
  74. if delC > 0 {
  75. log.Println("数据从临时表删除成功", delC, id, saveId)
  76. } else {
  77. log.Println("数据从临时表删除失败!!!!", id, saveId)
  78. }
  79. } else {
  80. log.Println("数据保存数据标注表失败!!!!", id)
  81. }
  82. }
  83. } else {
  84. saveId := Mgo.Save("usermail", thisData)
  85. if saveId != "" {
  86. log.Println("数据保存usermail成功", id, saveId)
  87. delC := Mgo.Delete(cfg.Db.ColName, map[string]interface{}{"_id": mongodb.StringTOBsonId(id)})
  88. if delC > 0 {
  89. log.Println("数据从临时表删除成功", delC, id, saveId)
  90. } else {
  91. log.Println("数据从临时表删除失败!!!!", id, saveId)
  92. }
  93. } else {
  94. log.Println("数据保存usermail失败!!!!", id)
  95. }
  96. }
  97. thisData = map[string]interface{}{}
  98. }
  99. log.Println("中国联通数据迁移任务结束------")
  100. }
  101. func main() {
  102. runJob()
  103. c := cron.New()
  104. c.AddFunc(cfg.CornExp, func() {
  105. runJob()
  106. })
  107. c.Start()
  108. select {}
  109. }