package main
import (
"encoding/json"
"fmt"
es "github.com/olivere/elastic/v7"
"github.com/robfig/cron/v3"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
"jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
"log"
"net/http"
"strings"
"time"
)
// T is one scheduled statistics job, decoded from an entry of the "task"
// array in the config. A job counts bidding documents in Elasticsearch (and,
// for report jobs, in MongoDB) over a time window, to help troubleshoot
// problems and raise alerts.
type T struct {
	// Cron is the schedule handed to cron.AddFunc. The scheduler is created
	// with cron.WithSeconds, so the spec carries a leading seconds field.
	Cron string
	// Name labels the job in alert messages.
	Name string
	// Min and Max are the inclusive bounds an "alert" job expects the
	// document counts to stay within.
	Min, Max int
	// Type selects the job kind; task handles "alert" and "report".
	Type string
	// Tjscope is the statistics window as "<count>,<unit>" with unit "h" or
	// "d"; task does nothing unless the value has exactly two comma parts.
	Tjscope string
	// Mgo is a "|"-separated MongoDB spec used by "report" jobs when longer
	// than 5 characters: part 3 is the database, part 4 the collection
	// (parts 0-2 presumably address/user/password — confirm with mongodb pkg).
	Mgo string
}
// Package-level settings filled in by Init and read by the scheduled jobs
// (Elasticsearch data-volume alerting).
var (
	// config is the raw configuration decoded by util.ReadConfig.
	config map[string]interface{}

	// to and api come from the "jkmail" config section.
	to, api string
)

// Elasticsearch endpoints, index names and credentials. The *3 settings
// belong to an optional third cluster; Init reads them only when "esAddr3"
// is present in the config.
var (
	esAddr, esAddr2, esAddr3    string
	esIndex, esIndex2, esIndex3 string
	username, password          string
	username3, password3        string
)

// Ts holds the jobs decoded from the "task" config entries.
var Ts = []*T{}

