|
- package main
- import (
- "encoding/json"
- "fmt"
- es "github.com/olivere/elastic/v7"
- "github.com/robfig/cron/v3"
- "go.mongodb.org/mongo-driver/bson"
- "go.mongodb.org/mongo-driver/bson/primitive"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "log"
- "net/http"
- "strings"
- "time"
- )
// T describes one scheduled monitoring task that counts documents in the
// bidding index in order to troubleshoot problems and raise alerts.
// Instances are decoded from the entries of the "task" list in the config.
type T struct {
	// Cron is the cron expression that schedules the task.
	Cron string
	// Name identifies the task; it is used as the mail title and in logs.
	Name string
	// Min and Max bound the accepted document count for "alert" tasks.
	Min, Max int
	// Type selects the task behaviour: "alert" or "report".
	Type string
	// Tjscope is the statistics window, written as "<offset>,<unit>" where
	// unit is "h" or "d".
	Tjscope string
	// Mgo is a "|"-separated MongoDB connection description used by
	// "report" tasks.
	Mgo string
}
- // es数据最报警
- var (
- config map[string]interface{}
- to string
- api string
- esAddr, esAddr2, esAddr3 string
- esIndex, esIndex2, esIndex3 string
- username, password string
- username3, password3 string
- Ts = []*T{}
- esQ = `{"query": {"range": {"id": {"gte": "%s","lt": "%s"}}}}`
- esQ1 = `{"query": {"bool": {"must": [{"range": {"id": {"gte": "%s","lt": "%s"}}},{"terms": {"bidding.site": ["元博网(采购与招标网)","中国招标与采购网"]}}]}}}`
- )
- func Init() {
- util.ReadConfig(&config)
- jkmail := config["jkmail"].(map[string]interface{})
- to, _ = jkmail["to"].(string)
- api, _ = jkmail["api"].(string)
- //esAddr, _ = config["esAddr"].(string)
- esIndex, _ = config["esIndex"].(string)
- esAddr2, _ = config["esAddr2"].(string)
- esIndex2, _ = config["esIndex2"].(string) //
- if _, ok := config["esAddr3"]; ok {
- esAddr3, _ = config["esAddr3"].(string)
- esIndex3, _ = config["esIndex3"].(string) //
- username3, _ = config["username3"].(string)
- password3, _ = config["password3"].(string)
- }
- username, _ = config["username"].(string)
- password, _ = config["password"].(string)
- tasks, _ := config["task"].([]interface{})
- for _, t := range tasks {
- bs, _ := json.Marshal(t)
- var v *T
- json.Unmarshal(bs, &v)
- if v != nil {
- Ts = append(Ts, v)
- }
- }
- }
- func main() {
- Init()
- log.Println("start..")
- if len(Ts) > 0 {
- local, _ := time.LoadLocation("Asia/Shanghai")
- c := cron.New(cron.WithLocation(local), cron.WithSeconds())
- for _, v := range Ts {
- c.AddFunc(v.Cron, v.task)
- }
- c.Start()
- defer c.Stop()
- select {}
- }
- log.Println("end..")
- }
- func (t *T) task() {
- //初始化语句
- qt := strings.Split(t.Tjscope, ",")
- if len(qt) != 2 {
- return
- }
- st, et := int64(0), int64(0)
- now := time.Now()
- switch qt[1] {
- case "h":
- et = now.Unix()
- st = et + util.Int64All(qt[0])*3600
- case "d":
- st = util.GetDayStartSecond(util.IntAll(qt[0]))
- et = util.GetDayStartSecond(0)
- }
- st1 := fmt.Sprintf("%x0000000000000000", st)
- et1 := fmt.Sprintf("%x0000000000000000", et)
- eq := fmt.Sprintf(esQ, st1, et1)
- fmt.Println("eq", eq)
- eq1 := fmt.Sprintf(esQ1, st1, et1)
- fmt.Println("eq1", eq1)
- rangeQuery := es.NewRangeQuery("id").Gte(st1).Lt(et1)
- //竞品网站统计
- termsQuery := es.NewTermsQuery("site", "元博网(采购与招标网)", "中国招标与采购网", "北京隆道网络科技有限公司", "友云采")
- countQuery := es.NewBoolQuery().
- Must(rangeQuery)
- //竞品网站总量
- count1Query := es.NewBoolQuery().Must(rangeQuery).Filter(termsQuery)
- //元博网(采购与招标网),竞品网站数量统计
- countComptetQuery1 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "元博网(采购与招标网)"))
- countComptetQuery2 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "中国招标与采购网"))
- countComptetQuery3 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "北京隆道网络科技有限公司"))
- countComptetQuery4 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "友云采"))
- countComptetQuery5 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "标800"))
- es2 := elastic.Elastic{S_esurl: esAddr2, I_size: 2, Username: username, Password: password}
- es2.InitElasticSize()
- es3 := elastic.Elastic{S_esurl: esAddr3, I_size: 5, Username: username3, Password: password3}
- es3.InitElasticSize()
- count := int(es2.Count(esIndex2, countQuery)) //公司 es集群 数量统计
- count1 := int(es2.Count(esIndex, count1Query)) //竞品网站 数量统计
- //元博网(采购与招标网)
- countComptet1 := int(es2.Count(esIndex, countComptetQuery1))
- //中国招标与采购网
- countComptet2 := int(es2.Count(esIndex, countComptetQuery2))
- //北京隆道网络科技有限公司
- countComptet3 := int(es2.Count(esIndex, countComptetQuery3))
- //友云采
- countComptet4 := int(es2.Count(esIndex, countComptetQuery4))
- //标800
- countComptet5 := int(es2.Count(esIndex, countComptetQuery5))
- countNew := int(es3.Count(esIndex3, countQuery)) //华为云 新集群
- log.Println("count", count)
- log.Println("count1", count1)
- log.Println("countNew", countNew)
- report := ""
- switch t.Type {
- case "alert":
- if count < t.Min || count > t.Max || countNew < t.Min || countNew > t.Max {
- report = fmt.Sprintf("告警%s,最小%d,最大%d,统计结果:%d", t.Name, t.Min, t.Max, count)
- t.SendMail(report)
- }
- case "report":
- //report = fmt.Sprintf("统计报告%s,【统计结果】,es库数量:%d", t.Name, count)
- if len(t.Mgo) > 5 {
- fs := strings.Split(t.Mgo, "|")
- fmgo := mongodb.NewMgoWithUser(fs[0], fs[3], fs[1], fs[2], 1)
- id1 := mongodb.StringTOBsonId(st1)
- id2 := mongodb.StringTOBsonId(et1)
- mq := bson.M{"_id": bson.M{"$gte": id1, "$lt": id2}} //一天时间内的id段
- fd := bson.M{"extracttype": 1, "sensitive": 1, "dataging": 1, "site": 1, "infoformat": 1, "comeintime": 1, "pici": 1, "publishtime": 1, "competehref": 1, "attach_text": 1}
- sess := fmgo.GetMgoConn()
- defer fmgo.DestoryMongoConn(sess)
- /**
- count 一天内,es 中 数据总量
- count1 一天内,es 中 竞品总量
- count2 一天内,mgo 总入库量
- count3 一天内,mgo 有效数据 总数
- count4 一天内,mgo 中 竞品数据总量
- count5 一天内,mgo 有效数据中,竞品的数量
- countNew 一天内,es3新集群 中 数据总量
- timeCount pici-comeintime 时差在12小时内的数据
- */
- count2, count3 := 0, 0 //
- count4, count5 := 0, 0 //竟品
- es_comeintime_totaltime := int64(0) //comeintime 和 生索引 pici 时间 差值的总和
- es_publishtime_totaltime := int64(0) //publishtime 和 生索引 pici 时间 差值的总和
- es_comeintime_avgtime := int64(0) //comeintime 和 生索引 pici 时间 差值的平均值
- es_publishtime_avgtime := int64(0) //publishtime 和 生索引 pici 时间 差值的平均值
- file_totaltime := int64(0)
- no_file_totaltime := int64(0)
- file_avgltime := int64(0)
- no_file_avgltime := int64(0)
- timeCount := 0 //12小时 统计时间差的数据量
- fileCount := 0
- noFileCount := 0
- //统计pici -comeintime 时间差,1、3、5、10、15、30、30+ 分钟
- var pc_diff1 int64
- var pc_diff3 int64
- var pc_diff5 int64
- var pc_diff10 int64
- var pc_diff15 int64
- var pc_diff30 int64
- var pc_diff301 int64
- var competCount1 int
- var competCount2 int
- var competCount3 int
- var competCount4 int
- var competCount5 int
- competeReal1 := 0
- competeReal2 := 0
- competeReal3 := 0
- competeReal4 := 0
- competeReal5 := 0
- query := sess.DB(fs[3]).C(fs[4]).Find(mq).Select(fd).Iter()
- for tmp := make(map[string]interface{}); query.Next(tmp); count2++ {
- if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" || util.ObjToString(tmp["site"]) == "中国招标与采购网" || util.ObjToString(tmp["site"]) == "北京隆道网络科技有限公司" || util.ObjToString(tmp["site"]) == "友云采" || util.ObjToString(tmp["site"]) == "标800" {
- count4++
- if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" {
- competCount1++
- }
- if util.ObjToString(tmp["site"]) == "中国招标与采购网" {
- competCount2++
- }
- if util.ObjToString(tmp["site"]) == "北京隆道网络科技有限公司" {
- competCount3++
- }
- if util.ObjToString(tmp["site"]) == "友云采" {
- competCount4++
- }
- if util.ObjToString(tmp["site"]) == "标800" {
- competCount5++
- }
- }
- if util.IntAll(tmp["extracttype"]) != -1 && util.ObjToString(tmp["sensitive"]) != "测试" && util.IntAll(tmp["dataging"]) != 1 && util.Float64All(tmp["infoformat"]) != 3 {
- count3++
- comeintime := util.Int64All(tmp["comeintime"])
- publishtime := util.Int64All(tmp["publishtime"])
- pici := util.Int64All(tmp["pici"])
- tp := time.Unix(publishtime, 0)
- isZero := false
- if (tp.Hour() == 0 && tp.Minute() == 0) && tp.Second() == 0 {
- isZero = true
- }
- if (comeintime-publishtime) < 12*60*60 && !isZero {
- if pici > 0 {
- diff1 := pici - comeintime
- diff2 := pici - publishtime
- if diff1 < 0 {
- fmt.Println("diff1", diff1, tmp["_id"])
- } else if diff1 <= 60 {
- pc_diff1++
- } else if diff1 <= 3*60 {
- pc_diff3++
- } else if diff1 <= 5*60 {
- pc_diff5++
- } else if diff1 <= 10*60 {
- pc_diff10++
- } else if diff1 <= 15*60 {
- pc_diff15++
- } else if diff1 <= 30*60 {
- pc_diff30++
- } else {
- pc_diff301++
- }
- if diff2 < 0 {
- fmt.Println("diff2", diff2, tmp["_id"])
- }
- es_comeintime_totaltime += diff1
- es_publishtime_totaltime += diff2
- timeCount++
- if _, ok := tmp["attach_text"]; ok {
- curtime := tmp["_id"].(primitive.ObjectID).Timestamp().Unix()
- diff3 := curtime - comeintime
- if diff3 >= 0 {
- file_totaltime += diff3
- }
- fileCount++
- } else {
- curtime := tmp["_id"].(primitive.ObjectID).Timestamp().Unix()
- diff4 := curtime - comeintime
- if diff4 >= 0 {
- no_file_totaltime += diff4
- }
- noFileCount++
- }
- }
- }
- if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" || util.ObjToString(tmp["site"]) == "中国招标与采购网" || util.ObjToString(tmp["site"]) == "北京隆道网络科技有限公司" || util.ObjToString(tmp["site"]) == "友云采" || util.ObjToString(tmp["site"]) == "标800" {
- count5++
- if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" {
- competeReal1++
- }
- if util.ObjToString(tmp["site"]) == "中国招标与采购网" {
- competeReal2++
- }
- if util.ObjToString(tmp["site"]) == "北京隆道网络科技有限公司" {
- competeReal3++
- }
- if util.ObjToString(tmp["site"]) == "友云采" {
- competeReal4++
- }
- if util.ObjToString(tmp["site"]) == "标800" {
- competeReal5++
- }
- }
- }
- }
- if timeCount > 0 {
- es_comeintime_avgtime = es_comeintime_totaltime / int64(timeCount)
- es_publishtime_avgtime = es_publishtime_totaltime / int64(timeCount)
- }
- if fileCount > 0 {
- file_avgltime = file_totaltime / int64(fileCount)
- }
- if noFileCount > 0 {
- no_file_avgltime = no_file_totaltime / int64(noFileCount)
- }
- //report += `,mgo统计:` + fmt.Sprint(count3) + `,差值:` + fmt.Sprint(count3-count) + `,mgo总入库量:` + fmt.Sprint(count2) +
- // `<br>竟品统计结果_es统计:` + strconv.Itoa(count1) + `,mgo统计:` + fmt.Sprint(count5) + `,差值:` + fmt.Sprint(count5-count1) + `,mgo总入库量` + fmt.Sprint(count4) +
- // `<br>元博网_es统计:` + strconv.Itoa(countComptet1) + `,mgo统计:` + fmt.Sprint(competeReal1) + `,差值:` + fmt.Sprint(competeReal1-countComptet1) + `,mgo总入库量` + fmt.Sprint(competCount1) +
- // `<br>中国招标与采购网_es统计` + strconv.Itoa(countComptet2) + `,mgo统计:` + fmt.Sprint(competeReal2) + `,差值:` + fmt.Sprint(competeReal2-countComptet2) + `,mgo总入库量` + fmt.Sprint(competCount2) +
- // `<br>北京隆道网络科技有限公司_es统计` + strconv.Itoa(countComptet3) + `,mgo统计:` + fmt.Sprint(competeReal3) + `,差值:` + fmt.Sprint(competeReal3-countComptet3) + `,mgo总入库量` + fmt.Sprint(competCount3) +
- // `<br>友云采_es统计` + strconv.Itoa(countComptet4) + `,mgo统计:` + fmt.Sprint(competeReal4) + `,差值:` + fmt.Sprint(competeReal4-countComptet4) + `,mgo总入库量` + fmt.Sprint(competCount4) +
- // `<br>新集群统计结果_es数量:` + strconv.Itoa(countNew) + `,mgo统计:` + fmt.Sprint(count3) + `,差值:` + fmt.Sprint(count3-countNew)
- //
- var reportBuilder strings.Builder
- // 开始表格
- reportBuilder.WriteString("<table>")
- reportBuilder.WriteString("<tr><th>统计项</th><th>es统计</th><th>mgo统计</th><th>差值</th><th>mgo总入库量</th></tr>")
- addTableRow(&reportBuilder, "阿里云es集群", count, count3, count3-count, count2)
- addTableRow(&reportBuilder, "竟品统计结果", count1, count5, count5-count1, count4)
- addTableRow(&reportBuilder, "元博网", countComptet1, competeReal1, competeReal1-countComptet1, competCount1)
- addTableRow(&reportBuilder, "中国招标与采购网", countComptet2, competeReal2, competeReal2-countComptet2, competCount2)
- addTableRow(&reportBuilder, "北京隆道网络科技有限公司", countComptet3, competeReal3, competeReal3-countComptet3, competCount3)
- addTableRow(&reportBuilder, "友云采", countComptet4, competeReal4, competeReal4-countComptet4, competCount4)
- addTableRow(&reportBuilder, "标800", countComptet5, competeReal5, competeReal5-countComptet5, competCount5)
- addTableRow(&reportBuilder, "华为云es集群", countNew, count3, count3-countNew, count2)
- // 结束表格
- reportBuilder.WriteString("</table>")
- // 获取最终的报告字符串
- report = reportBuilder.String()
- //存入数据库
- yesterday := now.AddDate(0, 0, -1)
- insert := map[string]interface{}{
- "es_count": count, //
- "es3_count": countNew, //
- "mgo_count": count3,
- "es_mgo_diff": count3 - count,
- "mgo_total": count2,
- "competitor_es_count": count1, //竞品网站es 数量
- "competitor_mgo_count": count5,
- "competitor_diff": count5 - count1,
- "competitor_mgo_total": count4,
- "date": yesterday.Format("2006-01-02"),
- "es_comeintime_totaltime": es_comeintime_totaltime,
- "es_publishtime_totaltime": es_publishtime_totaltime,
- "es_comeintime_avgtime": es_comeintime_avgtime,
- "es_publishtime_avgtime": es_publishtime_avgtime,
- "file_avgltime": file_avgltime,
- "no_file_avgltime": no_file_avgltime,
- "file_totaltime": file_totaltime,
- "no_file_totaltime": no_file_totaltime,
- "file_count": fileCount,
- "no_file_count": noFileCount,
- "pc_diff1": pc_diff1,
- "pc_diff3": pc_diff3,
- "pc_diff5": pc_diff5,
- "pc_diff10": pc_diff10,
- "pc_diff15": pc_diff15,
- "pc_diff30": pc_diff30,
- "pc_diff301": pc_diff301,
- "timeCount": timeCount,
- }
- fmgo.Save("bidding_ribao", insert)
- }
- t.SendMail(report)
- //t.SendMail("【竟品统计结果】")
- //fmt.Println(report)
- }
- log.Println("task over:", t.Name, eq, count)
- }
- func (t *T) SendMail(report string) {
- url := fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, to, t.Name, report)
- fmt.Println("url=>", url)
- res, err := http.Get(url)
- if err != nil {
- fmt.Println("SendMail err ", err)
- } else {
- fmt.Println("SendMail res ", res)
- }
- }
// addTableRow appends one HTML table row to builder: the item name followed
// by the Elasticsearch count, the MongoDB count, their difference and the
// MongoDB total. Every cell is written as a <th> element.
func addTableRow(builder *strings.Builder, itemName string, esCount, mgoCount, diff, mgoTotal int) {
	const rowFormat = `<tr><th>%s</th><th>%d</th><th>%d</th><th>%d</th><th>%d</th></tr>`
	fmt.Fprintf(builder, rowFormat, itemName, esCount, mgoCount, diff, mgoTotal)
}
|