main.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package main
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. elastic "es"
  6. "fmt"
  7. "io/ioutil"
  8. "log"
  9. "net/http"
  10. common "qfw/util"
  11. "github.com/robfig/cron"
  12. )
  13. var (
  14. Es elastic.Es
  15. cfg = new(Config)
  16. projectTime, biddingTime = 0, 0
  17. )
  18. func main() {
  19. common.ReadConfig(&cfg)
  20. log.Println("cfg ", cfg)
  21. Es = elastic.NewEs(cfg.Es.Version, cfg.Es.Address, cfg.Es.DbSize, cfg.Es.UserName, cfg.Es.Password)
  22. runJob()
  23. c := cron.New()
  24. c.AddFunc(cfg.CornExp, func() {
  25. runJob()
  26. })
  27. c.Start()
  28. select {}
  29. }
  30. func runJob() {
  31. log.Println("增量数据查询开始")
  32. lastProjectTime, isOk := getData(cfg.LastProjectTime, cfg.Es.ProjectIndex)
  33. if isOk {
  34. cfg.LastProjectTime = lastProjectTime
  35. }
  36. lastBiddingTime, isOks := getData(cfg.LastBiddingTime, cfg.Es.BiddingIndex)
  37. if isOks {
  38. cfg.LastBiddingTime = lastBiddingTime
  39. }
  40. common.WriteSysConfig(cfg)
  41. log.Println("增量数据查询结束")
  42. }
  43. // Send("cbs告警:超过"+fmt.Sprint(SysConfig.TimeExpire)+"个小时未更新数据", WxKey)
  44. func getData(LastTime int64, index string) (int64, bool) {
  45. endTime, isOk := int64(0), true
  46. esquery := `{"query":{"bool":{"must":{"range":{"pici":{"gte":"%d"}}}}},"_source":["pici"],"sort":{"pici":"desc"},"from":0,"size":1}`
  47. idQuery := fmt.Sprintf(esquery, LastTime)
  48. res := Es.Get(index, index, idQuery)
  49. if res != nil && *res != nil && len(*res) == 1 {
  50. endTime = common.Int64All((*res)[0]["pici"])
  51. if index == cfg.Es.BiddingIndex {
  52. biddingTime = 0
  53. } else if index == cfg.Es.ProjectIndex {
  54. projectTime = 0
  55. }
  56. log.Println("本次任务查找到数据...", endTime)
  57. } else {
  58. endTime = LastTime
  59. isOk = false
  60. log.Println("本次任务未查找到数据...", idQuery)
  61. if index == cfg.Es.BiddingIndex {
  62. biddingTime += 2
  63. } else if index == cfg.Es.ProjectIndex {
  64. projectTime += 2
  65. }
  66. Send(index+fmt.Sprint(projectTime)+"个小时未更新数据", cfg.WxKey)
  67. }
  68. return endTime, isOk
  69. }
  70. func Send(msg, key string) {
  71. m := map[string]interface{}{
  72. "msgtype": "text",
  73. "text": map[string]string{
  74. "content": msg,
  75. },
  76. }
  77. b, _ := json.Marshal(m)
  78. res, err := http.Post(fmt.Sprintf(cfg.WxApi, key), "application/json", bytes.NewReader(b))
  79. if err != nil {
  80. log.Println("发送出错", err)
  81. } else {
  82. defer res.Body.Close()
  83. resByte, _ := ioutil.ReadAll(res.Body)
  84. log.Println("发送结果", string(resByte))
  85. }
  86. }