main.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. package main
  2. import (
  3. "app.yhyue.com/data_processing/common_utils/log"
  4. "app.yhyue.com/data_processing/common_utils/mongodb"
  5. "fmt"
  6. "github.com/robfig/cron"
  7. "go.uber.org/zap"
  8. "io/ioutil"
  9. "net/http"
  10. "time"
  11. )
  12. var (
  13. MgoBid *mongodb.MongodbSim
  14. lastId, startId string
  15. mail_to = "zhangjinkun@topnet.net.cn,wangjianghan@topnet.net.cn,maxiaoshan@topnet.net.cn,zhengkun@topnet.net.cn"
  16. mail_api = "http://172.17.145.179:19281/_send/_mail"
  17. )
  18. func init() {
  19. MgoBid = &mongodb.MongodbSim{
  20. MongodbAddr: "172.17.4.187:27082,172.17.145.163:27083",
  21. DbName: "qfw",
  22. Size: 5,
  23. UserName: "SJZY_RWESBid_Other",
  24. Password: "SJZY@O17t8herB3B",
  25. }
  26. MgoBid.InitPool()
  27. startId = ""
  28. }
  29. func main() {
  30. TimeTask()
  31. }
  32. func TimeTask() {
  33. c := cron.New()
  34. cronstr := "0 */5 * * * ?" // 每5min执行一次
  35. _ = c.AddFunc(cronstr, func() {
  36. taskinfo()
  37. })
  38. c.Start()
  39. }
  40. func taskinfo() {
  41. info, _ := MgoBid.Find("bidding", nil, `{"_id": -1}`, `{"_id": 1}`, true, -1, -1)
  42. if info != nil {
  43. lastId = mongodb.BsonIdToSId((*info)[0]["_id"])
  44. } else {
  45. sendMail("bidding表id查询失败")
  46. return
  47. }
  48. q := map[string]interface{}{"$gt": mongodb.StringTOBsonId(startId), "$lte": mongodb.StringTOBsonId(lastId)}
  49. count := MgoBid.Count("bidding", q)
  50. ids := fmt.Sprintf("%s-%s", startId, lastId)
  51. if count <= 0 {
  52. sendMail(fmt.Sprintf("bidding表id段数据查询失败,%s", ids))
  53. return
  54. }
  55. save := make(map[string]interface{})
  56. save["gtid"] = startId
  57. save["lteid"] = lastId
  58. save["count"] = count
  59. now := time.Now().Unix()
  60. save["dataprocess"] = 0
  61. save["createtime"] = now
  62. save["updatetime"] = now
  63. MgoBid.Save("bidding_processing_ids", save)
  64. }
  65. func sendMail(content string) {
  66. res, err := http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", mail_api, mail_api, "processing_ids-send-fail", content))
  67. if err == nil {
  68. defer res.Body.Close()
  69. read, err := ioutil.ReadAll(res.Body)
  70. log.Info("send mail ...", zap.String("r:", string(read)), zap.Any("err:", err))
  71. }
  72. }