// Command main runs scheduled statistics tasks against the bidding
// Elasticsearch indexes (and optionally MongoDB) for troubleshooting and early
// warning, and mails the results.
package main

import (
	util "app.yhyue.com/data_processing/common_utils"
	"app.yhyue.com/data_processing/common_utils/elastic"
	"app.yhyue.com/data_processing/common_utils/mongodb"
	"encoding/json"
	"fmt"
	es "github.com/olivere/elastic/v7"
	"github.com/robfig/cron/v3"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"log"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"time"
)

// T describes one scheduled statistics task. Tasks are decoded from the
// entries of the "task" list in the configuration.
type T struct {
	Cron    string // cron spec passed to cron.AddFunc (the parser is built with cron.WithSeconds)
	Name    string // used in the report text and as the mail title
	Min     int    // lower bound checked by "alert" tasks
	Max     int    // upper bound checked by "alert" tasks
	Type    string // "alert" or "report"; any other value only logs the counts
	Tjscope string // statistics window "<n>,<unit>", unit being "h" (hours) or "d" (days)
	Mgo     string // "|"-separated MongoDB settings (at least 5 fields); see mongoDailyStats
}

// Settings for ES data alerting, populated by init from the configuration.
var (
	config map[string]interface{}

	// Mail recipient(s) and the HTTP endpoint of the mail service.
	to  string
	api string

	// Elasticsearch addresses and index names. esAddr is declared but never
	// loaded: its assignment in init is commented out.
	esAddr, esAddr2, esAddr3    string
	esIndex, esIndex2, esIndex3 string

	// Credentials for es2 (username/password) and es3 (username3/password3).
	username, password   string
	username3, password3 string

	// Ts holds every task decoded from the configuration.
	Ts = []*T{}

	// Raw query templates; task only prints/logs them, the actual counts use
	// queries built with the es package.
	esQ  = `{"query": {"range": {"id": {"gte": "%s","lt": "%s"}}}}`
	esQ1 = `{"query": {"bool": {"must": [{"range": {"id": {"gte": "%s","lt": "%s"}}},{"terms": {"bidding.site": ["元博网(采购与招标网)","中国招标与采购网"]}}]}}}`
)

// init loads the configuration into the package-level settings and decodes
// the task list into Ts.
//
// It panics when the "jkmail" entry is missing or is not an object, so a
// broken mail configuration is noticed at startup.
func init() {
	util.ReadConfig(&config)
	jkmail := config["jkmail"].(map[string]interface{})
	to, _ = jkmail["to"].(string)
	api, _ = jkmail["api"].(string)

	esIndex, _ = config["esIndex"].(string)
	esAddr2, _ = config["esAddr2"].(string)
	esIndex2, _ = config["esIndex2"].(string)

	// The third cluster is optional; its settings are read only when an
	// address is configured.
	if _, ok := config["esAddr3"]; ok {
		esAddr3, _ = config["esAddr3"].(string)
		esIndex3, _ = config["esIndex3"].(string)
		username3, _ = config["username3"].(string)
		password3, _ = config["password3"].(string)
	}
	username, _ = config["username"].(string)
	password, _ = config["password"].(string)

	// Each task entry is round-tripped through JSON to map it onto T.
	tasks, _ := config["task"].([]interface{})
	for _, raw := range tasks {
		bs, err := json.Marshal(raw)
		if err != nil {
			log.Println("task config marshal err:", err)
			continue
		}
		var v *T
		if err := json.Unmarshal(bs, &v); err != nil {
			// Skip the entry instead of scheduling a half-populated task.
			log.Println("task config unmarshal err:", err, string(bs))
			continue
		}
		if v != nil {
			Ts = append(Ts, v)
		}
	}
}

