package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"strconv"
	"time"

	"github.com/cron"
	"go.mongodb.org/mongo-driver/bson"
	"mongodb"
	"qfw/util"
)

// taskA counts the runs of MgoCount in which the incremental bidding count
// was zero; it lets the first zero results be skipped before reporting.
var taskA = 0

var (
	// WebUrl is the webhook endpoint that SendMsg posts alert messages to.
	// NOTE(review): the webhook key is committed in source; consider loading
	// it from configuration instead.
	WebUrl = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=45962efc-ca87-4996-9ffa-08bf6608ab7a"
	// WarningStr1 and WarningStr2 are alert message templates; they are not
	// referenced in this file.
	WarningStr1 = "机器智能识别提醒,%d分钟未有数据入库,bidding表数据已积累%d条。"
	WarningStr2 = "数据采集提醒,%d分钟未有数据入库。"
	// skip records that the first stale detection was ignored; send counts
	// the alerts attempted for the current stale period. See Attachment.
	skip, send = 0, 0
)

// TimeTask registers the periodic monitoring job with a cron scheduler and
// starts the scheduler. Attachment is run on every tick.
func TimeTask() {
	//StartTask()
	c := cron.New()
	//cronstr := "0 0 * * * ?" // run once per hour
	cronstr := "0 */10 * * * ?" // run every 10 minutes
	//_ = c.AddFunc(cronstr, func() { StartTask() })
	if err := c.AddFunc(cronstr, func() {
		util.Debug("Start Task...")
		Attachment()
	}); err != nil {
		// A job that failed to register would otherwise silently never run.
		util.Debug("TimeTask: register cron job failed:", err)
	}
	c.Start()
}

// StartTask runs the MongoDB count checks followed by the Elasticsearch
// health check and logs the combined report when it is non-empty.
func StartTask() {
	util.Debug("Start Task...")
	result := MgoCount()
	result = EsCheck(result)
	if result != "" {
		util.Debug(result)
	}
	util.Debug("Task Over...")
}

// Attachment monitors attachment recognition by looking at the import_time
// of a record from the ocr_goods_over collection (presumably the newest one,
// given the {"_id": -1} ordering argument — confirm against Mgo.Find).
//
// When that time is more than 30 minutes old, the first detection is only
// logged, and each following detection sends an alert through SendMsg, up to
// three alerts. As soon as a fresh record is seen the counters are reset.
//
// Author: J 2022/6/9 5:58 PM
func Attachment() {
	info, _ := Mgo.Find("ocr_goods_over", nil, map[string]interface{}{"_id": -1}, nil, true, -1, -1)
	// Guard against a failed query or an empty collection; indexing the
	// result unconditionally would panic.
	if info == nil || len(*info) == 0 {
		util.Debug("Attachment: no record found in ocr_goods_over, check skipped")
		return
	}
	latest := (*info)[0]
	lastT := util.Int64All(latest["import_time"])

	if time.Now().Unix()-lastT <= 30*60 {
		// Data is flowing again: start a new detection cycle.
		skip, send = 0, 0
		return
	}
	if skip == 0 {
		// Ignore the first stale detection.
		skip++
		util.Debug("跳过第一次", latest["_id"])
		return
	}
	send++
	if send <= 3 {
		SendMsg("当前最后一个id段数据累积时间已经超过30分钟, 最后一段数据入库时间为:" + util.FormatDateByInt64(&lastT, util.Date_Full_Layout))
	}
}

