es_test.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. es7 "github.com/olivere/elastic/v7"
  6. "go.mongodb.org/mongo-driver/bson"
  7. "go.mongodb.org/mongo-driver/bson/primitive"
  8. "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  11. "log"
  12. "strconv"
  13. "testing"
  14. "time"
  15. )
  16. //日报程序
  17. func TestR(t *testing.T) {
  18. start := -1
  19. end := 0
  20. esQ := `{"query": {"range": {"id": {"gte": "%s","lt": "%s"}}}}`
  21. esQ1 := `{"query": {"bool": {"must": [{"range": {"id": {"gte": "%s","lt": "%s"}}},{"terms": {"bidding.site": ["元博网(采购与招标网)","中国招标与采购网"]}}]}}}`
  22. url := "http://127.0.0.1:19805"
  23. username := "es_all"
  24. password := "TopJkO2E_d1x"
  25. url2 := "http://127.0.0.1:19905"
  26. username2 := "jybid"
  27. password2 := "Top2023_JEB01i@31"
  28. // 创建 Elasticsearch 客户端
  29. client, err := es7.NewClient(
  30. es7.SetURL(url),
  31. es7.SetBasicAuth(username, password),
  32. es7.SetSniff(false),
  33. )
  34. if err != nil {
  35. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  36. }
  37. // 创建 Elasticsearch 客户端
  38. client2, err := es7.NewClient(
  39. es7.SetURL(url2),
  40. es7.SetBasicAuth(username2, password2),
  41. es7.SetSniff(false),
  42. )
  43. if err != nil {
  44. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  45. }
  46. st := util.GetDayStartSecond(start) //
  47. et := util.GetDayStartSecond(end) //
  48. st1 := fmt.Sprintf("%x0000000000000000", st)
  49. et1 := fmt.Sprintf("%x0000000000000000", et)
  50. eq := fmt.Sprintf(esQ, st1, et1)
  51. fmt.Println("eq", eq)
  52. eq1 := fmt.Sprintf(esQ1, st1, et1)
  53. fmt.Println("eq1", eq1)
  54. rangeQuery := es7.NewRangeQuery("id").Gte(st1).Lt(et1)
  55. termsQuery := es7.NewTermsQuery("site", "元博网(采购与招标网)", "中国招标与采购网")
  56. countQuery := es7.NewBoolQuery().
  57. Must(rangeQuery)
  58. //
  59. count1Query := es7.NewBoolQuery().Must(rangeQuery).Filter(termsQuery)
  60. // 老集群数量
  61. count, err := client.Count("bidding").Query(countQuery).Do(context.Background())
  62. if err != nil {
  63. fmt.Println("统计文档数量出错:", err)
  64. }
  65. // 竞品数量
  66. count1, err := client.Count("bidding").Query(count1Query).Do(context.Background())
  67. if err != nil {
  68. fmt.Println("统计文档数量出错:", err)
  69. }
  70. //新集群数量
  71. countNew, err := client2.Count("bidding").Query(countQuery).Do(context.Background())
  72. if err != nil {
  73. fmt.Println("统计文档数量出错:", err)
  74. }
  75. fmt.Println(count)
  76. fmt.Println(count1)
  77. fmt.Println(countNew)
  78. Mgo := &mongodb.MongodbSim{
  79. MongodbAddr: "127.0.0.1:27083",
  80. DbName: "qfw",
  81. Size: 10,
  82. Direct: true,
  83. UserName: "SJZY_RWbid_ES",
  84. Password: "SJZY@B4i4D5e6S",
  85. }
  86. Mgo.InitPool()
  87. sess := Mgo.GetMgoConn()
  88. defer Mgo.DestoryMongoConn(sess)
  89. id1 := mongodb.StringTOBsonId(st1)
  90. id2 := mongodb.StringTOBsonId(et1)
  91. now := time.Now()
  92. mq := bson.M{"_id": bson.M{"$gte": id1, "$lt": id2}} //一天时间内的id段
  93. fd := bson.M{"extracttype": 1, "sensitive": 1, "dataging": 1, "site": 1, "infoformat": 1, "comeintime": 1, "pici": 1, "publishtime": 1, "competehref": 1, "attach_text": 1}
  94. /**
  95. count 一天内,es 中 数据总量
  96. count1 一天内,es 中 竞品总量
  97. count2 一天内,mgo 总入库量
  98. count3 一天内,mgo 有效数据 总数
  99. count4 一天内,mgo 中 竞品数据总量
  100. count5 一天内,mgo 有效数据中,竞品的数量
  101. countNew 一天内,es3新集群 中 数据总量
  102. */
  103. count2, count3 := int64(0), int64(0) //
  104. count4, count5 := int64(0), int64(0) //竟品
  105. es_comeintime_totaltime := int64(0) //comeintime 和 生索引 pici 时间 差值的总和
  106. es_publishtime_totaltime := int64(0) //publishtime 和 生索引 pici 时间 差值的总和
  107. es_comeintime_avgtime := int64(0) //comeintime 和 生索引 pici 时间 差值的平均值
  108. es_publishtime_avgtime := int64(0) //publishtime 和 生索引 pici 时间 差值的平均值
  109. file_totaltime := int64(0)
  110. no_file_totaltime := int64(0)
  111. file_avgltime := int64(0)
  112. no_file_avgltime := int64(0)
  113. timeCount := 0 // 统计时间差的数据量
  114. fileCount := 0
  115. noFileCount := 0
  116. //统计pici -comeintime 时间差,1、3、5、10、15、30、30+ 分钟
  117. var pc_diff1 int64
  118. var pc_diff3 int64
  119. var pc_diff5 int64
  120. var pc_diff10 int64
  121. var pc_diff15 int64
  122. var pc_diff30 int64
  123. var pc_diff301 int64
  124. query := sess.DB("qfw").C("bidding").Find(mq).Select(fd).Iter()
  125. for tmp := make(map[string]interface{}); query.Next(tmp); count2++ {
  126. if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" || util.ObjToString(tmp["site"]) == "中国招标与采购网" {
  127. count4++
  128. }
  129. if util.IntAll(tmp["extracttype"]) != -1 && util.ObjToString(tmp["sensitive"]) != "测试" && util.IntAll(tmp["dataging"]) != 1 && util.Float64All(tmp["infoformat"]) != 3 {
  130. count3++
  131. comeintime := util.Int64All(tmp["comeintime"])
  132. publishtime := util.Int64All(tmp["publishtime"])
  133. pici := util.Int64All(tmp["pici"])
  134. if (comeintime - publishtime) < 12*60*60 {
  135. if pici > 0 {
  136. diff1 := pici - comeintime
  137. diff2 := pici - publishtime
  138. if diff1 < 0 {
  139. fmt.Println("diff1", diff1, tmp["_id"])
  140. } else if diff1 <= 60 {
  141. pc_diff1++
  142. } else if diff1 <= 3*60 {
  143. pc_diff3++
  144. } else if diff1 <= 5*60 {
  145. pc_diff5++
  146. } else if diff1 <= 10*60 {
  147. pc_diff10++
  148. } else if diff1 <= 15*60 {
  149. pc_diff15++
  150. } else if diff1 <= 30*60 {
  151. pc_diff30++
  152. } else {
  153. pc_diff301++
  154. }
  155. if diff2 < 0 {
  156. fmt.Println("diff2", diff2, tmp["_id"])
  157. }
  158. es_comeintime_totaltime += diff1
  159. es_publishtime_totaltime += diff2
  160. timeCount++
  161. if _, ok := tmp["attach_text"]; ok {
  162. curtime := tmp["_id"].(primitive.ObjectID).Timestamp().Unix()
  163. diff3 := curtime - comeintime
  164. if diff3 >= 0 {
  165. file_totaltime += diff3
  166. }
  167. fileCount++
  168. } else {
  169. curtime := tmp["_id"].(primitive.ObjectID).Timestamp().Unix()
  170. diff4 := curtime - comeintime
  171. if diff4 >= 0 {
  172. no_file_totaltime += diff4
  173. }
  174. noFileCount++
  175. }
  176. }
  177. }
  178. if util.ObjToString(tmp["site"]) == "元博网(采购与招标网)" || util.ObjToString(tmp["site"]) == "中国招标与采购网" {
  179. count5++
  180. }
  181. }
  182. }
  183. if timeCount > 0 {
  184. es_comeintime_avgtime = es_comeintime_totaltime / int64(timeCount)
  185. es_publishtime_avgtime = es_publishtime_totaltime / int64(timeCount)
  186. }
  187. if fileCount > 0 {
  188. file_avgltime = file_totaltime / int64(fileCount)
  189. }
  190. if noFileCount > 0 {
  191. no_file_avgltime = no_file_totaltime / int64(noFileCount)
  192. }
  193. report := fmt.Sprintf("统计报告%s,【统计结果】,es库数量:%d", "日报", count)
  194. report += ",mgo统计:" + fmt.Sprint(count3) + ",差值:" + fmt.Sprint(count3-count) + ",mgo总入库量:" + fmt.Sprint(count2)
  195. report += "<br>" + "【竟品统计结果】:" + strconv.Itoa(int(count1)) + ",mgo统计:" + fmt.Sprint(count5) + ",差值:" + fmt.Sprint(count5-count1) + ",mgo总入库量" + fmt.Sprint(count4) + "<br>【新集群统计结果】es数量:" + strconv.Itoa(int(countNew)) + ",mgo统计:" + fmt.Sprint(count3) + ",差值:" + fmt.Sprint(count3-countNew)
  196. //存入数据库
  197. fmt.Println(report)
  198. //存入数据库
  199. yesterday := now.AddDate(0, 0, start)
  200. insert := map[string]interface{}{
  201. "es_count": count, //
  202. "es3_count": countNew, //
  203. "mgo_count": count3,
  204. "es_mgo_diff": count3 - count,
  205. "mgo_total": count2,
  206. "competitor_es_count": count1, //竞品网站es 数量
  207. "competitor_mgo_count": count5,
  208. "competitor_diff": count5 - count1,
  209. "competitor_mgo_total": count4,
  210. "date": yesterday.Format("2006-01-02"),
  211. "es_comeintime_totaltime": es_comeintime_totaltime,
  212. "es_publishtime_totaltime": es_publishtime_totaltime,
  213. "es_comeintime_avgtime": es_comeintime_avgtime,
  214. "es_publishtime_avgtime": es_publishtime_avgtime,
  215. "file_avgltime": file_avgltime,
  216. "no_file_avgltime": no_file_avgltime,
  217. "file_totaltime": file_totaltime,
  218. "no_file_totaltime": no_file_totaltime,
  219. "file_count": fileCount,
  220. "no_file_count": noFileCount,
  221. "pc_diff1": pc_diff1,
  222. "pc_diff3": pc_diff3,
  223. "pc_diff5": pc_diff5,
  224. "pc_diff10": pc_diff10,
  225. "pc_diff15": pc_diff15,
  226. "pc_diff30": pc_diff30,
  227. "pc_diff301": pc_diff301,
  228. "timeCount": timeCount,
  229. }
  230. fmt.Println(insert)
  231. //Mgo.Save("bidding_ribao", insert)
  232. fmt.Println("success")
  233. }
  234. func TestCount(T *testing.T) {
  235. esClinet := elastic.Elastic{
  236. S_esurl: "http://127.0.0.1:19805",
  237. I_size: 2,
  238. Username: "es_all",
  239. Password: "TopJkO2E_d1x",
  240. }
  241. esClinet.InitElasticSize()
  242. rangeQuery := es7.NewRangeQuery("id").Gte("646746840000000000000000").Lt("646770b40000000000000000")
  243. //termsQuery := es.NewTermsQuery("site", "元博网(采购与招标网)", "中国招标与采购网")
  244. boolQuery := es7.NewBoolQuery().Must(rangeQuery)
  245. count1 := esClinet.Count("bidding", boolQuery)
  246. //fmt.Println("count1", count1)
  247. fmt.Println(strconv.Itoa(int(count1)))
  248. }