// main schedules every configured task and then blocks forever. With no
// tasks configured it logs and returns immediately.
func main() {
	log.Println("start..")
	if len(Ts) == 0 {
		log.Println("end..")
		return
	}

	local, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		// Without tzdata LoadLocation fails and returns a nil location, which
		// the scheduler cannot use. Asia/Shanghai is UTC+8 without DST, so a
		// fixed zone is an equivalent fallback.
		log.Println("load location Asia/Shanghai err, using fixed UTC+8:", err)
		local = time.FixedZone("CST", 8*60*60)
	}

	c := cron.New(cron.WithLocation(local), cron.WithSeconds())
	for _, v := range Ts {
		if _, err := c.AddFunc(v.Cron, v.task); err != nil {
			log.Println("add cron task err:", v.Name, v.Cron, err)
		}
	}
	c.Start()
	defer c.Stop()
	select {} // block forever; the scheduler runs in its own goroutine
}

// task runs one statistics pass: it derives the time window from Tjscope,
// counts matching documents in the ES clusters and then, depending on Type,
// mails an alert when a count leaves [Min, Max] or mails a report that is
// optionally enriched with MongoDB statistics.
//
// It returns without doing anything when Tjscope is not of the form
// "<n>,<unit>".
func (t *T) task() {
	qt := strings.Split(t.Tjscope, ",")
	if len(qt) != 2 {
		return
	}

	// Window bounds in Unix seconds. An unrecognised unit leaves both at 0.
	var st, et int64
	now := time.Now()
	switch qt[1] {
	case "h":
		// NOTE(review): the offset is added, so qt[0] is presumably negative
		// (e.g. "-1" for the last hour) — confirm against the configuration.
		et = now.Unix()
		st = et + util.Int64All(qt[0])*3600
	case "d":
		st = util.GetDayStartSecond(util.IntAll(qt[0]))
		et = util.GetDayStartSecond(0)
	}

	// Hex seconds followed by 16 zeros: for an 8-digit timestamp this is a
	// 24-character hex id (ObjectID layout) whose time part is the bound, so
	// the window can be expressed as an id range.
	st1 := fmt.Sprintf("%x0000000000000000", st)
	et1 := fmt.Sprintf("%x0000000000000000", et)
	eq := fmt.Sprintf(esQ, st1, et1)
	fmt.Println("eq", eq)
	eq1 := fmt.Sprintf(esQ1, st1, et1)
	fmt.Println("eq1", eq1)

	rangeQuery := es.NewRangeQuery("id").Gte(st1).Lt(et1)
	termsQuery := es.NewTermsQuery("site", "元博网(采购与招标网)", "中国招标与采购网")
	countQuery := es.NewBoolQuery().Must(rangeQuery)
	count1Query := es.NewBoolQuery().Must(rangeQuery).Filter(termsQuery)

	es2 := elastic.Elastic{S_esurl: esAddr2, I_size: 2, Username: username, Password: password}
	es2.InitElasticSize()
	es3 := elastic.Elastic{S_esurl: esAddr3, I_size: 5, Username: username3, Password: password3}
	es3.InitElasticSize()

	// count:    company ES cluster, all documents in the window.
	// count1:   competitor sites only. NOTE(review): this queries esIndex,
	//           not esIndex2 — confirm that is intended.
	// countNew: new (Huawei Cloud) cluster, all documents in the window.
	count := int(es2.Count(esIndex2, esIndex2, countQuery))
	count1 := int(es2.Count(esIndex, esIndex, count1Query))
	countNew := int(es3.Count(esIndex3, esIndex3, countQuery))
	fmt.Println("count", count)
	fmt.Println("count1", count1)
	fmt.Println("countNew", countNew)

	switch t.Type {
	case "alert":
		// NOTE(review): the alert fires on either cluster but the text only
		// carries count, not countNew.
		if count < t.Min || count > t.Max || countNew < t.Min || countNew > t.Max {
			report := fmt.Sprintf("告警%s,最小%d,最大%d,统计结果:%d", t.Name, t.Min, t.Max, count)
			t.SendMail(report)
		}
	case "report":
		report := fmt.Sprintf("统计报告%s,【统计结果】,es库数量:%d", t.Name, count)
		if len(t.Mgo) > 5 {
			if fs := strings.Split(t.Mgo, "|"); len(fs) >= 5 {
				report += mongoDailyStats(fs, now, st1, et1, count, count1, countNew)
			} else {
				// A malformed setting must not panic inside the cron job;
				// the ES-only report is still sent.
				log.Println("task", t.Name, "invalid Mgo config, want 5 '|'-separated fields, got", len(fs))
			}
		}
		t.SendMail(report)
	}
	log.Println("task over:", t.Name, eq, count)
}