// MgoCount checks MongoDB document counts and returns a textual report of
// every anomaly found, or an empty string when nothing is abnormal.
//
// Three checks are performed:
//   - bidding_back: the total count must equal BiddingBackSize.
//   - bidding (stock): the count of documents with _id <= LastStockId must
//     equal LastStockSize; when it does, LastStockId/LastStockSize are moved
//     forward to the id boundary of one hour ago.
//   - bidding (increment): the count of documents created within the last
//     hour; a zero count is reported only once taskA has exceeded 1.
func MgoCount() string {
	defer util.Catch()
	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)
	result := ""

	// bidding_back
	count := Mgo.Count("bidding_back", nil)
	util.Debug("bidding_back---", BiddingBackSize, count)
	if int64(count) != BiddingBackSize {
		result = fmt.Sprintf("bindding_back数据异常,统计结果%d,往常数据量%d", count, BiddingBackSize)
		result += "\n"
	}

	// bidding: build the ObjectId boundary for "one hour ago" from the
	// hex-encoded unix timestamp followed by sixteen zero digits.
	st := time.Now().Unix() - 3600
	currentLastId := fmt.Sprintf("%x0000000000000000", st)

	// Compare against the stock count remembered from the previous run.
	q1 := bson.M{"_id": bson.M{"$lte": mongodb.StringTOBsonId(LastStockId)}}
	count1 := Mgo.Count("bidding", q1)
	util.Debug("bidding---", LastStockSize, count1, q1)
	if int64(count1) != LastStockSize {
		// Append rather than assign so a bidding_back anomaly recorded above
		// is not lost when both checks fail.
		result += fmt.Sprintf("bindding数据存量统计异常,统计条件%v,统计结果%d,上次统计数据量%d", q1, count1, LastStockSize)
		result += "\n"
	} else {
		// No anomaly: move the stock baseline to the current boundary
		// (the id corresponding to one hour before now).
		LastStockId = currentLastId
		q2 := bson.M{"_id": bson.M{"$lte": mongodb.StringTOBsonId(LastStockId)}}
		count2 := Mgo.Count("bidding", q2)
		LastStockSize = int64(count2)
	}

	// Increment: documents with an id in (one hour ago, now].
	et := time.Now().Unix()
	currentId := fmt.Sprintf("%x0000000000000000", et)
	q3 := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(currentLastId), "$lte": mongodb.StringTOBsonId(currentId)}}
	count3 := Mgo.Count("bidding", q3)
	util.Debug("bidding---", count3, q3)
	if int64(count3) == 0 {
		// NOTE(review): taskA is not reset when a later run sees a non-zero
		// increment, so the zero results need not be consecutive — confirm
		// this is intended.
		if taskA > 1 {
			result += fmt.Sprintf("bindding数据增量统计异常,统计条件%v,统计结果%d", q3, count3)
			result += "\n\n"
			taskA = 0
		} else {
			taskA++
		}
	}
	return result
}

// EsCheck queries the Elasticsearch cluster health and appends a description
// to result when the health request fails or the cluster status is not
// "green". It returns the (possibly extended) report.
func EsCheck(result string) string {
	client := Es.GetEsConn()
	defer Es.DestoryEsConn(client)

	resp, err := client.ClusterHealth().Do()
	if err != nil {
		// A failed health request is itself an anomaly worth reporting, and
		// resp must not be dereferenced in this case.
		return result + "\n" + "检索库异常,集群健康检查失败:" + err.Error() + "\n"
	}
	if resp == nil {
		return result + "\n" + "检索库异常,集群健康检查未返回结果" + "\n"
	}
	util.Debug(*resp)

	if resp.Status != "green" {
		result += "\n" + "检索库异常,异常内容:" + "\n" +
			"cluster_name:" + resp.ClusterName + "\n" +
			"status:" + resp.Status + "\n" +
			"number_of_nodes:" + strconv.Itoa(resp.NumberOfNodes) + "\n" +
			"number_of_data_nodes:" + strconv.Itoa(resp.NumberOfDataNodes) + "\n" +
			"active_primary_shards:" + strconv.Itoa(resp.ActivePrimaryShards) + "\n" +
			"active_shards:" + strconv.Itoa(resp.ActiveShards) + "\n" +
			"relocating_shards:" + strconv.Itoa(resp.RelocatingShards) + "\n" +
			"initialized_shards:" + strconv.Itoa(resp.InitializedShards) + "\n" +
			"unassigned_shards:" + strconv.Itoa(resp.UnassignedShards) + "\n"
	}
	return result
}

// SendMail sends report through the mail HTTP API by issuing a GET request
// to api with the recipient, a fixed title and the report as query
// parameters. Failures are logged and otherwise ignored.
func SendMail(report string) {
	// The title and the report contain non-ASCII text and newlines, so they
	// must be query-escaped; an unescaped newline makes the URL invalid.
	// api and to are interpolated exactly as before.
	target := fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, to, url.QueryEscape("异常报告!"), url.QueryEscape(report))

	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Get(target)
	if err != nil {
		util.Debug("SendMail: request failed:", err)
		return
	}
	defer resp.Body.Close()
}

// SendMsg posts content as a text message that mentions everyone ("@all")
// to the webhook at WebUrl and prints the response body to stdout. Failures
// are logged and otherwise ignored.
func SendMsg(content string) {
	payload := map[string]interface{}{
		"msgtype": "text",
		"text": map[string]interface{}{
			"content":        content,
			"mentioned_list": []string{"@all"},
		},
	}
	bytesData, err := json.Marshal(payload)
	if err != nil {
		util.Debug("SendMsg: encode payload failed:", err)
		return
	}

	req, err := http.NewRequest(http.MethodPost, WebUrl, bytes.NewReader(bytesData))
	if err != nil {
		util.Debug("SendMsg: build request failed:", err)
		return
	}
	req.Header.Set("Content-Type", "application/json")

	// Bound the request so a stalled webhook cannot hang the cron job.
	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		util.Debug("SendMsg: request failed:", err)
		return
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		util.Debug("SendMsg: read response failed:", err)
		return
	}
	fmt.Println(string(body))
}