|
@@ -0,0 +1,124 @@
|
|
|
+package main
|
|
|
+
|
|
|
+import (
|
|
|
+ "encoding/json"
|
|
|
+ "fmt"
|
|
|
+ "log"
|
|
|
+ "mongodb"
|
|
|
+ "net/http"
|
|
|
+ "qfw/util"
|
|
|
+ "qfw/util/elastic"
|
|
|
+ "strings"
|
|
|
+ "time"
|
|
|
+
|
|
|
+ "github.com/robfig/cron"
|
|
|
+ "go.mongodb.org/mongo-driver/bson"
|
|
|
+)
|
|
|
+
|
|
|
+//定时任务,去统计bidding索引,排查问题、预警
|
|
|
+
|
|
|
+type T struct {
|
|
|
+ Cron string
|
|
|
+ Name string
|
|
|
+ Min int
|
|
|
+ Max int
|
|
|
+ Type string
|
|
|
+ Tjscope string
|
|
|
+ Mgo string
|
|
|
+}
|
|
|
+
|
|
|
+//es数据最报警
|
|
|
+var (
|
|
|
+ config map[string]interface{}
|
|
|
+ to string
|
|
|
+ api string
|
|
|
+ esAddr, esDataAddr string
|
|
|
+ esIndex, esDataIndex string
|
|
|
+ Ts = []*T{}
|
|
|
+ esQ = `{"query": {"range": {"id": {"gte": "%s","lt": "%s"}}}}`
|
|
|
+)
|
|
|
+
|
|
|
+func init() {
|
|
|
+ util.ReadConfig(&config)
|
|
|
+ jkmail := config["jkmail"].(map[string]interface{})
|
|
|
+ to, _ = jkmail["to"].(string)
|
|
|
+ api, _ = jkmail["api"].(string)
|
|
|
+ esAddr, _ = config["esAddr"].(string)
|
|
|
+ esIndex, _ = config["esIndex"].(string)
|
|
|
+ esDataAddr, _ = config["esDataAddr"].(string)
|
|
|
+ esDataIndex, _ = config["esDataIndex"].(string)
|
|
|
+ tasks, _ := config["task"].([]interface{})
|
|
|
+ for _, t := range tasks {
|
|
|
+ bs, _ := json.Marshal(t)
|
|
|
+ var v *T
|
|
|
+ json.Unmarshal(bs, &v)
|
|
|
+ if v != nil {
|
|
|
+ Ts = append(Ts, v)
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func main() {
|
|
|
+ log.Println("start..")
|
|
|
+ if len(Ts) > 0 {
|
|
|
+ c := cron.New()
|
|
|
+ for _, v := range Ts {
|
|
|
+ c.AddFunc(v.Cron, v.task)
|
|
|
+ }
|
|
|
+ c.Start()
|
|
|
+ defer c.Stop()
|
|
|
+ select {}
|
|
|
+ }
|
|
|
+ log.Println("end..")
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+func (t *T) task() {
|
|
|
+ //初始化语句
|
|
|
+ qt := strings.Split(t.Tjscope, ",")
|
|
|
+ if len(qt) != 2 {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ eq := ""
|
|
|
+ st, et := int64(0), int64(0)
|
|
|
+ now := time.Now()
|
|
|
+ switch qt[1] {
|
|
|
+ case "h":
|
|
|
+ et = now.Unix()
|
|
|
+ st = et + util.Int64All(qt[0])*3600
|
|
|
+ case "d":
|
|
|
+ st = util.GetDayStartSecond(util.IntAll(qt[0]))
|
|
|
+ et = util.GetDayStartSecond(0)
|
|
|
+ }
|
|
|
+ st1 := fmt.Sprintf("%x0000000000000000", st)
|
|
|
+ et1 := fmt.Sprintf("%x0000000000000000", et)
|
|
|
+ eq = fmt.Sprintf(esQ, st1, et1)
|
|
|
+ es := elastic.Elastic{S_esurl: esAddr, I_size: 1}
|
|
|
+ es.InitElasticSize()
|
|
|
+ count := int(es.Count(esIndex, esIndex, eq))
|
|
|
+ switch t.Type {
|
|
|
+ case "alert":
|
|
|
+ if count < t.Min || count > t.Max {
|
|
|
+ //report := fmt.Sprintf("告警%s,最小%d,最大%d,统计结果:%d", t.Name, t.Min, t.Max, count)
|
|
|
+ //t.SendMail(report)
|
|
|
+ }
|
|
|
+ case "report":
|
|
|
+ report := fmt.Sprintf("报告%s,统计结果%d", t.Name, count)
|
|
|
+ if len(t.Mgo) > 5 {
|
|
|
+ fs := strings.Split(t.Mgo, "|")
|
|
|
+ fmgo := mongodb.NewMgoWithUser(fs[0], fs[3], fs[1], fs[2], 1)
|
|
|
+ id1 := mongodb.StringTOBsonId(st1)
|
|
|
+ id2 := mongodb.StringTOBsonId(et1)
|
|
|
+ mq := bson.M{"extracttype": bson.M{"$ne": -1}, "sensitive": bson.M{"$ne": "测试"}, "dataging": bson.M{"$ne": 1}, "_id": bson.M{"$gte": id1, "$lt": id2}}
|
|
|
+ count2 := fmgo.Count(fs[4], mq)
|
|
|
+ count3 := fmgo.Count(fs[4], bson.M{"_id": bson.M{"$gte": id1, "$lt": id2}}) //mgo总入库量
|
|
|
+ report += ",mgo统计" + fmt.Sprint(count2) + ",差值:" + fmt.Sprint(count2-count) + ",mgo总入库量" + fmt.Sprint(count3)
|
|
|
+ }
|
|
|
+ t.SendMail(report)
|
|
|
+ }
|
|
|
+ log.Println("task over:", t.Name, eq, count)
|
|
|
+}
|
|
|
+
|
|
|
+func (t *T) SendMail(report string) {
|
|
|
+ http.Get(fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, to, t.Name, report))
|
|
|
+}
|