main.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "github.com/robfig/cron"
  6. "go.mongodb.org/mongo-driver/bson"
  7. "go.mongodb.org/mongo-driver/bson/primitive"
  8. "go.uber.org/zap"
  9. "io/ioutil"
  10. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  12. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  13. "net/http"
  14. "time"
  15. )
  16. var (
  17. MgoBid *mongodb.MongodbSim
  18. lastId, startId string
  19. mail_to = "zhangjinkun@topnet.net.cn,wangjianghan@topnet.net.cn,maxiaoshan@topnet.net.cn,zhengkun@topnet.net.cn"
  20. mail_api = "http://172.17.145.179:19281/_send/_mail"
  21. )
  22. func init() {
  23. MgoBid = &mongodb.MongodbSim{
  24. MongodbAddr: "172.17.4.187:27082,172.17.145.163:27083",
  25. DbName: "qfw",
  26. Size: 5,
  27. UserName: "SJZY_RWESBid_Other",
  28. Password: "SJZY@O17t8herB3B",
  29. }
  30. MgoBid.InitPool()
  31. }
  32. func main() {
  33. flag.StringVar(&startId, "gtid", "", "开始id")
  34. flag.Parse()
  35. if startId != "" {
  36. TimeTask()
  37. } else {
  38. flag.PrintDefaults()
  39. }
  40. }
  41. func TimeTask() {
  42. crn := cron.New()
  43. cronstr := "0 */5 * * * ?" // 每5min执行一次
  44. ct := 0
  45. _ = crn.AddFunc(cronstr, func() {
  46. ct += 1
  47. util.Debug(fmt.Sprintf("task count: %d", ct))
  48. taskinfo()
  49. })
  50. crn.Start()
  51. c := make(chan bool, 1)
  52. <-c
  53. }
  54. func taskinfo() {
  55. currentTime := time.Now()
  56. m, _ := time.ParseDuration("-5m") // 5分钟之前
  57. rtime := currentTime.Add(m)
  58. eid := primitive.NewObjectIDFromTimestamp(rtime)
  59. qfid := bson.M{"_id": bson.M{"$lte": eid}}
  60. info, _ := MgoBid.Find("bidding", qfid, `{"_id": -1}`, `{"_id": 1}`, true, -1, -1)
  61. if info != nil {
  62. lastId = mongodb.BsonIdToSId((*info)[0]["_id"])
  63. } else {
  64. sendMail("bidding表id查询失败")
  65. return
  66. }
  67. info1, _ := MgoBid.Find("bidding_processing_ids", `{"dataprocess": 0}`, `{"_id": -1}`, nil, false, -1, 2)
  68. if len(*info1) > 1 {
  69. startId = util.ObjToString((*info1)[0]["gtid"])
  70. q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(startId), "$lte": mongodb.StringTOBsonId(lastId)}}
  71. count := MgoBid.Count("bidding", q)
  72. ids := fmt.Sprintf("%s-%s", startId, lastId)
  73. if count <= 0 {
  74. sendMail(fmt.Sprintf("bidding表id段数据查询失败,%s", ids))
  75. return
  76. }
  77. if count > 10000 {
  78. startId = util.ObjToString((*info1)[0]["lteid"])
  79. q = bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(startId), "$lte": mongodb.StringTOBsonId(lastId)}}
  80. util.Debug(q)
  81. count = MgoBid.Count("bidding", q)
  82. save := make(map[string]interface{})
  83. save["gtid"] = startId
  84. save["lteid"] = lastId
  85. save["count"] = count
  86. now := time.Now().Unix()
  87. save["dataprocess"] = 0
  88. save["createtime"] = now
  89. save["updatetime"] = now
  90. MgoBid.Save("bidding_processing_ids", save)
  91. startId = lastId
  92. } else {
  93. update := make(map[string]interface{})
  94. update["lteid"] = lastId
  95. startId = util.ObjToString((*info1)[0]["gtid"])
  96. update["count"] = count
  97. update["updatetime"] = time.Now().Unix()
  98. MgoBid.UpdateById("bidding_processing_ids", (*info1)[0]["_id"], map[string]interface{}{"$set": update})
  99. startId = lastId
  100. }
  101. } else {
  102. q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(startId), "$lte": mongodb.StringTOBsonId(lastId)}}
  103. count := MgoBid.Count("bidding", q)
  104. ids := fmt.Sprintf("%s-%s", startId, lastId)
  105. if count <= 0 {
  106. sendMail(fmt.Sprintf("bidding表id段数据查询失败,%s", ids))
  107. return
  108. }
  109. save := make(map[string]interface{})
  110. save["gtid"] = startId
  111. save["lteid"] = lastId
  112. save["count"] = count
  113. now := time.Now().Unix()
  114. save["dataprocess"] = 0
  115. save["createtime"] = now
  116. save["updatetime"] = now
  117. MgoBid.Save("bidding_processing_ids", save)
  118. startId = lastId
  119. }
  120. }
  121. func sendMail(content string) {
  122. res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", mail_api, mail_to, "processing_ids-send-fail", content))
  123. if err == nil {
  124. defer res.Body.Close()
  125. read, err := ioutil.ReadAll(res.Body)
  126. log.Info("send mail ...", zap.String("r:", string(read)), zap.Any("err:", err))
  127. }
  128. }