main.go 14 KB


  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. es "github.com/olivere/elastic/v7"
  6. "github.com/robfig/cron/v3"
  7. "go.mongodb.org/mongo-driver/bson"
  8. "go.mongodb.org/mongo-driver/bson/primitive"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "log"
  13. "net/http"
  14. "strconv"
  15. "strings"
  16. "time"
  17. )
  18. //定时任务,去统计bidding索引,排查问题、预警
  19. type T struct {
  20. Cron string
  21. Name string
  22. Min int
  23. Max int
  24. Type string
  25. Tjscope string
  26. Mgo string
  27. }
  28. // es数据最报警
  29. var (
  30. config map[string]interface{}
  31. to string
  32. api string
  33. esAddr, esAddr2, esAddr3 string
  34. esIndex, esIndex2, esIndex3 string
  35. username, password string
  36. username3, password3 string
  37. Ts = []*T{}
  38. esQ = `{"query": {"range": {"id": {"gte": "%s","lt": "%s"}}}}`
  39. esQ1 = `{"query": {"bool": {"must": [{"range": {"id": {"gte": "%s","lt": "%s"}}},{"terms": {"bidding.site": ["元博网(采购与招标网)","中国招标与采购网"]}}]}}}`
  40. )
  41. func Init() {
  42. util.ReadConfig(&config)
  43. jkmail := config["jkmail"].(map[string]interface{})
  44. to, _ = jkmail["to"].(string)
  45. api, _ = jkmail["api"].(string)
  46. //esAddr, _ = config["esAddr"].(string)
  47. esIndex, _ = config["esIndex"].(string)
  48. esAddr2, _ = config["esAddr2"].(string)
  49. esIndex2, _ = config["esIndex2"].(string) //
  50. if _, ok := config["esAddr3"]; ok {
  51. esAddr3, _ = config["esAddr3"].(string)
  52. esIndex3, _ = config["esIndex3"].(string) //
  53. username3, _ = config["username3"].(string)
  54. password3, _ = config["password3"].(string)
  55. }
  56. username, _ = config["username"].(string)
  57. password, _ = config["password"].(string)
  58. tasks, _ := config["task"].([]interface{})
  59. for _, t := range tasks {
  60. bs, _ := json.Marshal(t)
  61. var v *T
  62. json.Unmarshal(bs, &v)
  63. if v != nil {
  64. Ts = append(Ts, v)
  65. }
  66. }
  67. }
  68. func main() {
  69. Init()
  70. log.Println("start..")
  71. if len(Ts) > 0 {
  72. local, _ := time.LoadLocation("Asia/Shanghai")
  73. c := cron.New(cron.WithLocation(local), cron.WithSeconds())
  74. for _, v := range Ts {
  75. c.AddFunc(v.Cron, v.task)
  76. }
  77. c.Start()
  78. defer c.Stop()
  79. select {}
  80. }
  81. log.Println("end..")
  82. }
  83. func (t *T) task() {
  84. //初始化语句
  85. qt := strings.Split(t.Tjscope, ",")
  86. if len(qt) != 2 {
  87. return
  88. }
  89. st, et := int64(0), int64(0)
  90. now := time.Now()
  91. switch qt[1] {
  92. case "h":
  93. et = now.Unix()
  94. st = et + util.Int64All(qt[0])*3600
  95. case "d":
  96. st = util.GetDayStartSecond(util.IntAll(qt[0]))
  97. et = util.GetDayStartSecond(0)
  98. }
  99. st1 := fmt.Sprintf("%x0000000000000000", st)
  100. et1 := fmt.Sprintf("%x0000000000000000", et)
  101. eq := fmt.Sprintf(esQ, st1, et1)
  102. fmt.Println("eq", eq)
  103. eq1 := fmt.Sprintf(esQ1, st1, et1)
  104. fmt.Println("eq1", eq1)
  105. rangeQuery := es.NewRangeQuery("id").Gte(st1).Lt(et1)
  106. //竞品网站统计
  107. termsQuery := es.NewTermsQuery("site", "元博网(采购与招标网)", "中国招标与采购网", "北京隆道网络科技有限公司", "友云采")
  108. countQuery := es.NewBoolQuery().
  109. Must(rangeQuery)
  110. //竞品网站总量
  111. count1Query := es.NewBoolQuery().Must(rangeQuery).Filter(termsQuery)
  112. //元博网(采购与招标网),竞品网站数量统计
  113. countComptetQuery1 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "元博网(采购与招标网)"))
  114. countComptetQuery2 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "中国招标与采购网"))
  115. countComptetQuery3 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "北京隆道网络科技有限公司"))
  116. countComptetQuery4 := es.NewBoolQuery().Must(rangeQuery).Filter(es.NewTermQuery("site", "友云采"))
  117. es2 := elastic.Elastic{S_esurl: esAddr2, I_size: 2, Username: username, Password: password}
  118. es2.InitElasticSize()
  119. es3 := elastic.Elastic{S_esurl: esAddr3, I_size: 5, Username: username3, Password: password3}
  120. es3.InitElasticSize()
  121. count := int(es2.Count(esIndex2, countQuery)) //公司 es集群 数量统计
  122. count1 := int(es2.Count(esIndex, count1Query)) //竞品网站 数量统计
  123. //元博网(采购与招标网)
  124. countComptet1 := int(es2.Count(esIndex, countComptetQuery1))
  125. //中国招标与采购网
  126. countComptet2 := int(es2.Count(esIndex, countComptetQuery2))
  127. //北京隆道网络科技有限公司
  128. countComptet3 := int(es2.Count(esIndex, countComptetQuery3))
  129. //友云采
  130. countComptet4 := int(es2.Count(esIndex, countComptetQuery4))
  131. countNew := int(es3.Count(esIndex3, countQuery)) //华为云 新集群
  132. log.Println("count", count)
  133. log.Println("count1", count1)
  134. log.Println("countNew", countNew)
  135. report := ""
  136. switch t.Type {
  137. case "alert":
  138. if count < t.Min || count > t.Max || countNew < t.Min || countNew > t.Max {
  139. report = fmt.Sprintf("告警%s,最小%d,最大%d,统计结果:%d", t.Name, t.Min, t.Max, count)
  140. t.SendMail(report)
  141. }
  142. case "report":
  143. report = fmt.Sprintf("统计报告%s,【统计结果】,es库数量:%d", t.Name, count)
  144. if len(t.Mgo) > 5 {
  145. fs := strings.Split(t.Mgo, "|")
  146. fmgo := mongodb.NewMgoWithUser(fs[0], fs[3], fs[1], fs[2], 1)
  147. id1 := mongodb.StringTOBsonId(st1)
  148. id2 := mongodb.StringTOBsonId(et1)
  149. mq := bson.M{"_id": bson.M{"$gte": id1, "$lt": id2}} //一天时间内的id段
  150. fd := bson.M{"extracttype": 1, "sensitive": 1, "dataging": 1, "site": 1, "infoformat": 1, "comeintime": 1, "pici": 1, "publishtime": 1, "competehref": 1, "attach_text": 1}
  151. sess := fmgo.GetMgoConn()
  152. defer fmgo.DestoryMongoConn(sess)
  153. /**
  154. count 一天内,es 中 数据总量
  155. count1 一天内,es 中 竞品总量
  156. count2 一天内,mgo 总入库量
  157. count3 一天内,mgo 有效数据 总数
  158. count4 一天内,mgo 中 竞品数据总量
  159. count5 一天内,mgo 有效数据中,竞品的数量
  160. countNew 一天内,es3新集群 中 数据总量
  161. timeCount pici-comeintime 时差在12小时内的数据
  162. */
  163. count2, count3 := 0, 0 //
  164. count4, count5 := 0, 0 //竟品
  165. es_comeintime_totaltime := int64(0) //comeintime 和 生索引 pici 时间 差值的总和
  166. es_publishtime_totaltime := int64(0) //publishtime 和 生索引 pici 时间 差值的总和
  167. es_comeintime_avgtime := int64(0) //comeintime 和 生索引 pici 时间 差值的平均值
  168. es_publishtime_avgtime := int64(0) //publishtime 和 生索引 pici 时间 差值的平均值
  169. file_totaltime := int64(0)
  170. no_file_totaltime := int64(0)
  171. file_avgltime := int64(0)
  172. no_file_avgltime := int64(0)
  173. timeCount := 0 //12小时 统计时间差的数据量
  174. fileCount := 0
  175. noFileCount := 0
  176. //统计pici -comeintime 时间差,1、3、5、10、15、30、30+ 分钟
  177. var pc_diff1 int64
  178. var pc_diff3 int64
  179. var pc_diff5 int64
  180. var pc_diff10 int64
  181. var pc_diff15 int64
  182. var pc_diff30 int64
  183. var pc_diff301 int64
  184. var competCount1 int
  185. var competCount2 int
  186. var competCount3 int
  187. var competCount4 int
  188. competeReal1 := 0
  189. competeReal2 := 0
  190. competeReal3 := 0
  191. competeReal4 := 0
  192. query := sess.DB(fs[3]).C(fs[4]).Find(mq).Select(fd).Iter()
  193. for tmp := make(map[string]interface{}); query.Next(tmp); count2++ {
  194. if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" || util.ObjToString(tmp["site"]) == "中国招标与采购网" || util.ObjToString(tmp["site"]) == "北京隆道网络科技有限公司" || util.ObjToString(tmp["site"]) == "友云采" {
  195. count4++
  196. if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" {
  197. competCount1++
  198. }
  199. if util.ObjToString(tmp["site"]) == "中国招标与采购网" {
  200. competCount2++
  201. }
  202. if util.ObjToString(tmp["site"]) == "北京隆道网络科技有限公司" {
  203. competCount3++
  204. }
  205. if util.ObjToString(tmp["site"]) == "友云采" {
  206. competCount4++
  207. }
  208. }
  209. if util.IntAll(tmp["extracttype"]) != -1 && util.ObjToString(tmp["sensitive"]) != "测试" && util.IntAll(tmp["dataging"]) != 1 && util.Float64All(tmp["infoformat"]) != 3 {
  210. count3++
  211. comeintime := util.Int64All(tmp["comeintime"])
  212. publishtime := util.Int64All(tmp["publishtime"])
  213. pici := util.Int64All(tmp["pici"])
  214. tp := time.Unix(publishtime, 0)
  215. isZero := false
  216. if (tp.Hour() == 0 && tp.Minute() == 0) && tp.Second() == 0 {
  217. isZero = true
  218. }
  219. if (comeintime-publishtime) < 12*60*60 && !isZero {
  220. if pici > 0 {
  221. diff1 := pici - comeintime
  222. diff2 := pici - publishtime
  223. if diff1 < 0 {
  224. fmt.Println("diff1", diff1, tmp["_id"])
  225. } else if diff1 <= 60 {
  226. pc_diff1++
  227. } else if diff1 <= 3*60 {
  228. pc_diff3++
  229. } else if diff1 <= 5*60 {
  230. pc_diff5++
  231. } else if diff1 <= 10*60 {
  232. pc_diff10++
  233. } else if diff1 <= 15*60 {
  234. pc_diff15++
  235. } else if diff1 <= 30*60 {
  236. pc_diff30++
  237. } else {
  238. pc_diff301++
  239. }
  240. if diff2 < 0 {
  241. fmt.Println("diff2", diff2, tmp["_id"])
  242. }
  243. es_comeintime_totaltime += diff1
  244. es_publishtime_totaltime += diff2
  245. timeCount++
  246. if _, ok := tmp["attach_text"]; ok {
  247. curtime := tmp["_id"].(primitive.ObjectID).Timestamp().Unix()
  248. diff3 := curtime - comeintime
  249. if diff3 >= 0 {
  250. file_totaltime += diff3
  251. }
  252. fileCount++
  253. } else {
  254. curtime := tmp["_id"].(primitive.ObjectID).Timestamp().Unix()
  255. diff4 := curtime - comeintime
  256. if diff4 >= 0 {
  257. no_file_totaltime += diff4
  258. }
  259. noFileCount++
  260. }
  261. }
  262. }
  263. if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" || util.ObjToString(tmp["site"]) == "中国招标与采购网" || util.ObjToString(tmp["site"]) == "北京隆道网络科技有限公司" || util.ObjToString(tmp["site"]) == "友云采" {
  264. count5++
  265. if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" {
  266. competeReal1++
  267. }
  268. if util.ObjToString(tmp["site"]) == "中国招标与采购网" {
  269. competeReal2++
  270. }
  271. if util.ObjToString(tmp["site"]) == "北京隆道网络科技有限公司" {
  272. competeReal3++
  273. }
  274. if util.ObjToString(tmp["site"]) == "友云采" {
  275. competeReal4++
  276. }
  277. }
  278. }
  279. }
  280. if timeCount > 0 {
  281. es_comeintime_avgtime = es_comeintime_totaltime / int64(timeCount)
  282. es_publishtime_avgtime = es_publishtime_totaltime / int64(timeCount)
  283. }
  284. if fileCount > 0 {
  285. file_avgltime = file_totaltime / int64(fileCount)
  286. }
  287. if noFileCount > 0 {
  288. no_file_avgltime = no_file_totaltime / int64(noFileCount)
  289. }
  290. report += `,mgo统计:` + fmt.Sprint(count3) + `,差值:` + fmt.Sprint(count3-count) + `,mgo总入库量:` + fmt.Sprint(count2) +
  291. `<br>竟品统计结果_es统计:` + strconv.Itoa(count1) + `,mgo统计:` + fmt.Sprint(count5) + `,差值:` + fmt.Sprint(count5-count1) + `,mgo总入库量` + fmt.Sprint(count4) +
  292. `<br>元博网_es统计:` + strconv.Itoa(countComptet1) + `,mgo统计:` + fmt.Sprint(competeReal1) + `,差值:` + fmt.Sprint(competeReal1-countComptet1) + `,mgo总入库量` + fmt.Sprint(competCount1) +
  293. `<br>中国招标与采购网_es统计` + strconv.Itoa(countComptet2) + `,mgo统计:` + fmt.Sprint(competeReal2) + `,差值:` + fmt.Sprint(competeReal2-countComptet2) + `,mgo总入库量` + fmt.Sprint(competCount2) +
  294. `<br>北京隆道网络科技有限公司_es统计` + strconv.Itoa(countComptet3) + `,mgo统计:` + fmt.Sprint(competeReal3) + `,差值:` + fmt.Sprint(competeReal3-countComptet3) + `,mgo总入库量` + fmt.Sprint(competCount3) +
  295. `<br>友云采_es统计` + strconv.Itoa(countComptet4) + `,mgo统计:` + fmt.Sprint(competeReal4) + `,差值:` + fmt.Sprint(competeReal4-countComptet4) + `,mgo总入库量` + fmt.Sprint(competCount4) +
  296. `<br>新集群统计结果_es数量:` + strconv.Itoa(countNew) + `,mgo统计:` + fmt.Sprint(count3) + `,差值:` + fmt.Sprint(count3-countNew)
  297. //存入数据库
  298. yesterday := now.AddDate(0, 0, -1)
  299. insert := map[string]interface{}{
  300. "es_count": count, //
  301. "es3_count": countNew, //
  302. "mgo_count": count3,
  303. "es_mgo_diff": count3 - count,
  304. "mgo_total": count2,
  305. "competitor_es_count": count1, //竞品网站es 数量
  306. "competitor_mgo_count": count5,
  307. "competitor_diff": count5 - count1,
  308. "competitor_mgo_total": count4,
  309. "date": yesterday.Format("2006-01-02"),
  310. "es_comeintime_totaltime": es_comeintime_totaltime,
  311. "es_publishtime_totaltime": es_publishtime_totaltime,
  312. "es_comeintime_avgtime": es_comeintime_avgtime,
  313. "es_publishtime_avgtime": es_publishtime_avgtime,
  314. "file_avgltime": file_avgltime,
  315. "no_file_avgltime": no_file_avgltime,
  316. "file_totaltime": file_totaltime,
  317. "no_file_totaltime": no_file_totaltime,
  318. "file_count": fileCount,
  319. "no_file_count": noFileCount,
  320. "pc_diff1": pc_diff1,
  321. "pc_diff3": pc_diff3,
  322. "pc_diff5": pc_diff5,
  323. "pc_diff10": pc_diff10,
  324. "pc_diff15": pc_diff15,
  325. "pc_diff30": pc_diff30,
  326. "pc_diff301": pc_diff301,
  327. "timeCount": timeCount,
  328. }
  329. fmgo.Save("bidding_ribao", insert)
  330. }
  331. t.SendMail(report)
  332. //t.SendMail("【竟品统计结果】")
  333. //fmt.Println(report)
  334. }
  335. log.Println("task over:", t.Name, eq, count)
  336. }
  337. func (t *T) SendMail(report string) {
  338. url := fmt.Sprintf("%s?to=%s&title=%s&body=%s", api, to, t.Name, report)
  339. fmt.Println("url=>", url)
  340. res, err := http.Get(url)
  341. if err != nil {
  342. fmt.Println("SendMail err ", err)
  343. } else {
  344. fmt.Println("SendMail res ", res)
  345. }
  346. }