// Count-query templates over an "id" range [gte, lt); each takes the two
// bounds as %s arguments. esQ1 additionally restricts bidding.site to two
// competitor sites.
var (
	esQ  = `{"query": {"range": {"id": {"gte": "%s","lt": "%s"}}}}`
	esQ1 = `{"query": {"bool": {"must": [{"range": {"id": {"gte": "%s","lt": "%s"}}},{"terms": {"bidding.site": ["元博网(采购与招标网)","中国招标与采购网"]}}]}}}`
)
// Init loads the configuration via util.ReadConfig and populates the
// package-level settings (mail recipients, Elasticsearch endpoints, index
// names and credentials) and the scheduled job list Ts.
//
// The "jkmail" section is mandatory: Init exits with a descriptive message
// when it is missing or is not an object. Entries under "task" that cannot be
// decoded into a T are logged and skipped rather than scheduled with
// partially zeroed fields.
func Init() {
	util.ReadConfig(&config)
	jkmail, ok := config["jkmail"].(map[string]interface{})
	if !ok {
		log.Fatalln(`config: "jkmail" section is missing or is not an object`)
	}
	to, _ = jkmail["to"].(string)
	api, _ = jkmail["api"].(string)
	//esAddr, _ = config["esAddr"].(string)
	esIndex, _ = config["esIndex"].(string)
	esAddr2, _ = config["esAddr2"].(string)
	esIndex2, _ = config["esIndex2"].(string)
	// The third cluster is optional; read its settings only when configured.
	if _, ok := config["esAddr3"]; ok {
		esAddr3, _ = config["esAddr3"].(string)
		esIndex3, _ = config["esIndex3"].(string)
		username3, _ = config["username3"].(string)
		password3, _ = config["password3"].(string)
	}
	username, _ = config["username"].(string)
	password, _ = config["password"].(string)

	tasks, _ := config["task"].([]interface{})
	for i, raw := range tasks {
		// Round-trip through JSON to map the generic config value onto T.
		bs, err := json.Marshal(raw)
		if err != nil {
			log.Printf("config: task[%d]: encode: %v", i, err)
			continue
		}
		var v *T
		// json.Unmarshal keeps going after a field type mismatch and still
		// allocates v, so the error must be checked to avoid scheduling a
		// task whose Min/Max/Cron were silently left at zero values.
		if err := json.Unmarshal(bs, &v); err != nil {
			log.Printf("config: task[%d]: decode: %v", i, err)
			continue
		}
		// A JSON null entry decodes to a nil pointer; skip it.
		if v != nil {
			Ts = append(Ts, v)
		}
	}
}
// main loads the configuration and, when at least one job is configured,
// schedules every job on a seconds-resolution cron scheduler in the
// Asia/Shanghai time zone, then blocks forever. If no job is configured, or
// none has a valid cron spec, it logs "end.." and returns.
func main() {
	Init()
	log.Println("start..")
	if len(Ts) == 0 {
		log.Println("end..")
		return
	}

	local, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		// LoadLocation returns a nil *Location on failure (e.g. no tz
		// database on the host); the scheduler would later panic in
		// Time.In(nil). Fall back to the fixed UTC+8 offset instead.
		log.Println("load location Asia/Shanghai:", err, "- falling back to fixed UTC+8")
		local = time.FixedZone("CST", 8*60*60)
	}

	c := cron.New(cron.WithLocation(local), cron.WithSeconds())
	scheduled := 0
	for _, v := range Ts {
		if _, err := c.AddFunc(v.Cron, v.task); err != nil {
			log.Printf("task %q: invalid cron spec %q: %v", v.Name, v.Cron, err)
			continue
		}
		scheduled++
	}
	if scheduled == 0 {
		log.Println("end..")
		return
	}

	c.Start()
	// The scheduler runs for the lifetime of the process.
	select {}
}
func (t *T) task() {
//初始化语句
qt := strings.Split(t.Tjscope, ",")
if len(qt) != 2 {
return
}
st, et := int64(0), int64(0)
now := time.Now()
switch qt[1] {
case "h":
et = now.Unix()
st = et + util.Int64All(qt[0])*3600
case "d":
st = util.GetDayStartSecond(util.IntAll(qt[0]))
et = util.GetDayStartSecond(0)
}
st1 := fmt.Sprintf("%x0000000000000000", st)
et1 := fmt.Sprintf("%x0000000000000000", et)
eq := fmt.Sprintf(esQ, st1, et1)
fmt.Println("eq", eq)
eq1 := fmt.Sprintf(esQ1, st1, et1)
fmt.Println("eq1", eq1)
rangeQuery := es.NewRangeQuery("id").Gte(st1).Lt(et1)
//竞品网站统计
termsQuery := es.NewTermsQuery("site", "元博网(采购与招标网)", "中国招标与采购网", "北京隆道网络科技有限公司", "友云采", "标800")
countQuery := es.NewBoolQuery().
Must(rangeQuery)
//竞品网站总量
count1Query := es.NewBoolQuery().Must(rangeQuery).Filter(termsQuery)
//元博网(采购与招标网),竞品网站数量统计
countComptetQuery1 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "元博网(采购与招标网)"))
countComptetQuery2 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "中国招标与采购网"))
countComptetQuery3 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "北京隆道网络科技有限公司"))
countComptetQuery4 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "友云采"))
countComptetQuery5 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "标800"))
es2 := elastic.Elastic{S_esurl: esAddr2, I_size: 2, Username: username, Password: password}
es2.InitElasticSize()
es3 := elastic.Elastic{S_esurl: esAddr3, I_size: 5, Username: username3, Password: password3}
es3.InitElasticSize()
count := int(es2.Count(esIndex2, countQuery)) //公司 es集群 数量统计
count1 := int(es2.Count(esIndex, count1Query)) //竞品网站 数量统计
//元博网(采购与招标网)
countComptet1 := int(es2.Count(esIndex, countComptetQuery1))
//中国招标与采购网
countComptet2 := int(es2.Count(esIndex, countComptetQuery2))
//北京隆道网络科技有限公司
countComptet3 := int(es2.Count(esIndex, countComptetQuery3))
//友云采
countComptet4 := int(es2.Count(esIndex, countComptetQuery4))
//标800
countComptet5 := int(es2.Count(esIndex, countComptetQuery5))
countNew := int(es3.Count(esIndex3, countQuery)) //华为云 新集群
log.Println("count", count)
log.Println("count1", count1)
log.Println("countNew", countNew)
report := ""
switch t.Type {
case "alert":
if count < t.Min || count > t.Max || countNew < t.Min || countNew > t.Max {
report = fmt.Sprintf("告警%s,最小%d,最大%d,统计结果:%d", t.Name, t.Min, t.Max, count)
t.SendMail(report)
}
case "report":
//report = fmt.Sprintf("统计报告%s,【统计结果】,es库数量:%d", t.Name, count)
if len(t.Mgo) > 5 {
fs := strings.Split(t.Mgo, "|")
fmgo := mongodb.NewMgoWithUser(fs[0], fs[3], fs[1], fs[2], 1)
id1 := mongodb.StringTOBsonId(st1)
id2 := mongodb.StringTOBsonId(et1)
mq := bson.M{"_id": bson.M{"$gte": id1, "$lt": id2}} //一天时间内的id段
fd := bson.M{"extracttype": 1, "sensitive": 1, "dataging": 1, "site": 1, "infoformat": 1, "comeintime": 1, "pici": 1, "publishtime": 1, "competehref": 1, "attach_text": 1}
sess := fmgo.GetMgoConn()
defer fmgo.DestoryMongoConn(sess)
/**
count 一天内,es 中 数据总量
count1 一天内,es 中 竞品总量
count2 一天内,mgo 总入库量
count3 一天内,mgo 有效数据 总数
count4 一天内,mgo 中 竞品数据总量
count5 一天内,mgo 有效数据中,竞品的数量
countNew 一天内,es3新集群 中 数据总量
timeCount pici-comeintime 时差在12小时内的数据
*/
count2, count3 := 0, 0 //
count4, count5 := 0, 0 //竟品
es_comeintime_totaltime := int64(0) //comeintime 和 生索引 pici 时间 差值的总和
es_publishtime_totaltime := int64(0) //publishtime 和 生索引 pici 时间 差值的总和
es_comeintime_avgtime := int64(0) //comeintime 和 生索引 pici 时间 差值的平均值
es_publishtime_avgtime := int64(0) //publishtime 和 生索引 pici 时间 差值的平均值
file_totaltime := int64(0)
no_file_totaltime := int64(0)
file_avgltime := int64(0)
no_file_avgltime := int64(0)
timeCount := 0 //12小时 统计时间差的数据量
fileCount := 0
noFileCount := 0
//统计pici -comeintime 时间差,1、3、5、10、15、30、30+ 分钟
var pc_diff1 int64
var pc_diff3 int64
var pc_diff5 int64
var pc_diff10 int64
var pc_diff15 int64
var pc_diff30 int64
var pc_diff301 int64
var competCount1 int
var competCount2 int
var competCount3 int
var competCount4 int
var competCount5 int
competeReal1 := 0
competeReal2 := 0
competeReal3 := 0
competeReal4 := 0
competeReal5 := 0
query := sess.DB(fs[3]).C(fs[4]).Find(mq).Select(fd).Iter()
for tmp := make(map[string]interface{}); query.Next(tmp); count2++ {
if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" || util.ObjToString(tmp["site"]) == "中国招标与采购网" || util.ObjToString(tmp["site"]) == "北京隆道网络科技有限公司" || util.ObjToString(tmp["site"]) == "友云采" || util.ObjToString(tmp["site"]) == "标800" {
count4++
if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" {
competCount1++
}
if util.ObjToString(tmp["site"]) == "中国招标与采购网" {
competCount2++
}
if util.ObjToString(tmp["site"]) == "北京隆道网络科技有限公司" {
competCount3++
}
if util.ObjToString(tmp["site"]) == "友云采" {
competCount4++
}
if util.ObjToString(tmp["site"]) == "标800" {
competCount5++
}
}
if util.IntAll(tmp["extracttype"]) != -1 && util.ObjToString(tmp["sensitive"]) != "测试" && util.IntAll(tmp["dataging"]) != 1 && util.Float64All(tmp["infoformat"]) != 3 {
count3++
comeintime := util.Int64All(tmp["comeintime"])
publishtime := util.Int64All(tmp["publishtime"])
pici := util.Int64All(tmp["pici"])
tp := time.Unix(publishtime, 0)
isZero := false
if (tp.Hour() == 0 && tp.Minute() == 0) && tp.Second() == 0 {
isZero = true
}
if (comeintime-publishtime) < 12*60*60 && !isZero {
if pici > 0 {
diff1 := pici - comeintime
diff2 := pici - publishtime
if diff1 < 0 {
fmt.Println("diff1", diff1, tmp["_id"])
} else if diff1 <= 60 {
pc_diff1++
} else if diff1 <= 3*60 {
pc_diff3++
} else if diff1 <= 5*60 {
pc_diff5++
} else if diff1 <= 10*60 {
pc_diff10++
} else if diff1 <= 15*60 {
pc_diff15++
} else if diff1 <= 30*60 {
pc_diff30++
} else {
pc_diff301++
}
if diff2 < 0 {
fmt.Println("diff2", diff2, tmp["_id"])
}
es_comeintime_totaltime += diff1
es_publishtime_totaltime += diff2
timeCount++
if _, ok := tmp["attach_text"]; ok {
curtime := tmp["_id"].(primitive.ObjectID).Timestamp().Unix()
diff3 := curtime - comeintime
if diff3 >= 0 {
file_totaltime += diff3
}
fileCount++
} else {
curtime := tmp["_id"].(primitive.ObjectID).Timestamp().Unix()
diff4 := curtime - comeintime
if diff4 >= 0 {
no_file_totaltime += diff4
}
noFileCount++
}
}
}
if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" || util.ObjToString(tmp["site"]) == "中国招标与采购网" || util.ObjToString(tmp["site"]) == "北京隆道网络科技有限公司" || util.ObjToString(tmp["site"]) == "友云采" || util.ObjToString(tmp["site"]) == "标800" {
count5++
if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" {
competeReal1++
}
if util.ObjToString(tmp["site"]) == "中国招标与采购网" {
competeReal2++
}
if util.ObjToString(tmp["site"]) == "北京隆道网络科技有限公司" {
competeReal3++
}
if util.ObjToString(tmp["site"]) == "友云采" {
competeReal4++
}
if util.ObjToString(tmp["site"]) == "标800" {
competeReal5++
}
}
}
}
if timeCount > 0 {
es_comeintime_avgtime = es_comeintime_totaltime / int64(timeCount)
es_publishtime_avgtime = es_publishtime_totaltime / int64(timeCount)
}
if fileCount > 0 {
file_avgltime = file_totaltime / int64(fileCount)
}
if noFileCount > 0 {
no_file_avgltime = no_file_totaltime / int64(noFileCount)
}
//report += `,mgo统计:` + fmt.Sprint(count3) + `,差值:` + fmt.Sprint(count3-count) + `,mgo总入库量:` + fmt.Sprint(count2) +
// `
竟品统计结果_es统计:` + strconv.Itoa(count1) + `,mgo统计:` + fmt.Sprint(count5) + `,差值:` + fmt.Sprint(count5-count1) + `,mgo总入库量` + fmt.Sprint(count4) +
// `
元博网_es统计:` + strconv.Itoa(countComptet1) + `,mgo统计:` + fmt.Sprint(competeReal1) + `,差值:` + fmt.Sprint(competeReal1-countComptet1) + `,mgo总入库量` + fmt.Sprint(competCount1) +
// `
中国招标与采购网_es统计` + strconv.Itoa(countComptet2) + `,mgo统计:` + fmt.Sprint(competeReal2) + `,差值:` + fmt.Sprint(competeReal2-countComptet2) + `,mgo总入库量` + fmt.Sprint(competCount2) +
// `
北京隆道网络科技有限公司_es统计` + strconv.Itoa(countComptet3) + `,mgo统计:` + fmt.Sprint(competeReal3) + `,差值:` + fmt.Sprint(competeReal3-countComptet3) + `,mgo总入库量` + fmt.Sprint(competCount3) +
// `
友云采_es统计` + strconv.Itoa(countComptet4) + `,mgo统计:` + fmt.Sprint(competeReal4) + `,差值:` + fmt.Sprint(competeReal4-countComptet4) + `,mgo总入库量` + fmt.Sprint(competCount4) +
// `
新集群统计结果_es数量:` + strconv.Itoa(countNew) + `,mgo统计:` + fmt.Sprint(count3) + `,差值:` + fmt.Sprint(count3-countNew)
//
var reportBuilder strings.Builder
// 开始表格
reportBuilder.WriteString("
统计项 | es统计 | mgo统计 | 差值 | mgo总入库量 |
---|