main.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. es "github.com/olivere/elastic/v7"
  6. "github.com/robfig/cron/v3"
  7. "go.mongodb.org/mongo-driver/bson"
  8. "go.mongodb.org/mongo-driver/bson/primitive"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "log"
  13. "net/http"
  14. "strings"
  15. "time"
  16. )
  // T is one scheduled statistics job, decoded from an entry of the config
// "task" array. Jobs count documents in the bidding index to help
// troubleshoot problems and raise alerts.
type T struct {
	// Name identifies the job in logs and is used as the mail title.
	Name string
	// Type selects the job kind: "alert" or "report".
	Type string
	// Cron is the schedule expression (the scheduler is created with seconds enabled).
	Cron string
	// Tjscope is the statistics window, written as "N,h" (hours) or "N,d" (days).
	Tjscope string
	// Min and Max bound the acceptable ES document count for "alert" jobs.
	Min, Max int
	// Mgo is a "|"-separated MongoDB connection spec used by "report" jobs.
	Mgo string
}
  27. // es数据最报警
  28. var (
  29. config map[string]interface{}
  30. to string
  31. api string
  32. esAddr, esAddr2, esAddr3 string
  33. esIndex, esIndex2, esIndex3 string
  34. username, password string
  35. username3, password3 string
  36. Ts = []*T{}
  37. esQ = `{"query": {"range": {"id": {"gte": "%s","lt": "%s"}}}}`
  38. esQ1 = `{"query": {"bool": {"must": [{"range": {"id": {"gte": "%s","lt": "%s"}}},{"terms": {"bidding.site": ["元博网(采购与招标网)","中国招标与采购网"]}}]}}}`
  39. )
  // Init reads the configuration via util.ReadConfig and populates the
// package-level mail, Elasticsearch and credential settings plus the job list Ts.
//
// The "jkmail" section is required; Init terminates the process with a clear
// message when it is missing or is not an object. The third ES cluster
// ("esAddr3" and friends) is optional.
func Init() {
	util.ReadConfig(&config)
	jkmail, ok := config["jkmail"].(map[string]interface{})
	if !ok {
		log.Fatalln(`config: missing or invalid "jkmail" section`)
	}
	to, _ = jkmail["to"].(string)
	api, _ = jkmail["api"].(string)
	//esAddr, _ = config["esAddr"].(string)
	esIndex, _ = config["esIndex"].(string)
	esAddr2, _ = config["esAddr2"].(string)
	esIndex2, _ = config["esIndex2"].(string)
	// The third cluster's settings are read only when esAddr3 is configured.
	if _, has3 := config["esAddr3"]; has3 {
		esAddr3, _ = config["esAddr3"].(string)
		esIndex3, _ = config["esIndex3"].(string)
		username3, _ = config["username3"].(string)
		password3, _ = config["password3"].(string)
	}
	username, _ = config["username"].(string)
	password, _ = config["password"].(string)

	tasks, _ := config["task"].([]interface{})
	for i, raw := range tasks {
		// Round-trip through JSON to decode the generic config value into a T.
		bs, err := json.Marshal(raw)
		if err != nil {
			log.Println("config: task", i, "marshal:", err)
			continue
		}
		var v *T
		if err := json.Unmarshal(bs, &v); err != nil {
			// Stay lenient: a partially decoded task is still scheduled, but
			// the decode problem is no longer silent.
			log.Println("config: task", i, "decode:", err)
		}
		if v != nil {
			Ts = append(Ts, v)
		}
	}
}
  // main loads the configuration and schedules every configured job. The cron
