main.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667
  1. package main
  2. import (
  3. "bytes"
  4. "context"
  5. "encoding/json"
  6. "fmt"
  7. es "github.com/olivere/elastic/v7"
  8. "github.com/robfig/cron/v3"
  9. "go.mongodb.org/mongo-driver/bson"
  10. "go.mongodb.org/mongo-driver/bson/primitive"
  11. "io"
  12. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  13. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  14. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  15. "log"
  16. "net/http"
  17. "net/url"
  18. "strings"
  19. "time"
  20. )
  21. //定时任务,去统计bidding索引,排查问题、预警
  22. type T struct {
  23. Cron string
  24. Name string
  25. Min int
  26. Max int
  27. Type string
  28. Tjscope string
  29. Mgo string
  30. }
  31. // es数据最报警
  32. var (
  33. config map[string]interface{}
  34. to string
  35. api string
  36. esAddr, esAddr2, esAddr3 string
  37. esIndex, esIndex2, esIndex3 string
  38. username, password string
  39. username3, password3 string
  40. Ts = []*T{}
  41. esQ = `{"query": {"range": {"id": {"gte": "%s","lt": "%s"}}}}`
  42. esQ1 = `{"query": {"bool": {"must": [{"range": {"id": {"gte": "%s","lt": "%s"}}},{"terms": {"bidding.site": ["元博网(采购与招标网)","中国招标与采购网"]}}]}}}`
  43. )
  44. func Init() {
  45. util.ReadConfig(&config)
  46. jkmail := config["jkmail"].(map[string]interface{})
  47. to, _ = jkmail["to"].(string)
  48. api, _ = jkmail["api"].(string)
  49. //esAddr, _ = config["esAddr"].(string)
  50. esIndex, _ = config["esIndex"].(string)
  51. esAddr2, _ = config["esAddr2"].(string)
  52. esIndex2, _ = config["esIndex2"].(string) //
  53. if _, ok := config["esAddr3"]; ok {
  54. esAddr3, _ = config["esAddr3"].(string)
  55. esIndex3, _ = config["esIndex3"].(string) //
  56. username3, _ = config["username3"].(string)
  57. password3, _ = config["password3"].(string)
  58. }
  59. username, _ = config["username"].(string)
  60. password, _ = config["password"].(string)
  61. tasks, _ := config["task"].([]interface{})
  62. for _, t := range tasks {
  63. bs, _ := json.Marshal(t)
  64. var v *T
  65. json.Unmarshal(bs, &v)
  66. if v != nil {
  67. Ts = append(Ts, v)
  68. }
  69. }
  70. }
  71. func main() {
  72. Init()
  73. log.Println("start..")
  74. if len(Ts) > 0 {
  75. local, _ := time.LoadLocation("Asia/Shanghai")
  76. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  77. for _, v := range Ts {
  78. _, err := c.AddFunc(v.Cron, v.task)
  79. if err != nil {
  80. panic(err)
  81. }
  82. }
  83. c.Start()
  84. defer c.Stop()
  85. select {}
  86. }
  87. log.Println("end..")
  88. }
  89. func (t *T) task() {
  90. //初始化语句
  91. qt := strings.Split(t.Tjscope, ",")
  92. if len(qt) != 2 {
  93. return
  94. }
  95. st, et := int64(0), int64(0)
  96. now := time.Now()
  97. switch qt[1] {
  98. case "m":
  99. et = now.Unix()
  100. st = et + util.Int64All(qt[0])*60
  101. case "h":
  102. et = now.Unix()
  103. st = et + util.Int64All(qt[0])*3600
  104. case "d":
  105. st = util.GetDayStartSecond(util.IntAll(qt[0]))
  106. et = util.GetDayStartSecond(0)
  107. }
  108. st1 := fmt.Sprintf("%x0000000000000000", st)
  109. et1 := fmt.Sprintf("%x0000000000000000", et)
  110. eq := fmt.Sprintf(esQ, st1, et1)
  111. fmt.Println("eq", eq)
  112. eq1 := fmt.Sprintf(esQ1, st1, et1)
  113. fmt.Println("eq1", eq1)
  114. // 将时间戳转换为 time.Time 类型
  115. stt := time.Unix(st, 0)
  116. // 格式化时间为 "YYYY-MM-DD HH:MM:SS" 形式
  117. formattedTime := stt.Format("2006-01-02")
  118. rangeQuery := es.NewRangeQuery("id").Gte(st1).Lt(et1)
  119. //竞品网站统计
  120. termsQuery := es.NewTermsQuery("site", "元博网(采购与招标网)", "中国招标与采购网", "北京隆道网络科技有限公司", "友云采", "标800")
  121. countQuery := es.NewBoolQuery().
  122. Must(rangeQuery)
  123. //竞品网站总量
  124. count1Query := es.NewBoolQuery().Must(rangeQuery).Filter(termsQuery)
  125. //元博网(采购与招标网),竞品网站数量统计
  126. //countComptetQuery1 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "元博网(采购与招标网)"))
  127. //countComptetQuery2 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "中国招标与采购网"))
  128. //countComptetQuery3 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "北京隆道网络科技有限公司"))
  129. //countComptetQuery4 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "友云采"))
  130. //countComptetQuery5 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "标800"))
  131. es2 := elastic.Elastic{S_esurl: esAddr2, I_size: 2, Username: username, Password: password}
  132. es2.InitElasticSize()
  133. es3 := elastic.Elastic{S_esurl: esAddr3, I_size: 5, Username: username3, Password: password3}
  134. es3.InitElasticSize()
  135. count := int(es2.Count(esIndex2, countQuery)) //公司 es集群 数量统计
  136. count1 := int(es2.Count(esIndex, count1Query)) //竞品网站 数量统计
  137. ////元博网(采购与招标网)
  138. //countComptet1 := int(es2.Count(esIndex, countComptetQuery1))
  139. ////中国招标与采购网
  140. //countComptet2 := int(es2.Count(esIndex, countComptetQuery2))
  141. ////北京隆道网络科技有限公司
  142. //countComptet3 := int(es2.Count(esIndex, countComptetQuery3))
  143. ////友云采
  144. //countComptet4 := int(es2.Count(esIndex, countComptetQuery4))
  145. ////标800
  146. //countComptet5 := int(es2.Count(esIndex, countComptetQuery5))
  147. countNew := int(es3.Count(esIndex3, countQuery)) //华为云 新集群
  148. countNewTmp := int(es3.Count("bidding_temp", countQuery)) //华为云 新集群
  149. log.Println("count", count)
  150. log.Println("count1", count1)
  151. log.Println("countNew", countNew)
  152. report := ""
  153. switch t.Type {
  154. case "alert":
  155. if count < t.Min || count > t.Max || countNew < t.Min || countNew > t.Max {
  156. report = fmt.Sprintf("告警%s,最小%d,最大%d,统计结果:%d", t.Name, t.Min, t.Max, count)
  157. t.SendMail(report)
  158. wxurl := util.ObjToString(config["wxurl"])
  159. if wxurl != "" {
  160. SendBot(wxurl, report)
  161. }
  162. }
  163. case "report":
  164. //report = fmt.Sprintf("统计报告%s,【统计结果】,es库数量:%d", t.Name, count)
  165. if len(t.Mgo) > 5 {
  166. fs := strings.Split(t.Mgo, "|")
  167. fmgo := mongodb.NewMgoWithUser(fs[0], fs[3], fs[1], fs[2], 1)
  168. id1 := mongodb.StringTOBsonId(st1)
  169. id2 := mongodb.StringTOBsonId(et1)
  170. mq := bson.M{"_id": bson.M{"$gte": id1, "$lt": id2}} //一天时间内的id段
  171. fd := bson.M{"extracttype": 1, "sensitive": 1, "dataging": 1, "site": 1, "infoformat": 1, "comeintime": 1, "pici": 1, "publishtime": 1, "competehref": 1, "attach_text": 1}
  172. sess := fmgo.GetMgoConn()
  173. defer fmgo.DestoryMongoConn(sess)
  174. /**
  175. count 一天内,es 中 数据总量
  176. count1 一天内,es 中 竞品总量
  177. count2 一天内,mgo 总入库量
  178. count3 一天内,mgo 有效数据 总数
  179. count4 一天内,mgo 中 竞品数据总量
  180. count5 一天内,mgo 有效数据中,竞品的数量
  181. countNew 一天内,es3新集群 中 数据总量
  182. timeCount pici-comeintime 时差在12小时内的数据
  183. */
  184. count2, count3 := 0, 0 //
  185. count4, count5 := 0, 0 //竟品
  186. es_comeintime_totaltime := int64(0) //comeintime 和 生索引 pici 时间 差值的总和
  187. es_publishtime_totaltime := int64(0) //publishtime 和 生索引 pici 时间 差值的总和
  188. es_comeintime_avgtime := int64(0) //comeintime 和 生索引 pici 时间 差值的平均值
  189. es_publishtime_avgtime := int64(0) //publishtime 和 生索引 pici 时间 差值的平均值
  190. file_totaltime := int64(0)
  191. no_file_totaltime := int64(0)
  192. file_avgltime := int64(0)
  193. no_file_avgltime := int64(0)
  194. timeCount := 0 //12小时 统计时间差的数据量
  195. fileCount := 0
  196. noFileCount := 0
  197. //统计pici -comeintime 时间差,1、3、5、10、15、30、30+ 分钟
  198. var pc_diff1 int64
  199. var pc_diff3 int64
  200. var pc_diff5 int64
  201. var pc_diff10 int64
  202. var pc_diff15 int64
  203. var pc_diff30 int64
  204. var pc_diff301 int64
  205. var competCount1 int
  206. var competCount2 int
  207. var competCount3 int
  208. var competCount4 int
  209. var competCount5 int
  210. competeReal1 := 0
  211. competeReal2 := 0
  212. competeReal3 := 0
  213. competeReal4 := 0
  214. competeReal5 := 0
  215. query := sess.DB(fs[3]).C(fs[4]).Find(mq).Select(fd).Iter()
  216. for tmp := make(map[string]interface{}); query.Next(tmp); count2++ {
  217. if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" || util.ObjToString(tmp["site"]) == "中国招标与采购网" || util.ObjToString(tmp["site"]) == "北京隆道网络科技有限公司" || util.ObjToString(tmp["site"]) == "友云采" || util.ObjToString(tmp["site"]) == "标800" {
  218. count4++
  219. if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" {
  220. competCount1++
  221. }
  222. if util.ObjToString(tmp["site"]) == "中国招标与采购网" {
  223. competCount2++
  224. }
  225. if util.ObjToString(tmp["site"]) == "北京隆道网络科技有限公司" {
  226. competCount3++
  227. }
  228. if util.ObjToString(tmp["site"]) == "友云采" {
  229. competCount4++
  230. }
  231. if util.ObjToString(tmp["site"]) == "标800" {
  232. competCount5++
  233. }
  234. }
  235. if util.IntAll(tmp["extracttype"]) != -1 && util.ObjToString(tmp["sensitive"]) != "测试" && util.IntAll(tmp["dataging"]) != 1 && util.Float64All(tmp["infoformat"]) != 3 {
  236. count3++
  237. comeintime := util.Int64All(tmp["comeintime"])
  238. publishtime := util.Int64All(tmp["publishtime"])
  239. pici := util.Int64All(tmp["pici"])
  240. tp := time.Unix(publishtime, 0)
  241. isZero := false
  242. if (tp.Hour() == 0 && tp.Minute() == 0) && tp.Second() == 0 {
  243. isZero = true
  244. }
  245. if (comeintime-publishtime) < 12*60*60 && !isZero {
  246. if pici > 0 {
  247. diff1 := pici - comeintime
  248. diff2 := pici - publishtime
  249. if diff1 < 0 {
  250. fmt.Println("diff1", diff1, tmp["_id"])
  251. } else if diff1 <= 60 {
  252. pc_diff1++
  253. } else if diff1 <= 3*60 {
  254. pc_diff3++
  255. } else if diff1 <= 5*60 {
  256. pc_diff5++
  257. } else if diff1 <= 10*60 {
  258. pc_diff10++
  259. } else if diff1 <= 15*60 {
  260. pc_diff15++
  261. } else if diff1 <= 30*60 {
  262. pc_diff30++
  263. } else {
  264. pc_diff301++
  265. }
  266. if diff2 < 0 {
  267. fmt.Println("diff2", diff2, tmp["_id"])
  268. }
  269. es_comeintime_totaltime += diff1
  270. es_publishtime_totaltime += diff2
  271. timeCount++
  272. if _, ok := tmp["attach_text"]; ok {
  273. curtime := tmp["_id"].(primitive.ObjectID).Timestamp().Unix()
  274. diff3 := curtime - comeintime
  275. if diff3 >= 0 {
  276. file_totaltime += diff3
  277. }
  278. fileCount++
  279. } else {
  280. curtime := tmp["_id"].(primitive.ObjectID).Timestamp().Unix()
  281. diff4 := curtime - comeintime
  282. if diff4 >= 0 {
  283. no_file_totaltime += diff4
  284. }
  285. noFileCount++
  286. }
  287. }
  288. }
  289. if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" || util.ObjToString(tmp["site"]) == "中国招标与采购网" || util.ObjToString(tmp["site"]) == "北京隆道网络科技有限公司" || util.ObjToString(tmp["site"]) == "友云采" || util.ObjToString(tmp["site"]) == "标800" {
  290. count5++
  291. if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" {
  292. competeReal1++
  293. }
  294. if util.ObjToString(tmp["site"]) == "中国招标与采购网" {
  295. competeReal2++
  296. }
  297. if util.ObjToString(tmp["site"]) == "北京隆道网络科技有限公司" {
  298. competeReal3++
  299. }
  300. if util.ObjToString(tmp["site"]) == "友云采" {
  301. competeReal4++
  302. }
  303. if util.ObjToString(tmp["site"]) == "标800" {
  304. competeReal5++
  305. }
  306. }
  307. }
  308. }
  309. if timeCount > 0 {
  310. es_comeintime_avgtime = es_comeintime_totaltime / int64(timeCount)
  311. es_publishtime_avgtime = es_publishtime_totaltime / int64(timeCount)
  312. }
  313. if fileCount > 0 {
  314. file_avgltime = file_totaltime / int64(fileCount)
  315. }
  316. if noFileCount > 0 {
  317. no_file_avgltime = no_file_totaltime / int64(noFileCount)
  318. }
  319. //report += `,mgo统计:` + fmt.Sprint(count3) + `,差值:` + fmt.Sprint(count3-count) + `,mgo总入库量:` + fmt.Sprint(count2) +
  320. // `<br>竟品统计结果_es统计:` + strconv.Itoa(count1) + `,mgo统计:` + fmt.Sprint(count5) + `,差值:` + fmt.Sprint(count5-count1) + `,mgo总入库量` + fmt.Sprint(count4) +
  321. // `<br>元博网_es统计:` + strconv.Itoa(countComptet1) + `,mgo统计:` + fmt.Sprint(competeReal1) + `,差值:` + fmt.Sprint(competeReal1-countComptet1) + `,mgo总入库量` + fmt.Sprint(competCount1) +
  322. // `<br>中国招标与采购网_es统计` + strconv.Itoa(countComptet2) + `,mgo统计:` + fmt.Sprint(competeReal2) + `,差值:` + fmt.Sprint(competeReal2-countComptet2) + `,mgo总入库量` + fmt.Sprint(competCount2) +
  323. // `<br>北京隆道网络科技有限公司_es统计` + strconv.Itoa(countComptet3) + `,mgo统计:` + fmt.Sprint(competeReal3) + `,差值:` + fmt.Sprint(competeReal3-countComptet3) + `,mgo总入库量` + fmt.Sprint(competCount3) +
  324. // `<br>友云采_es统计` + strconv.Itoa(countComptet4) + `,mgo统计:` + fmt.Sprint(competeReal4) + `,差值:` + fmt.Sprint(competeReal4-countComptet4) + `,mgo总入库量` + fmt.Sprint(competCount4) +
  325. // `<br>新集群统计结果_es数量:` + strconv.Itoa(countNew) + `,mgo统计:` + fmt.Sprint(count3) + `,差值:` + fmt.Sprint(count3-countNew)
  326. //html 格式
  327. //var reportBuilder strings.Builder
  328. //// 开始表格
  329. //reportBuilder.WriteString("<table>")
  330. //reportBuilder.WriteString(formattedTime)
  331. //reportBuilder.WriteString("<tr><th>统计项</th><th>es统计</th><th>mgo统计</th><th>差值</th><th>mgo总入库量</th></tr>")
  332. //
  333. //addTableRow(&reportBuilder, "阿里云es集群", count, count3, count3-count, count2)
  334. //addTableRow(&reportBuilder, "竟品统计结果", count1, count5, count5-count1, count4)
  335. //addTableRow(&reportBuilder, "元博网", countComptet1, competeReal1, competeReal1-countComptet1, competCount1)
  336. //addTableRow(&reportBuilder, "中国招标与采购网", countComptet2, competeReal2, competeReal2-countComptet2, competCount2)
  337. //addTableRow(&reportBuilder, "北京隆道网络科技有限公司", countComptet3, competeReal3, competeReal3-countComptet3, competCount3)
  338. //addTableRow(&reportBuilder, "友云采", countComptet4, competeReal4, competeReal4-countComptet4, competCount4)
  339. //addTableRow(&reportBuilder, "标800", countComptet5, competeReal5, competeReal5-countComptet5, competCount5)
  340. //addTableRow(&reportBuilder, "华为云es集群", countNew, count3, count3-countNew, count2)
  341. //// 结束表格
  342. //reportBuilder.WriteString("</table>")
  343. //// 获取最终的报告字符串
  344. //report = reportBuilder.String()
  345. var reportBuilder strings.Builder
  346. // 添加时间
  347. reportBuilder.WriteString(formattedTime + "\n")
  348. // Markdown 表头
  349. reportBuilder.WriteString("| 统计项 | es统计 | mgo统计 | 差值 | mgo总入库量 |\n")
  350. reportBuilder.WriteString("|--------|--------|---------|------|--------------|\n")
  351. addMarkdownRow(&reportBuilder, "阿里云es集群", count, count3, count3-count, count2)
  352. //addMarkdownRow(&reportBuilder, "竟品统计结果", count1, count5, count5-count1, count4)
  353. //addMarkdownRow(&reportBuilder, "元博网", countComptet1, competeReal1, competeReal1-countComptet1, competCount1)
  354. //addMarkdownRow(&reportBuilder, "中国招标与采购网", countComptet2, competeReal2, competeReal2-countComptet2, competCount2)
  355. //addMarkdownRow(&reportBuilder, "北京隆道网络科技有限公司", countComptet3, competeReal3, competeReal3-countComptet3, competCount3)
  356. //addMarkdownRow(&reportBuilder, "友云采", countComptet4, competeReal4, competeReal4-countComptet4, competCount4)
  357. //addMarkdownRow(&reportBuilder, "标800", countComptet5, competeReal5, competeReal5-countComptet5, competCount5)
  358. addMarkdownRow(&reportBuilder, "华为云es集群", countNew, count3, count3-countNew, count2)
  359. addMarkdownRow(&reportBuilder, "华为云es集群 bidding_temp", countNewTmp, count3, count3-countNewTmp, count2)
  360. report = reportBuilder.String()
  361. //存入数据库
  362. yesterday := now.AddDate(0, 0, -1)
  363. insert := map[string]interface{}{
  364. "es_count": count, //
  365. "es3_count": countNew, //
  366. "mgo_count": count3,
  367. "es_mgo_diff": count3 - count,
  368. "mgo_total": count2,
  369. "competitor_es_count": count1, //竞品网站es 数量
  370. "competitor_mgo_count": count5,
  371. "competitor_diff": count5 - count1,
  372. "competitor_mgo_total": count4,
  373. "date": yesterday.Format("2006-01-02"),
  374. "es_comeintime_totaltime": es_comeintime_totaltime,
  375. "es_publishtime_totaltime": es_publishtime_totaltime,
  376. "es_comeintime_avgtime": es_comeintime_avgtime,
  377. "es_publishtime_avgtime": es_publishtime_avgtime,
  378. "file_avgltime": file_avgltime,
  379. "no_file_avgltime": no_file_avgltime,
  380. "file_totaltime": file_totaltime,
  381. "no_file_totaltime": no_file_totaltime,
  382. "file_count": fileCount,
  383. "no_file_count": noFileCount,
  384. "pc_diff1": pc_diff1,
  385. "pc_diff3": pc_diff3,
  386. "pc_diff5": pc_diff5,
  387. "pc_diff10": pc_diff10,
  388. "pc_diff15": pc_diff15,
  389. "pc_diff30": pc_diff30,
  390. "pc_diff301": pc_diff301,
  391. "timeCount": timeCount,
  392. }
  393. fmgo.Save("bidding_ribao", insert)
  394. }
  395. t.SendMail(report)
  396. //t.SendMail("【竟品统计结果】")
  397. //fmt.Println(report)
  398. // 发送企业微信
  399. wxurl := util.ObjToString(config["wxurl"])
  400. if wxurl != "" {
  401. SendBot(wxurl, report)
  402. }
  403. //9008 bidding 数据和 9005 bidding 数据量不一致
  404. if countNew != count {
  405. client1, _ := es.NewClient(
  406. es.SetURL(esAddr2),
  407. es.SetBasicAuth(username, password),
  408. es.SetSniff(false),
  409. )
  410. client2, _ := es.NewClient(
  411. es.SetURL(esAddr3),
  412. es.SetBasicAuth(username3, password3),
  413. es.SetSniff(false),
  414. )
  415. // 9008 比 9005 的索引数据量多
  416. if count > countNew {
  417. ids1, err := fetchIDs(client1, "bidding", rangeQuery)
  418. if err != nil {
  419. log.Fatal(err)
  420. }
  421. fmt.Printf("集群 9008 bidding 总数: %d\n", len(ids1))
  422. ids2, err := fetchIDs(client2, "bidding", rangeQuery)
  423. if err != nil {
  424. log.Fatal(err)
  425. }
  426. fmt.Printf("集群9005 bidding 总数: %d\n", len(ids2))
  427. fmt.Println("差集 (集群9008 - 集群9005):")
  428. diffIDS := make([]string, 0)
  429. for id := range ids1 {
  430. if _, ok := ids2[id]; !ok {
  431. fmt.Println(id)
  432. diffIDS = append(diffIDS, id)
  433. }
  434. }
  435. text := fmt.Sprintf("集群9008 - 集群9005 bidding 的差集数据是:\n %v", diffIDS)
  436. SendBot(wxurl, text)
  437. } else {
  438. ids1, err := fetchIDs(client1, "bidding", rangeQuery)
  439. if err != nil {
  440. log.Fatal(err)
  441. }
  442. fmt.Printf("集群 9008 bidding 总数: %d\n", len(ids1))
  443. ids2, err := fetchIDs(client2, "bidding", rangeQuery)
  444. if err != nil {
  445. log.Fatal(err)
  446. }
  447. fmt.Printf("集群9005 bidding 总数: %d\n", len(ids2))
  448. fmt.Println("差集 (集群9005 - 集群9008):")
  449. diffIDS := make([]string, 0)
  450. for id := range ids2 {
  451. if _, ok := ids1[id]; !ok {
  452. fmt.Println(id)
  453. }
  454. }
  455. text := fmt.Sprintf("集群9005 - 集群9008 bidding 的差集数据是:\n %v", diffIDS)
  456. SendBot(wxurl, text)
  457. }
  458. }
  459. if countNewTmp != count {
  460. client1, _ := es.NewClient(
  461. es.SetURL(esAddr2),
  462. es.SetBasicAuth(username, password),
  463. es.SetSniff(false),
  464. )
  465. client2, _ := es.NewClient(
  466. es.SetURL(esAddr3),
  467. es.SetBasicAuth(username3, password3),
  468. es.SetSniff(false),
  469. )
  470. if count > countNewTmp {
  471. ids1, err := fetchIDs(client1, "bidding", rangeQuery)
  472. if err != nil {
  473. log.Fatal(err)
  474. }
  475. fmt.Printf("集群 9008 bidding 总数: %d\n", len(ids1))
  476. ids2, err := fetchIDs(client2, "bidding_temp", rangeQuery)
  477. if err != nil {
  478. log.Fatal(err)
  479. }
  480. fmt.Printf("集群9005 bidding_temp 总数: %d\n", len(ids2))
  481. fmt.Println("差集 (集群9008 bidding - 集群9005 bidding_temp ):")
  482. diffIDS := make([]string, 0)
  483. for id := range ids1 {
  484. if _, ok := ids2[id]; !ok {
  485. fmt.Println(id)
  486. diffIDS = append(diffIDS, id)
  487. }
  488. }
  489. text := fmt.Sprintf("集群9008 bidding - 集群9005 bidding_temp 的差集数据是:\n %v", diffIDS)
  490. SendBot(wxurl, text)
  491. } else {
  492. ids1, err := fetchIDs(client1, "bidding", rangeQuery)
  493. if err != nil {
  494. log.Fatal(err)
  495. }
  496. fmt.Printf("集群 9008 bidding 总数: %d\n", len(ids1))
  497. ids2, err := fetchIDs(client2, "bidding_temp", rangeQuery)
  498. if err != nil {
  499. log.Fatal(err)
  500. }
  501. fmt.Printf("集群9005 bidding_temp 总数: %d\n", len(ids2))
  502. fmt.Println("差集 (集群9005 biding_temp - 集群9008 bidding ):")
  503. diffIDS := make([]string, 0)
  504. for id := range ids2 {
  505. if _, ok := ids1[id]; !ok {
  506. fmt.Println(id)
  507. }
  508. }
  509. text := fmt.Sprintf("集群9005 biding_temp - 集群9008 bidding的差集数据是:\n %v", diffIDS)
  510. SendBot(wxurl, text)
  511. }
  512. }
  513. }
  514. log.Println("task over:", t.Name, eq, count)
  515. }
  516. func (t *T) SendMail(report string) {
  517. params := url.Values{}
  518. params.Add("to", to) // 收件人列表
  519. params.Add("title", t.Name)
  520. params.Add("body", report)
  521. url := fmt.Sprintf("%s?%s", api, params.Encode())
  522. fmt.Println("url=>", url)
  523. res, err := http.Get(url)
  524. if err != nil {
  525. fmt.Println("SendMail err ", err)
  526. } else {
  527. fmt.Println("SendMail res ", res)
  528. }
  529. }
  530. func addTableRow(builder *strings.Builder, itemName string, esCount, mgoCount, diff, mgoTotal int) {
  531. builder.WriteString(fmt.Sprintf(`<tr><th>%s</th><th>%d</th><th>%d</th><th>%d</th><th>%d</th></tr>`, itemName, esCount, mgoCount, diff, mgoTotal))
  532. }
  533. func addMarkdownRow(builder *strings.Builder, itemName string, esCount, mgoCount, diff, mgoTotal int) {
  534. builder.WriteString(fmt.Sprintf("| %s | %d | %d | %d | %d |\n", itemName, esCount, mgoCount, diff, mgoTotal))
  535. }
  536. // SendBot 发送企业微信
  537. func SendBot(webhookURL, msg string) (b bool) {
  538. // 构造请求体
  539. payload := map[string]interface{}{
  540. "msgtype": "text",
  541. "text": map[string]string{
  542. "content": msg,
  543. },
  544. }
  545. // 转换为 JSON 字符串
  546. payloadBytes, err := json.Marshal(payload)
  547. if err != nil {
  548. log.Println("Error:", err)
  549. return
  550. }
  551. // 发送 POST 请求
  552. resp, err := http.Post(webhookURL, "application/json", bytes.NewReader(payloadBytes))
  553. if err != nil {
  554. log.Println("Error:", err)
  555. return
  556. }
  557. defer resp.Body.Close()
  558. b = true
  559. //打印响应结果
  560. log.Println("send bot Status:", resp.Status)
  561. return
  562. }
  563. func fetchIDs(client *es.Client, index string, query es.Query) (map[string]struct{}, error) {
  564. ctx := context.Background()
  565. ids := make(map[string]struct{})
  566. scrollID := ""
  567. scroll := "10m"
  568. searchSource := es.NewSearchSource().
  569. Query(query).
  570. Size(10000).
  571. Sort("_doc", true)
  572. searchService := client.Scroll(index).
  573. Size(10000).
  574. Scroll(scroll).
  575. SearchSource(searchSource)
  576. res, err := searchService.Do(ctx)
  577. if err != nil {
  578. if err == io.EOF {
  579. fmt.Println("没有数据")
  580. return ids, nil
  581. }
  582. return nil, fmt.Errorf("初始化滚动搜索失败: %w", err)
  583. }
  584. fmt.Println("命中总数:", res.TotalHits())
  585. total := 0
  586. for len(res.Hits.Hits) > 0 {
  587. for _, hit := range res.Hits.Hits {
  588. // 直接收集 _id
  589. ids[hit.Id] = struct{}{}
  590. }
  591. total += len(res.Hits.Hits)
  592. scrollID = res.ScrollId
  593. res, err = client.Scroll().
  594. ScrollId(scrollID).
  595. Scroll(scroll).
  596. Do(ctx)
  597. //log.Println("已获取数量:", total)
  598. if err != nil {
  599. if err == io.EOF {
  600. break
  601. }
  602. return ids, fmt.Errorf("滚动搜索失败: %w", err)
  603. }
  604. }
  605. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  606. if err != nil {
  607. log.Printf("清理滚动搜索失败:%s", err)
  608. }
  609. return ids, nil
  610. }