package main
import (
"bytes"
"encoding/json"
"fmt"
"github.com/cron"
"go.mongodb.org/mongo-driver/bson"
"io/ioutil"
"mongodb"
"net/http"
"qfw/util"
"strconv"
"time"
)
var (
	// taskA counts consecutive MgoCount runs in which the incremental
	// "bidding" count was zero; MgoCount reports and resets it once it
	// exceeds 1.
	taskA int

	// WebUrl is the webhook endpoint that SendMsg posts alert messages to.
	// NOTE(review): the webhook key is embedded in source; consider loading
	// it from configuration instead.
	WebUrl = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=45962efc-ca87-4996-9ffa-08bf6608ab7a"

	// WarningStr1 and WarningStr2 are alert message format strings
	// (not referenced in the visible part of this file).
	WarningStr1 = "机器智能识别提醒,%d分钟未有数据入库,bidding表数据已积累%d条。"
	WarningStr2 = "数据采集提醒,%d分钟未有数据入库。"

	// skip records that Attachment has already tolerated one stale check;
	// send counts the alerts Attachment has attempted since the data was
	// last fresh.
	skip int
	send int
)
// TimeTask schedules the Attachment check with cron and starts the scheduler.
//
// The spec fires every 10 minutes. StartTask is currently not scheduled here.
// If the job cannot be registered, the error is logged and the scheduler is
// not started, instead of silently running with no job.
func TimeTask() {
	const spec = "0 */10 * * * ?" // every 10 minutes; hourly would be "0 0 * * * ?"

	c := cron.New()
	err := c.AddFunc(spec, func() {
		util.Debug("Start Task...")
		Attachment()
	})
	if err != nil {
		util.Debug("TimeTask: register cron job failed:", err)
		return
	}
	c.Start()
}
// StartTask runs the MongoDB count checks (MgoCount), feeds their report
// into the search-cluster health check (EsCheck) and logs the combined
// report when it is not empty.
func StartTask() {
	util.Debug("Start Task...")
	if report := EsCheck(MgoCount()); report != "" {
		util.Debug(report)
	}
	util.Debug("Task Over...")
}
// Attachment monitors attachment recognition (author: J, 2022/6/9).
//
// It reads one record from "ocr_goods_over" (ordered by {"_id": -1},
// presumably newest first) and compares its "import_time" with the current
// time. When the record is older than 30 minutes the first occurrence is only
// logged; every following occurrence sends an alert through SendMsg, at most
// three times. As soon as fresh data is seen both counters are reset.
func Attachment() {
	// The second return value is discarded because its type is not visible
	// from this file; a failed or empty query is caught by the check below.
	info, _ := Mgo.Find("ocr_goods_over", nil, map[string]interface{}{"_id": -1}, nil, true, -1, -1)
	if info == nil || len(*info) == 0 {
		// Without this guard (*info)[0] would panic inside the cron job.
		util.Debug("Attachment: no record found in ocr_goods_over, skip check")
		return
	}

	latest := (*info)[0]
	lastT := util.Int64All(latest["import_time"])

	// Data is fresh: clear the alert state.
	if time.Now().Unix()-lastT <= 30*60 {
		skip, send = 0, 0
		return
	}

	// Tolerate the first stale observation.
	if skip == 0 {
		skip++
		util.Debug("跳过第一次", latest["_id"])
		return
	}

	// Cap the number of alerts per stale period at three.
	send++
	if send <= 3 {
		SendMsg("当前最后一个id段数据累积时间已经超过30分钟, 最后一段数据入库时间为:" + util.FormatDateByInt64(&lastT, util.Date_Full_Layout))
	}
}
// 统计mgo数据
func MgoCount() string {
defer util.Catch()
sess := Mgo.GetMgoConn()
defer Mgo.DestoryMongoConn(sess)
result := ""
// biddingback
count := Mgo.Count("bidding_back", nil)
util.Debug("bidding_back---", BiddingBackSize, count)
if int64(count) != BiddingBackSize {
result = fmt.Sprintf("bindding_back数据异常,统计结果%d,往常数据量%d", count, BiddingBackSize)
result += "
"
}
// bidding
st := time.Now().Unix() - 3600
currentLastId := fmt.Sprintf("%x0000000000000000", st)
// 对比内存中存量id的统计结果
q1 := bson.M{"_id": bson.M{"$lte": mongodb.StringTOBsonId(LastStockId)}}
count1 := Mgo.Count("bidding", q1)
util.Debug("bidding---", LastStockSize, count1, q1)
if int64(count1) != LastStockSize {
result = fmt.Sprintf("bindding数据存量统计异常,统计条件%v,统计结果%d,上次统计数据量%d", q1, count1, LastStockSize)
result += "
"
} else {
// 无异常,替换成当前时间点(当前时间点往前一个小时的id)存量的统计结果
LastStockId = currentLastId
q2 := bson.M{"_id": bson.M{"$lte": mongodb.StringTOBsonId(LastStockId)}}
count2 := Mgo.Count("bidding", q2)
LastStockSize = int64(count2)
}
// 增量
et := time.Now().Unix()
currentId := fmt.Sprintf("%x0000000000000000", et)
q3 := bson.M{"_id": bson.M{"$gt": mongodb.StringTOBsonId(currentLastId), "$lte": mongodb.StringTOBsonId(currentId)}}
count3 := Mgo.Count("bidding", q3)
util.Debug("bidding---", count3, q3)
if int64(count3) == 0 {
if taskA > 1 {
result += fmt.Sprintf("bindding数据增量统计异常,统计条件%v,统计结果%d", q3, count3)
result += "
"
taskA = 0
} else {
taskA++
}
}
return result
}
func EsCheck(result string) string {
client := Es.GetEsConn()
defer Es.DestoryEsConn(client)
resp, _ := client.ClusterHealth().Do()
util.Debug(*resp)
if resp.Status != "green" {
result += "
" + "检索库异常,异常内容:" + "
" +
"cluster_name:" + resp.ClusterName + "
" +
"status:" + resp.Status + "
" +
"number_of_nodes:" + strconv.Itoa(resp.NumberOfNodes) + "
" +
"number_of_data_nodes:" + strconv.Itoa(resp.NumberOfDataNodes) + "
" +
"number_of_data_nodes:" + strconv.Itoa(resp.ActivePrimaryShards) + "
" +
"active_shards:" + strconv.Itoa(resp.ActiveShards) + "
" +
"relocating_shards:" + strconv.Itoa(resp.RelocatingShards) + "
" +
"initialized_shards:" + strconv.Itoa(resp.InitializedShards) + "
" +
"unassigned_shards:" + strconv.Itoa(resp.UnassignedShards) + "
"
}
return result
}
// SendMail sends report through the mail API with a GET request carrying the
// recipient, a fixed title and the report as query parameters.
//
// The title and body are query-encoded: reports contain newlines and
// non-ASCII text, which make URL parsing fail when interpolated verbatim.
// Failures are logged; nothing is returned to the caller.
func SendMail(report string) {
	req, err := http.NewRequest("GET", fmt.Sprintf("%s?to=%s", api, to), nil)
	if err != nil {
		util.Debug("SendMail: build request failed:", err)
		return
	}

	// req.URL.Query() parses the existing "to" parameter; Encode re-emits all
	// parameters properly escaped.
	q := req.URL.Query()
	q.Set("title", "异常报告!")
	q.Set("body", report)
	req.URL.RawQuery = q.Encode()

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		util.Debug("SendMail: request failed:", err)
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		util.Debug("SendMail: unexpected status:", resp.Status)
	}
}
func SendMsg(content string) {
client := &http.Client{}
data := map[string]interface{}{"msgtype": "text", "text": map[string]interface{}{
"content": content, "mentioned_list": []string{"@all"},
}}
bytesData, _ := json.Marshal(data)
req, _ := http.NewRequest("POST", WebUrl, bytes.NewReader(bytesData))
resp, _ := client.Do(req)
body, _ := ioutil.ReadAll(resp.Body)
fmt.Println(string(body))
}