// runs with a seconds field in the Asia/Shanghai time zone, and main then
// blocks forever. With no jobs configured it logs and returns.
func main() {
	Init()
	log.Println("start..")
	if len(Ts) == 0 {
		log.Println("end..")
		return
	}

	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		// LoadLocation returns a nil *Location when tzdata is unavailable.
		// cron converts times with Time.In, which panics on a nil Location.
		// Asia/Shanghai has observed no DST since 1991, so a fixed UTC+8 zone
		// is equivalent.
		log.Println("load Asia/Shanghai:", err, "- using fixed UTC+8")
		loc = time.FixedZone("CST", 8*60*60)
	}

	c := cron.New(
		cron.WithLocation(loc),
		cron.WithSeconds(),
		// Recover keeps a panicking job from terminating the whole process.
		cron.WithChain(cron.Recover(cron.DefaultLogger)),
	)
	for _, v := range Ts {
		if _, err := c.AddFunc(v.Cron, v.task); err != nil {
			log.Println("skip task", v.Name, "bad cron spec", v.Cron, ":", err)
		}
	}
	c.Start()
	select {} // run until the process is killed
}
  82. func (t *T) task() {
  83. //初始化语句
  84. qt := strings.Split(t.Tjscope, ",")
  85. if len(qt) != 2 {
  86. return
  87. }
  88. st, et := int64(0), int64(0)
  89. now := time.Now()
  90. switch qt[1] {
  91. case "h":
  92. et = now.Unix()
  93. st = et + util.Int64All(qt[0])*3600
  94. case "d":
  95. st = util.GetDayStartSecond(util.IntAll(qt[0]))
  96. et = util.GetDayStartSecond(0)
  97. }
  98. st1 := fmt.Sprintf("%x0000000000000000", st)
  99. et1 := fmt.Sprintf("%x0000000000000000", et)
  100. eq := fmt.Sprintf(esQ, st1, et1)
  101. fmt.Println("eq", eq)
  102. eq1 := fmt.Sprintf(esQ1, st1, et1)
  103. fmt.Println("eq1", eq1)
  104. // 将时间戳转换为 time.Time 类型
  105. stt := time.Unix(st, 0)
  106. // 格式化时间为 "YYYY-MM-DD HH:MM:SS" 形式
  107. formattedTime := stt.Format("2006-01-02")
  108. rangeQuery := es.NewRangeQuery("id").Gte(st1).Lt(et1)
  109. //竞品网站统计
  110. termsQuery := es.NewTermsQuery("site", "元博网(采购与招标网)", "中国招标与采购网", "北京隆道网络科技有限公司", "友云采", "标800")
  111. countQuery := es.NewBoolQuery().
  112. Must(rangeQuery)
  113. //竞品网站总量
  114. count1Query := es.NewBoolQuery().Must(rangeQuery).Filter(termsQuery)
  115. //元博网(采购与招标网),竞品网站数量统计
  116. countComptetQuery1 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "元博网(采购与招标网)"))
  117. countComptetQuery2 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "中国招标与采购网"))
  118. countComptetQuery3 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "北京隆道网络科技有限公司"))
  119. countComptetQuery4 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "友云采"))
  120. countComptetQuery5 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "标800"))
  121. es2 := elastic.Elastic{S_esurl: esAddr2, I_size: 2, Username: username, Password: password}
  122. es2.InitElasticSize()
  123. es3 := elastic.Elastic{S_esurl: esAddr3, I_size: 5, Username: username3, Password: password3}
  124. es3.InitElasticSize()
  125. count := int(es2.Count(esIndex2, countQuery)) //公司 es集群 数量统计
  126. count1 := int(es2.Count(esIndex, count1Query)) //竞品网站 数量统计
  127. //元博网(采购与招标网)
  128. countComptet1 := int(es2.Count(esIndex, countComptetQuery1))
  129. //中国招标与采购网
  130. countComptet2 := int(es2.Count(esIndex, countComptetQuery2))
  131. //北京隆道网络科技有限公司
  132. countComptet3 := int(es2.Count(esIndex, countComptetQuery3))
  133. //友云采
  134. countComptet4 := int(es2.Count(esIndex, countComptetQuery4))
  135. //标800
  136. countComptet5 := int(es2.Count(esIndex, countComptetQuery5))
  137. countNew := int(es3.Count(esIndex3, countQuery)) //华为云 新集群
  138. log.Println("count", count)
  139. log.Println("count1", count1)
  140. log.Println("countNew", countNew)
  141. report := ""
  142. switch t.Type {
  143. case "alert":
  144. if count < t.Min || count > t.Max || countNew < t.Min || countNew > t.Max {
  145. report = fmt.Sprintf("告警%s,最小%d,最大%d,统计结果:%d", t.Name, t.Min, t.Max, count)
  146. t.SendMail(report)
  147. }
  148. case "report":
  149. //report = fmt.Sprintf("统计报告%s,【统计结果】,es库数量:%d", t.Name, count)
  150. if len(t.Mgo) > 5 {
  151. fs := strings.Split(t.Mgo, "|")
  152. fmgo := mongodb.NewMgoWithUser(fs[0], fs[3], fs[1], fs[2], 1)
  153. id1 := mongodb.StringTOBsonId(st1)
  154. id2 := mongodb.StringTOBsonId(et1)
  155. mq := bson.M{"_id": bson.M{"$gte": id1, "$lt": id2}} //一天时间内的id段
  156. fd := bson.M{"extracttype": 1, "sensitive": 1, "dataging": 1, "site": 1, "infoformat": 1, "comeintime": 1, "pici": 1, "publishtime": 1, "competehref": 1, "attach_text": 1}
  157. sess := fmgo.GetMgoConn()
  158. defer fmgo.DestoryMongoConn(sess)
  159. /**
  160. count 一天内,es 中 数据总量
  161. count1 一天内,es 中 竞品总量
  162. count2 一天内,mgo 总入库量
  163. count3 一天内,mgo 有效数据 总数
  164. count4 一天内,mgo 中 竞品数据总量
  165. count5 一天内,mgo 有效数据中,竞品的数量
  166. countNew 一天内,es3新集群 中 数据总量
  167. timeCount pici-comeintime 时差在12小时内的数据
  168. */
  169. count2, count3 := 0, 0 //
  170. count4, count5 := 0, 0 //竟品
  171. es_comeintime_totaltime := int64(0) //comeintime 和 生索引 pici 时间 差值的总和
  172. es_publishtime_totaltime := int64(0) //publishtime 和 生索引 pici 时间 差值的总和
  173. es_comeintime_avgtime := int64(0) //comeintime 和 生索引 pici 时间 差值的平均值
  174. es_publishtime_avgtime := int64(0) //publishtime 和 生索引 pici 时间 差值的平均值
  175. file_totaltime := int64(0)
  176. no_file_totaltime := int64(0)
  177. file_avgltime := int64(0)
  178. no_file_avgltime := int64(0)
  179. timeCount := 0 //12小时 统计时间差的数据量
  180. fileCount := 0
  181. noFileCount := 0
  182. //统计pici -comeintime 时间差,1、3、5、10、15、30、30+ 分钟
  183. var pc_diff1 int64
  184. var pc_diff3 int64
  185. var pc_diff5 int64
  186. var pc_diff10 int64
  187. var pc_diff15 int64
  188. var pc_diff30 int64
  189. var pc_diff301 int64
  190. var competCount1 int
  191. var competCount2 int
  192. var competCount3 int
  193. var competCount4 int
  194. var competCount5 int
  195. competeReal1 := 0
  196. competeReal2 := 0
  197. competeReal3 := 0
  198. competeReal4 := 0
  199. competeReal5 := 0
  200. query := sess.DB(fs[3]).C(fs[4]).Find(mq).Select(fd).Iter()
  201. for tmp := make(map[string]interface{}); query.Next(tmp); count2++ {
  202. if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" || util.ObjToString(tmp["site"]) == "中国招标与采购网" || util.ObjToString(tmp["site"]) == "北京隆道网络科技有限公司" || util.ObjToString(tmp["site"]) == "友云采" || util.ObjToString(tmp["site"]) == "标800" {
  203. count4++
  204. if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" {
  205. competCount1++
  206. }
  207. if util.ObjToString(tmp["site"]) == "中国招标与采购网" {
  208. competCount2++
  209. }
  210. if util.ObjToString(tmp["site"]) == "北京隆道网络科技有限公司" {
  211. competCount3++
  212. }
  213. if util.ObjToString(tmp["site"]) == "友云采" {
  214. competCount4++
  215. }
  216. if util.ObjToString(tmp["site"]) == "标800" {
  217. competCount5++
  218. }
  219. }
  220. if util.IntAll(tmp["extracttype"]) != -1 && util.ObjToString(tmp["sensitive"]) != "测试" && util.IntAll(tmp["dataging"]) != 1 && util.Float64All(tmp["infoformat"]) != 3 {
  221. count3++
  222. comeintime := util.Int64All(tmp["comeintime"])
  223. publishtime := util.Int64All(tmp["publishtime"])
  224. pici := util.Int64All(tmp["pici"])
  225. tp := time.Unix(publishtime, 0)
  226. isZero := false
  227. if (tp.Hour() == 0 && tp.Minute() == 0) && tp.Second() == 0 {
  228. isZero = true
  229. }
  230. if (comeintime-publishtime) < 12*60*60 && !isZero {
  231. if pici > 0 {
  232. diff1 := pici - comeintime
  233. diff2 := pici - publishtime
  234. if diff1 < 0 {
  235. fmt.Println("diff1", diff1, tmp["_id"])
  236. } else if diff1 <= 60 {
  237. pc_diff1++
  238. } else if diff1 <= 3*60 {
  239. pc_diff3++
  240. } else if diff1 <= 5*60 {
  241. pc_diff5++
  242. } else if diff1 <= 10*60 {
  243. pc_diff10++
  244. } else if diff1 <= 15*60 {
  245. pc_diff15++
  246. } else if diff1 <= 30*60 {
  247. pc_diff30++
  248. } else {
  249. pc_diff301++
  250. }
  251. if diff2 < 0 {
  252. fmt.Println("diff2", diff2, tmp["_id"])
  253. }
  254. es_comeintime_totaltime += diff1
  255. es_publishtime_totaltime += diff2
  256. timeCount++
  257. if _, ok := tmp["attach_text"]; ok {
  258. curtime := tmp["_id"].(primitive.ObjectID).Timestamp().Unix()
  259. diff3 := curtime - comeintime
  260. if diff3 >= 0 {
  261. file_totaltime += diff3
  262. }
  263. fileCount++
  264. } else {
  265. curtime := tmp["_id"].(primitive.ObjectID).Timestamp().Unix()
  266. diff4 := curtime - comeintime
  267. if diff4 >= 0 {
  268. no_file_totaltime += diff4
  269. }
  270. noFileCount++
  271. }
  272. }
  273. }
  274. if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" || util.ObjToString(tmp["site"]) == "中国招标与采购网" || util.ObjToString(tmp["site"]) == "北京隆道网络科技有限公司" || util.ObjToString(tmp["site"]) == "友云采" || util.ObjToString(tmp["site"]) == "标800" {
  275. count5++
  276. if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" {
  277. competeReal1++
  278. }
  279. if util.ObjToString(tmp["site"]) == "中国招标与采购网" {
  280. competeReal2++
  281. }
  282. if util.ObjToString(tmp["site"]) == "北京隆道网络科技有限公司" {
  283. competeReal3++
  284. }
  285. if util.ObjToString(tmp["site"]) == "友云采" {
  286. competeReal4++
  287. }
  288. if util.ObjToString(tmp["site"]) == "标800" {
  289. competeReal5++
  290. }
  291. }
  292. }
  293. }
  294. if timeCount > 0 {
  295. es_comeintime_avgtime = es_comeintime_totaltime / int64(timeCount)
  296. es_publishtime_avgtime = es_publishtime_totaltime / int64(timeCount)
  297. }
  298. if fileCount > 0 {
  299. file_avgltime = file_totaltime / int64(fileCount)
  300. }
  301. if noFileCount > 0 {
  302. no_file_avgltime = no_file_totaltime / int64(noFileCount)
  303. }
  304. //report += `,mgo统计:` + fmt.Sprint(count3) + `,差值:` + fmt.Sprint(count3-count) + `,mgo总入库量:` + fmt.Sprint(count2) +
  305. // `<br>竟品统计结果_es统计:` + strconv.Itoa(count1) + `,mgo统计:` + fmt.Sprint(count5) + `,差值:` + fmt.Sprint(count5-count1) + `,mgo总入库量` + fmt.Sprint(count4) +
  306. // `<br>元博网_es统计:` + strconv.Itoa(countComptet1) + `,mgo统计:` + fmt.Sprint(competeReal1) + `,差值:` + fmt.Sprint(competeReal1-countComptet1) + `,mgo总入库量` + fmt.Sprint(competCount1) +
  307. // `<br>中国招标与采购网_es统计` + strconv.Itoa(countComptet2) + `,mgo统计:` + fmt.Sprint(competeReal2) + `,差值:` + fmt.Sprint(competeReal2-countComptet2) + `,mgo总入库量` + fmt.Sprint(competCount2) +
  308. // `<br>北京隆道网络科技有限公司_es统计` + strconv.Itoa(countComptet3) + `,mgo统计:` + fmt.Sprint(competeReal3) + `,差值:` + fmt.Sprint(competeReal3-countComptet3) + `,mgo总入库量` + fmt.Sprint(competCount3) +
  309. // `<br>友云采_es统计` + strconv.Itoa(countComptet4) + `,mgo统计:` + fmt.Sprint(competeReal4) + `,差值:` + fmt.Sprint(competeReal4-countComptet4) + `,mgo总入库量` + fmt.Sprint(competCount4) +
  310. // `<br>新集群统计结果_es数量:` + strconv.Itoa(countNew) + `,mgo统计:` + fmt.Sprint(count3) + `,差值:` + fmt.Sprint(count3-countNew)
  311. //
  312. var reportBuilder strings.Builder
  313. // 开始表格
  314. reportBuilder.WriteString("<table>")
  315. reportBuilder.WriteString(formattedTime)
  316. reportBuilder.WriteString("<tr><th>统计项</th><th>es统计</th><th>mgo统计</th><th>差值</th><th>mgo总入库量</th></tr>")
  317. addTableRow(&reportBuilder, "阿里云es集群", count, count3, count3-count, count2)
  318. addTableRow(&reportBuilder, "竟品统计结果", count1, count5, count5-count1, count4)
  319. addTableRow(&reportBuilder, "元博网", countComptet1, competeReal1, competeReal1-countComptet1, competCount1)
  320. addTableRow(&reportBuilder, "中国招标与采购网", countComptet2, competeReal2, competeReal2-countComptet2, competCount2)
  321. addTableRow(&reportBuilder, "北京隆道网络科技有限公司", countComptet3, competeReal3, competeReal3-countComptet3, competCount3)
  322. addTableRow(&reportBuilder, "友云采", countComptet4, competeReal4, competeReal4-countComptet4, competCount4)
  323. addTableRow(&reportBuilder, "标800", countComptet5, competeReal5, competeReal5-countComptet5, competCount5)
  324. addTableRow(&reportBuilder, "华为云es集群", countNew, count3, count3-countNew, count2)
  325. // 结束表格
  326. reportBuilder.WriteString("</table>")
  327. // 获取最终的报告字符串
  328. report = reportBuilder.String()
  329. //存入数据库
  330. yesterday := now.AddDate(0, 0, -1)
  331. insert := map[string]interface{}{
  332. "es_count": count, //
  333. "es3_count": countNew, //
  334. "mgo_count": count3,
  335. "es_mgo_diff": count3 - count,
  336. "mgo_total": count2,
  337. "competitor_es_count": count1, //竞品网站es 数量
  338. "competitor_mgo_count": count5,
  339. "competitor_diff": count5 - count1,
  340. "competitor_mgo_total": count4,
  341. "date": yesterday.Format("2006-01-02"),
  342. "es_comeintime_totaltime": es_comeintime_totaltime,
  343. "es_publishtime_totaltime": es_publishtime_totaltime,
  344. "es_comeintime_avgtime": es_comeintime_avgtime,
  345. "es_publishtime_avgtime": es_publishtime_avgtime,
  346. "file_avgltime": file_avgltime,
  347. "no_file_avgltime": no_file_avgltime,
  348. "file_totaltime": file_totaltime,
  349. "no_file_totaltime": no_file_totaltime,
  350. "file_count": fileCount,
  351. "no_file_count": noFileCount,
  352. "pc_diff1": pc_diff1,
  353. "pc_diff3": pc_diff3,
  354. "pc_diff5": pc_diff5,
  355. "pc_diff10": pc_diff10,
  356. "pc_diff15": pc_diff15,
  357. "pc_diff30": pc_diff30,
  358. "pc_diff301": pc_diff301,
  359. "timeCount": timeCount,
  360. }
  361. fmgo.Save("bidding_ribao", insert)
  362. }
  363. t.SendMail(report)
  364. //t.SendMail("【竟品统计结果】")
  365. //fmt.Println(report)
  366. }
  367. log.Println("task over:", t.Name, eq, count)
  368. }
  // mailClient is the HTTP client used by SendMail. Its timeout keeps an