// mongoDailyStats scans the MongoDB documents whose _id lies in [st1, et1),
// saves a statistics record to the "bidding_ribao" collection and returns the
// text to append to the mail report.
//
// fs holds the "|"-separated Mgo settings and must have at least 5 elements:
// fs[3] is the database and fs[4] the collection that is scanned; fs[0],
// fs[1] and fs[2] are passed to mongodb.NewMgoWithUser (presumably address,
// user and password — confirm against that function). count, count1 and
// countNew are the ES counts the MongoDB numbers are compared with.
func mongoDailyStats(fs []string, now time.Time, st1, et1 string, count, count1, countNew int) string {
	fmgo := mongodb.NewMgoWithUser(fs[0], fs[3], fs[1], fs[2], 1)
	mq := bson.M{"_id": bson.M{
		"$gte": mongodb.StringTOBsonId(st1),
		"$lt":  mongodb.StringTOBsonId(et1),
	}}
	fd := bson.M{"extracttype": 1, "sensitive": 1, "dataging": 1, "site": 1, "infoformat": 1, "comeintime": 1, "pici": 1, "publishtime": 1, "competehref": 1, "attach_text": 1}
	sess := fmgo.GetMgoConn()
	defer fmgo.DestoryMongoConn(sess)

	var (
		total     int // every document in the window
		valid     int // documents that pass the validity filter below
		compTotal int // competitor-site documents among total
		compValid int // competitor-site documents among valid

		timeCount   int // valid documents that take part in the latency statistics
		fileCount   int // ... of which carry attach_text
		noFileCount int // ... of which do not

		comeinTotal  int64 // sum of pici - comeintime
		publishTotal int64 // sum of pici - publishtime
		fileTotal    int64 // sum of (_id time - comeintime) for documents with attach_text
		noFileTotal  int64 // same sum for documents without attach_text

		// Histogram of pici - comeintime: up to 1, 3, 5, 10, 15, 30 and more
		// than 30 minutes.
		within1, within3, within5, within10, within15, within30, over30 int64
	)

	iter := sess.DB(fs[3]).C(fs[4]).Find(mq).Select(fd).Iter()
	for {
		// A fresh map per document: decoding into a reused map can leave
		// keys of earlier documents behind (e.g. a stale attach_text), which
		// would skew every check below.
		doc := make(map[string]interface{})
		if !iter.Next(doc) {
			break
		}
		total++

		site := util.ObjToString(doc["site"])
		competitor := site == "元博网(采购与招标网)" || site == "中国招标与采购网"
		if competitor {
			compTotal++
		}

		// Validity filter.
		if util.IntAll(doc["extracttype"]) == -1 ||
			util.ObjToString(doc["sensitive"]) == "测试" ||
			util.IntAll(doc["dataging"]) == 1 ||
			util.Float64All(doc["infoformat"]) == 3 {
			continue
		}
		valid++
		if competitor {
			compValid++
		}

		comeintime := util.Int64All(doc["comeintime"])
		publishtime := util.Int64All(doc["publishtime"])
		pici := util.Int64All(doc["pici"])

		// Latency statistics skip documents whose publish time is exactly
		// 00:00:00 local time (presumably a date-only value — confirm), whose
		// comeintime is 12 hours or more after publishtime, or that have no
		// positive pici.
		tp := time.Unix(publishtime, 0)
		midnight := tp.Hour() == 0 && tp.Minute() == 0 && tp.Second() == 0
		if comeintime-publishtime >= 12*60*60 || midnight || pici <= 0 {
			continue
		}

		diff1 := pici - comeintime
		diff2 := pici - publishtime
		switch {
		case diff1 < 0:
			fmt.Println("diff1", diff1, doc["_id"])
		case diff1 <= 60:
			within1++
		case diff1 <= 3*60:
			within3++
		case diff1 <= 5*60:
			within5++
		case diff1 <= 10*60:
			within10++
		case diff1 <= 15*60:
			within15++
		case diff1 <= 30*60:
			within30++
		default:
			over30++
		}
		if diff2 < 0 {
			fmt.Println("diff2", diff2, doc["_id"])
		}
		// Negative differences are logged above but still enter the sums.
		comeinTotal += diff1
		publishTotal += diff2
		timeCount++

		// Delay between comeintime and the creation time embedded in _id;
		// negative delays are counted but not summed.
		delay := doc["_id"].(primitive.ObjectID).Timestamp().Unix() - comeintime
		if _, ok := doc["attach_text"]; ok {
			if delay >= 0 {
				fileTotal += delay
			}
			fileCount++
		} else {
			if delay >= 0 {
				noFileTotal += delay
			}
			noFileCount++
		}
	}

	var comeinAvg, publishAvg, fileAvg, noFileAvg int64
	if timeCount > 0 {
		comeinAvg = comeinTotal / int64(timeCount)
		publishAvg = publishTotal / int64(timeCount)
	}
	if fileCount > 0 {
		fileAvg = fileTotal / int64(fileCount)
	}
	if noFileCount > 0 {
		noFileAvg = noFileTotal / int64(noFileCount)
	}

	text := ",mgo统计:" + strconv.Itoa(valid) +
		",差值:" + strconv.Itoa(valid-count) +
		",mgo总入库量:" + strconv.Itoa(total)
	text += "\n" + "【竟品统计结果】:" + strconv.Itoa(count1) +
		",mgo统计:" + strconv.Itoa(compValid) +
		",差值:" + strconv.Itoa(compValid-count1) +
		",mgo总入库量" + strconv.Itoa(compTotal) +
		"\n【新集群统计结果】es数量:" + strconv.Itoa(countNew) +
		",mgo统计:" + strconv.Itoa(valid) +
		",差值:" + strconv.Itoa(valid-countNew)

	// Persist the figures; the record is dated the day before the run.
	yesterday := now.AddDate(0, 0, -1)
	insert := map[string]interface{}{
		"es_count":                 count,
		"es3_count":                countNew,
		"mgo_count":                valid,
		"es_mgo_diff":              valid - count,
		"mgo_total":                total,
		"competitor_es_count":      count1,
		"competitor_mgo_count":     compValid,
		"competitor_diff":          compValid - count1,
		"competitor_mgo_total":     compTotal,
		"date":                     yesterday.Format("2006-01-02"),
		"es_comeintime_totaltime":  comeinTotal,
		"es_publishtime_totaltime": publishTotal,
		"es_comeintime_avgtime":    comeinAvg,
		"es_publishtime_avgtime":   publishAvg,
		"file_avgltime":            fileAvg,
		"no_file_avgltime":         noFileAvg,
		"file_totaltime":           fileTotal,
		"no_file_totaltime":        noFileTotal,
		"file_count":               fileCount,
		"no_file_count":            noFileCount,
		"pc_diff1":                 within1,
		"pc_diff3":                 within3,
		"pc_diff5":                 within5,
		"pc_diff10":                within10,
		"pc_diff15":                within15,
		"pc_diff30":                within30,
		"pc_diff301":               over30,
		// NOTE(review): the trailing ';' in this key looks like a typo. It is
		// kept because renaming would change the stored schema.
		"timeCount;": timeCount,
	}
	fmgo.Save("bidding_ribao", insert)
	return text
}

// SendMail sends report through the configured mail HTTP API with a GET
// request, using the task name as the title. Failures are only printed.
//
// Every query parameter is URL-escaped: reports contain newlines, which
// net/url rejects as control characters when left raw, and characters such
// as '&' or '%' would otherwise corrupt the query string.
func (t *T) SendMail(report string) {
	mailURL := fmt.Sprintf("%s?to=%s&title=%s&body=%s",
		api, url.QueryEscape(to), url.QueryEscape(t.Name), url.QueryEscape(report))
	fmt.Println("url=>", mailURL)

	res, err := http.Get(mailURL)
	if err != nil {
		fmt.Println("SendMail err ", err)
		return
	}
	// The body must be closed so the underlying connection is released.
	defer res.Body.Close()
	fmt.Println("SendMail res ", res)
}