main.go 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "io"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "log"
  11. "sort"
  12. "strings"
  13. )
  14. func main() {
  15. //getData()
  16. click()
  17. log.Println("over ------------------ over")
  18. }
  19. // getData 处理北京 京津翼数据 投标相关数据
  20. func getData() {
  21. url := "http://172.17.4.184:19805"
  22. //url := "http://127.0.0.1:19805"
  23. username := "es_all"
  24. password := "TopJkO2E_d1x"
  25. index := "projectset" //索引名称
  26. // 创建 Elasticsearch 客户端
  27. client, err := elastic.NewClient(
  28. elastic.SetURL(url),
  29. elastic.SetBasicAuth(username, password),
  30. elastic.SetSniff(false),
  31. )
  32. if err != nil {
  33. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  34. }
  35. //85 抽取库
  36. Mgo := &mongodb.MongodbSim{
  37. //MongodbAddr: "127.0.0.1:27080",
  38. MongodbAddr: "172.17.4.85:27080",
  39. DbName: "top",
  40. Size: 10,
  41. //Direct: true,
  42. }
  43. Mgo.InitPool()
  44. //MgoB := &mongodb.MongodbSim{
  45. // MongodbAddr: "172.17.189.140:27080",
  46. // //MongodbAddr: "127.0.0.1:27083",
  47. // Size: 10,
  48. // DbName: "qfw",
  49. // UserName: "SJZY_RWbid_ES",
  50. // Password: "SJZY@B4i4D5e6S",
  51. // //Direct: true,
  52. //}
  53. //MgoB.InitPool()
  54. //2023年01-01 2023-10-01,,1-3季度
  55. areaTermsQuery := elastic.NewTermsQuery("area", "江苏", "安徽", "上海", "天津", "河北", "浙江", "天津市", "上海市", "河北省", "安徽省", "江苏省", "浙江省")
  56. rangeQuery := elastic.NewRangeQuery("firsttime").Gte(1696089600).Lt(1704038400)
  57. query := elastic.NewBoolQuery().
  58. Must(areaTermsQuery).
  59. Must(rangeQuery)
  60. //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000")
  61. //query := elastic.NewBoolQuery().
  62. // //北京,天津,河北,上海,江苏,浙江,安徽
  63. // //Must(elastic.NewTermQuery("area", "北京市")).
  64. // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
  65. // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
  66. // Must(rangeQuery)
  67. ctx := context.Background()
  68. //开始滚动搜索
  69. scrollID := ""
  70. scroll := "10m"
  71. searchSource := elastic.NewSearchSource().
  72. Query(query).
  73. Size(10000).
  74. Sort("_doc", true) //升序排序
  75. //Sort("_doc", false) //降序排序
  76. searchService := client.Scroll(index).
  77. Size(10000).
  78. Scroll(scroll).
  79. SearchSource(searchSource)
  80. res, err := searchService.Do(ctx)
  81. if err != nil {
  82. if err == io.EOF {
  83. fmt.Println("没有数据")
  84. } else {
  85. panic(err)
  86. }
  87. }
  88. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  89. fmt.Println("总数是:", res.TotalHits())
  90. total := 0
  91. for len(res.Hits.Hits) > 0 {
  92. for _, hit := range res.Hits.Hits {
  93. var doc map[string]interface{}
  94. err := json.Unmarshal(hit.Source, &doc)
  95. if err != nil {
  96. log.Printf("解析文档失败:%s", err)
  97. continue
  98. }
  99. delete(doc, "filetext")
  100. delete(doc, "detail")
  101. sWinner := util.ObjToString(doc["s_winner"])
  102. winners := strings.Split(sWinner, ",")
  103. for _, v := range winners {
  104. insert := doc
  105. insert["s_winner"] = v
  106. //存入新表
  107. err = Mgo.InsertOrUpdate("top", "wcc_beijing_0110", insert)
  108. if err != nil {
  109. log.Println("error", doc["id"])
  110. }
  111. }
  112. // 处理查询结果
  113. //area := util.ObjToString(doc["area"])
  114. //areas := []string{"北京", "上海", "广东", "江苏", "浙江"}
  115. //if !IsInStringArray(area, areas) {
  116. // continue
  117. //}
  118. //projectName := util.ObjToString(doc["projectname"])
  119. //if strings.Contains(projectName, "非政府") {
  120. // continue
  121. //}
  122. //buyerclass := util.ObjToString(doc["buyerclass"])
  123. //if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" {
  124. // continue
  125. //}
  126. ////存入新表
  127. //err = Mgo.InsertOrUpdate("qfw", "wcc_bank_poc", doc)
  128. //if err != nil {
  129. // log.Println("error", doc["id"])
  130. //}
  131. }
  132. total = total + len(res.Hits.Hits)
  133. scrollID = res.ScrollId
  134. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  135. log.Println("current count:", total)
  136. if err != nil {
  137. if err == io.EOF {
  138. // 滚动到最后一批数据,退出循环
  139. break
  140. }
  141. log.Println("滚动搜索失败:", err, res)
  142. break // 处理错误时退出循环
  143. }
  144. }
  145. // 在循环外调用 ClearScroll
  146. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  147. if err != nil {
  148. log.Printf("清理滚动搜索失败:%s", err)
  149. }
  150. fmt.Println("结束~~~~~~~~~~~~~~~")
  151. }
  152. // IsInStringArray 判断数组中是否存在字符串
  153. func IsInStringArray(str string, arr []string) bool {
  154. // 先对字符串数组进行排序
  155. sort.Strings(arr)
  156. // 使用二分查找算法查找字符串
  157. pos := sort.SearchStrings(arr, str)
  158. // 如果找到了则返回 true,否则返回 false
  159. return pos < len(arr) && arr[pos] == str
  160. }