main.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "time"
  6. elastic "es"
  7. "flag"
  8. "mongodb"
  9. common "qfw/util"
  10. "github.com/robfig/cron"
  11. )
  12. var (
  13. Mgo *mongodb.MongodbSim
  14. Es elastic.Es
  15. cfg = new(Config)
  16. SEXhs = common.SimpleEncrypt{Key: "topJYBX2019"}
  17. mode = flag.Int("m", 1, "")
  18. )
  19. func init() {
  20. common.ReadConfig(&cfg)
  21. Mgo = mongodb.NewMgo(cfg.Db.Address, cfg.Db.DbName, cfg.Db.DbSize)
  22. Es = elastic.NewEs(cfg.Es.Version, cfg.Es.Address, cfg.Es.DbSize, cfg.Es.UserName, cfg.Es.Password)
  23. }
  24. func FindData() {
  25. session := Mgo.GetMgoConn()
  26. query := map[string]interface{}{
  27. "createtime": map[string]interface{}{
  28. "$gte": time.Now().Unix() - 86400,
  29. },
  30. "$or": []map[string]interface{}{
  31. map[string]interface{}{
  32. "projectId": "",
  33. },
  34. map[string]interface{}{
  35. "projectId": map[string]interface{}{
  36. "$exists": 0,
  37. },
  38. },
  39. }}
  40. defer Mgo.DestoryMongoConn(session)
  41. iter := session.DB(cfg.Db.DbName).C("usermail").Find(&query).Iter()
  42. thisData, count := map[string]interface{}{}, 0
  43. for {
  44. if !iter.Next(&thisData) {
  45. break
  46. }
  47. count++
  48. id := common.ObjToString(thisData["id"])
  49. query := `{"query": {"bool": {"must": [{"term": {"ids": "%s"}}],"must_not": [],"should": []}}}`
  50. querys := fmt.Sprintf(query, id)
  51. projectId := ""
  52. data := Es.Get("projectset", "projectset", querys)
  53. if data != nil && *data != nil && len(*data) > 0 {
  54. projectId = common.ObjToString((*data)[0]["_id"])
  55. log.Println("projectId", projectId, count)
  56. } else {
  57. log.Println("ES未查到项目id", id, count)
  58. }
  59. //
  60. if projectId != "" {
  61. ok := Mgo.UpdateById("usermail", thisData["_id"], map[string]interface{}{"$set": map[string]interface{}{"projectId": projectId}})
  62. if ok {
  63. log.Println("项目id更新成功 ", id, projectId)
  64. } else {
  65. log.Println("项目id更新失败! ", id, projectId)
  66. }
  67. }
  68. thisData = map[string]interface{}{}
  69. }
  70. }
  71. func main() {
  72. flag.Parse()
  73. if *mode == 2 {
  74. FindHistory()
  75. } else {
  76. FindData()
  77. c := cron.New()
  78. c.AddFunc(cfg.CornExp, func() {
  79. FindData()
  80. })
  81. c.Start()
  82. select {}
  83. }
  84. }
  85. func FindHistory() {
  86. session := Mgo.GetMgoConn()
  87. query := map[string]interface{}{
  88. "$or": []map[string]interface{}{
  89. map[string]interface{}{
  90. "projectId": "",
  91. },
  92. map[string]interface{}{
  93. "projectId": map[string]interface{}{
  94. "$exists": 0,
  95. },
  96. },
  97. }}
  98. defer Mgo.DestoryMongoConn(session)
  99. iter := session.DB(cfg.Db.DbName).C("usermail").Find(&query).Iter()
  100. thisData := map[string]interface{}{}
  101. for {
  102. if !iter.Next(&thisData) {
  103. break
  104. }
  105. id := common.ObjToString(thisData["id"])
  106. query := `{"query": {"bool": {"must": [{"term": {"ids": "%s"}}],"must_not": [],"should": []}}}`
  107. querys := fmt.Sprintf(query, id)
  108. projectId := ""
  109. data := Es.Get("projectset", "projectset", querys)
  110. if data != nil && *data != nil && len(*data) > 0 {
  111. projectId = common.ObjToString((*data)[0]["_id"])
  112. log.Println("projectId", projectId)
  113. } else {
  114. log.Println("ES未查到项目id", id)
  115. }
  116. //
  117. if projectId != "" {
  118. ok := Mgo.UpdateById("usermail", thisData["_id"], map[string]interface{}{"$set": map[string]interface{}{"projectId": projectId}})
  119. if ok {
  120. log.Println("项目id更新成功 ", id, projectId)
  121. } else {
  122. log.Println("项目id更新失败! ", id, projectId)
  123. }
  124. }
  125. thisData = map[string]interface{}{}
  126. }
  127. }