123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- package main
- import (
- "bytes"
- "encoding/json"
- elastic "es"
- "fmt"
- "io/ioutil"
- "log"
- "net/http"
- common "qfw/util"
- "github.com/robfig/cron"
- )
- var (
- Es elastic.Es
- cfg = new(Config)
- projectTime, biddingTime = 0, 0
- )
- func main() {
- common.ReadConfig(&cfg)
- log.Println("cfg ", cfg)
- Es = elastic.NewEs(cfg.Es.Version, cfg.Es.Address, cfg.Es.DbSize, cfg.Es.UserName, cfg.Es.Password)
- runJob()
- c := cron.New()
- c.AddFunc(cfg.CornExp, func() {
- runJob()
- })
- c.Start()
- select {}
- }
- func runJob() {
- log.Println("增量数据查询开始")
- lastProjectTime, isOk := getData(cfg.LastProjectTime, cfg.Es.ProjectIndex)
- if isOk {
- cfg.LastProjectTime = lastProjectTime
- }
- lastBiddingTime, isOks := getData(cfg.LastBiddingTime, cfg.Es.BiddingIndex)
- if isOks {
- cfg.LastBiddingTime = lastBiddingTime
- }
- common.WriteSysConfig(cfg)
- log.Println("增量数据查询结束")
- }
- // Send("cbs告警:超过"+fmt.Sprint(SysConfig.TimeExpire)+"个小时未更新数据", WxKey)
- func getData(LastTime int64, index string) (int64, bool) {
- endTime, isOk := int64(0), true
- esquery := `{"query":{"bool":{"must":{"range":{"pici":{"gte":"%d"}}}}},"_source":["pici"],"sort":{"pici":"desc"},"from":0,"size":1}`
- idQuery := fmt.Sprintf(esquery, LastTime)
- res := Es.Get(index, index, idQuery)
- if res != nil && *res != nil && len(*res) == 1 {
- endTime = common.Int64All((*res)[0]["pici"])
- if index == cfg.Es.BiddingIndex {
- biddingTime = 0
- } else if index == cfg.Es.ProjectIndex {
- projectTime = 0
- }
- log.Println("本次任务查找到数据...", endTime)
- } else {
- endTime = LastTime
- isOk = false
- log.Println("本次任务未查找到数据...", idQuery)
- if index == cfg.Es.BiddingIndex {
- biddingTime += 2
- } else if index == cfg.Es.ProjectIndex {
- projectTime += 2
- }
- Send(index+fmt.Sprint(projectTime)+"个小时未更新数据", cfg.WxKey)
- }
- return endTime, isOk
- }
- func Send(msg, key string) {
- m := map[string]interface{}{
- "msgtype": "text",
- "text": map[string]string{
- "content": msg,
- },
- }
- b, _ := json.Marshal(m)
- res, err := http.Post(fmt.Sprintf(cfg.WxApi, key), "application/json", bytes.NewReader(b))
- if err != nil {
- log.Println("发送出错", err)
- } else {
- defer res.Body.Close()
- resByte, _ := ioutil.ReadAll(res.Body)
- log.Println("发送结果", string(resByte))
- }
- }
|