// unresponsive mail API from blocking a cron run forever; http.Get's default
// client has no timeout.
var mailClient = &http.Client{Timeout: 30 * time.Second}

// SendMail sends report through the mail API configured in Init. It issues a
// GET request to api with the query parameters to, title (t.Name) and body
// (report). The parameters are URL-encoded, so the HTML table and non-ASCII
// text arrive intact. Errors are printed, not returned, and the response body
// is always closed so the connection is not leaked.
func (t *T) SendMail(report string) {
	req, err := http.NewRequest(http.MethodGet, api, nil)
	if err != nil {
		fmt.Println("SendMail err ", err)
		return
	}
	q := req.URL.Query() // keeps any parameters already present in api
	q.Set("to", to)
	q.Set("title", t.Name)
	q.Set("body", report)
	req.URL.RawQuery = q.Encode()
	fmt.Println("url=>", req.URL.String())

	res, err := mailClient.Do(req)
	if err != nil {
		fmt.Println("SendMail err ", err)
		return
	}
	defer res.Body.Close()
	fmt.Println("SendMail res ", res.Status)
}
  379. func addTableRow(builder *strings.Builder, itemName string, esCount, mgoCount, diff, mgoTotal int) {
  380. builder.WriteString(fmt.Sprintf(`<tr><th>%s</th><th>%d</th><th>%d</th><th>%d</th><th>%d</th></tr>`, itemName, esCount, mgoCount, diff, mgoTotal))
  381. }