main.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package main
import (
	"flag"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"time"

	"github.com/robfig/cron"
	"go.mongodb.org/mongo-driver/bson"

	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)
  14. var (
  15. MgoBid *mongodb.MongodbSim
  16. lastId, startId string
  17. mail_to = "wangjianghan@topnet.net.cn,zhangjinkun@topnet.net.cn,maxiaoshan@topnet.net.cn,zhengkun@topnet.net.cn"
  18. mail_api = "http://172.17.145.179:19281/_send/_mail"
  19. skip = 0
  20. MgoBulkSize = 200
  21. )
  22. func init() {
  23. MgoBid = &mongodb.MongodbSim{
  24. MongodbAddr: "172.17.189.140:27080,172.17.189.141:27081",
  25. DbName: "qfw",
  26. Size: 5,
  27. UserName: "SJZY_RWESBid_Other",
  28. Password: "SJZY@O17t8herB3B",
  29. }
  30. MgoBid.InitPool()
  31. log.InitLog()
  32. }
  33. func main() {
  34. flag.StringVar(&startId, "gtid", "", "开始id")
  35. flag.Parse()
  36. if startId != "" {
  37. TimeTask()
  38. } else {
  39. flag.PrintDefaults()
  40. }
  41. }
  42. func TimeTask() {
  43. crn := cron.New()
  44. //cronstr := "0 */1 * * * ?" // 每2min执行一次
  45. cronstr := "*/20 * * * * ?" // 每20s执行一次
  46. ct := 0
  47. _ = crn.AddFunc(cronstr, func() {
  48. ct += 1
  49. util.Debug(fmt.Sprintf("task count: %d", ct))
  50. taskinfo()
  51. })
  52. crn.Start()
  53. c := make(chan bool, 1)
  54. <-c
  55. }
  56. func taskinfo() {
  57. //currentTime := time.Now()
  58. //m, _ := time.ParseDuration("5m") // 5分钟之前
  59. //rtime := currentTime.Add(m)
  60. //eid := primitive.NewObjectIDFromTimestamp(rtime)
  61. //qfid := bson.M{"_id": bson.M{"$lte": eid}}
  62. info, _ := MgoBid.Find("bidding", nil, `{"_id": -1}`, `{"_id": 1}`, true, -1, -1)
  63. if len(*info) > 0 && lastId != mongodb.BsonIdToSId((*info)[0]["_id"]) {
  64. lastId = mongodb.BsonIdToSId((*info)[0]["_id"])
  65. } else {
  66. if skip >= 30 {
  67. sendMail(fmt.Sprintf("bidding表id段数据查询失败,%s", lastId))
  68. skip = 0
  69. } else {
  70. skip++
  71. }
  72. log.Debug(fmt.Sprintf("skip: %d, lastid: %s", skip, lastId))
  73. return
  74. }
  75. skip = 0
  76. info1, _ := MgoBid.Find("bidding_processing_ids", `{"dataprocess": 0}`, `{"_id": -1}`, nil, false, -1, 2)
  77. if len(*info1) > 1 {
  78. startId = util.ObjToString((*info1)[0]["gtid"])
  79. q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(startId), "$lte": mongodb.StringTOBsonId(lastId)}}
  80. count := MgoBid.Count("bidding", q)
  81. if count > 10000 {
  82. startId = util.ObjToString((*info1)[0]["lteid"])
  83. q = bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(startId), "$lte": mongodb.StringTOBsonId(lastId)}}
  84. util.Debug(q)
  85. count = MgoBid.Count("bidding", q)
  86. save := make(map[string]interface{})
  87. save["gtid"] = startId
  88. save["lteid"] = lastId
  89. save["count"] = count
  90. now := time.Now().Unix()
  91. save["dataprocess"] = 0
  92. save["createtime"] = now
  93. save["updatetime"] = now
  94. saveFuc(save, util.Int64All((*info1)[0]["last_autoid"]))
  95. startId = lastId
  96. } else {
  97. update := make(map[string]interface{})
  98. update["lteid"] = lastId
  99. startId = util.ObjToString((*info1)[0]["gtid"])
  100. update["count"] = count
  101. update["updatetime"] = time.Now().Unix()
  102. updateFuc((*info1)[0]["_id"], update, util.Int64All((*info1)[0]["last_autoid"]), util.ObjToString((*info1)[0]["lteid"]))
  103. startId = lastId
  104. }
  105. } else {
  106. q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(startId), "$lte": mongodb.StringTOBsonId(lastId)}}
  107. count := MgoBid.Count("bidding", q)
  108. info2, _ := MgoBid.Find("bidding_processing_ids", nil, `{"_id": -1}`, nil, true, -1, 1)
  109. save := make(map[string]interface{})
  110. save["gtid"] = startId
  111. save["lteid"] = lastId
  112. save["count"] = count
  113. now := time.Now().Unix()
  114. save["dataprocess"] = 0
  115. save["createtime"] = now
  116. save["updatetime"] = now
  117. saveFuc(save, util.Int64All((*info2)[0]["last_autoid"]))
  118. startId = lastId
  119. }
  120. }
  121. func saveFuc(save map[string]interface{}, autoid int64) {
  122. conn := MgoBid.GetMgoConn()
  123. defer MgoBid.DestoryMongoConn(conn)
  124. gtid := util.ObjToString(save["gtid"])
  125. lteid := util.ObjToString(save["lteid"])
  126. q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(gtid), "$lte": mongodb.StringTOBsonId(lteid)}}
  127. result := conn.DB(MgoBid.DbName).C("bidding").Find(q).Select(bson.M{"_id": 1}).Iter()
  128. var bidUpdate [][]map[string]interface{}
  129. for tmp := make(map[string]interface{}); result.Next(tmp); {
  130. autoid++
  131. bidUpdate = append(bidUpdate, []map[string]interface{}{{
  132. "_id": tmp["_id"],
  133. },
  134. {"$set": bson.M{"autoid": autoid}},
  135. })
  136. if len(bidUpdate) >= MgoBulkSize {
  137. tmps := bidUpdate
  138. MgoBid.UpdateBulk("bidding", tmps...)
  139. bidUpdate = [][]map[string]interface{}{}
  140. }
  141. }
  142. if len(bidUpdate) > 0 {
  143. tmps := bidUpdate
  144. MgoBid.UpdateBulk("bidding", tmps...)
  145. bidUpdate = [][]map[string]interface{}{}
  146. }
  147. save["last_autoid"] = autoid
  148. MgoBid.Save("bidding_processing_ids", save)
  149. }
  150. func updateFuc(id interface{}, update map[string]interface{}, autoid int64, gtid string) {
  151. conn := MgoBid.GetMgoConn()
  152. defer MgoBid.DestoryMongoConn(conn)
  153. lteid := util.ObjToString(update["lteid"])
  154. q := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(gtid), "$lte": mongodb.StringTOBsonId(lteid)}}
  155. result := conn.DB(MgoBid.DbName).C("bidding").Find(q).Select(bson.M{"_id": 1}).Iter()
  156. var bidUpdate [][]map[string]interface{}
  157. for tmp := make(map[string]interface{}); result.Next(tmp); {
  158. autoid++
  159. bidUpdate = append(bidUpdate, []map[string]interface{}{{
  160. "_id": tmp["_id"],
  161. },
  162. {"$set": bson.M{"autoid": autoid}},
  163. })
  164. if len(bidUpdate) >= MgoBulkSize {
  165. tmps := bidUpdate
  166. MgoBid.UpdateBulk("bidding", tmps...)
  167. bidUpdate = [][]map[string]interface{}{}
  168. }
  169. }
  170. if len(bidUpdate) > 0 {
  171. tmps := bidUpdate
  172. MgoBid.UpdateBulk("bidding", tmps...)
  173. bidUpdate = [][]map[string]interface{}{}
  174. }
  175. update["last_autoid"] = autoid
  176. MgoBid.UpdateById("bidding_processing_ids", id, map[string]interface{}{"$set": update})
  177. }
  178. func sendMail(content string) {
  179. res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", mail_api, mail_to, "processing_ids-send-fail", content))
  180. if err == nil {
  181. defer res.Body.Close()
  182. read, _ := io.ReadAll(res.Body)
  183. util.Debug("send mail ..." + string(read))
  184. }
  185. }