main.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "io"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "log"
  11. "time"
  12. )
  13. type PortraitData struct {
  14. Buyer string `json:"buyer"`
  15. Area string `json:"area"`
  16. City string `json:"city"`
  17. Class string `json:"class"`
  18. BusinessType string `json:"business_type"`
  19. Lasttime int64 `json:"lasttime"`
  20. ProjectCount int64 `json:"project_count"`
  21. ProjectMoney float64 `json:"project_money"`
  22. }
  23. func main() {
  24. dealAllData()
  25. }
  26. // dealIncData 处理增量数据
  27. func dealIncData() {
  28. now := time.Now()
  29. yesterday := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location())
  30. today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
  31. q := map[string]interface{}{
  32. "pici": map[string]interface{}{
  33. "$gt": yesterday.Unix(),
  34. "$lte": today.Unix(),
  35. },
  36. }
  37. log.Println(q)
  38. }
  39. // dealAllData 处理存量数据,
  40. func dealAllData() {
  41. /**
  42. 循环采购单位存量数据,
  43. */
  44. url := "http://172.17.4.184:19908"
  45. //url := "http://127.0.0.1:19908"
  46. username := "jybid"
  47. password := "Top2023_JEB01i@31"
  48. index := "buyer" //索引名称
  49. //index := "projectset" //索引名称
  50. // 创建 Elasticsearch 客户端
  51. client, err := elastic.NewClient(
  52. elastic.SetURL(url),
  53. elastic.SetBasicAuth(username, password),
  54. elastic.SetSniff(false),
  55. )
  56. if err != nil {
  57. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  58. }
  59. MgoB := &mongodb.MongodbSim{
  60. MongodbAddr: "172.17.189.140:27080",
  61. //MongodbAddr: "127.0.0.1:27083",
  62. Size: 10,
  63. DbName: "qfw",
  64. UserName: "SJZY_RWbid_ES",
  65. Password: "SJZY@B4i4D5e6S",
  66. //Direct: true,
  67. }
  68. MgoB.InitPool()
  69. //query := elastic.NewBoolQuery().
  70. // Must(elastic.NewTermQuery("toptype", "结果")).
  71. // Must(elastic.NewTermQuery("subtype", "招标"))
  72. ctx := context.Background()
  73. //开始滚动搜索
  74. scrollID := ""
  75. scroll := "10m"
  76. searchSource := elastic.NewSearchSource().
  77. //Query(query).
  78. Size(10000).
  79. Sort("_doc", true) //升序排序
  80. //Sort("_doc", false) //降序排序
  81. searchService := client.Scroll(index).
  82. Size(10000).
  83. Scroll(scroll).
  84. SearchSource(searchSource)
  85. res, err := searchService.Do(ctx)
  86. if err != nil {
  87. if err == io.EOF {
  88. fmt.Println("没有数据")
  89. } else {
  90. panic(err)
  91. }
  92. }
  93. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  94. fmt.Println("总数是:", res.TotalHits())
  95. total := 0
  96. for len(res.Hits.Hits) > 0 {
  97. for k, hit := range res.Hits.Hits {
  98. if k%1000 == 0 {
  99. log.Println("当前:", k)
  100. }
  101. var doc map[string]interface{}
  102. err := json.Unmarshal(hit.Source, &doc)
  103. if err != nil {
  104. log.Printf("解析文档失败:%s", err)
  105. continue
  106. }
  107. //处理查询结果
  108. portrait := PortraitData{
  109. Buyer: util.ObjToString(doc["name"]),
  110. BusinessType: getStr(util.ObjToString(doc["buyerclass"])),
  111. Class: "情报_物业",
  112. }
  113. // 构建查询
  114. query := elastic.NewBoolQuery().
  115. Must(
  116. //elastic.NewTermQuery("buyer", "泸州市龙马潭区人民医院"),
  117. elastic.NewTermQuery("buyer", util.ObjToString(doc["name"])),
  118. elastic.NewTermQuery("tag_topinformation", "情报_物业"),
  119. )
  120. // 创建搜索服务
  121. searchService2 := client.Search().
  122. Index("projectset"). // 替换为你的索引名称
  123. Query(query).
  124. Sort("lasttime", false). // false表示降序
  125. Size(1).
  126. Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice"))
  127. // 执行查询
  128. searchResult, err := searchService2.Do(context.Background())
  129. if err != nil {
  130. log.Fatalf("Error getting response: %s", err)
  131. }
  132. // 处理结果
  133. if searchResult.Hits.TotalHits.Value > 0 {
  134. portrait.ProjectCount = searchResult.TotalHits()
  135. for _, hit := range searchResult.Hits.Hits {
  136. var doc2 map[string]interface{}
  137. err := json.Unmarshal(hit.Source, &doc2)
  138. if err != nil {
  139. log.Printf("解析文档失败:%s", err)
  140. continue
  141. }
  142. portrait.Lasttime = util.Int64All(doc2["lasttime"])
  143. portrait.Area = util.ObjToString(doc2["area"])
  144. portrait.City = util.ObjToString(doc2["city"])
  145. }
  146. // 处理聚合结果
  147. if agg, found := searchResult.Aggregations.Sum("total_price"); found {
  148. portrait.ProjectMoney = *agg.Value
  149. } else {
  150. log.Println("Aggregation not found")
  151. }
  152. //写入MongoDB
  153. MgoB.Save("wcc_project_portrait", structToMap(portrait))
  154. } else {
  155. continue
  156. }
  157. }
  158. total = total + len(res.Hits.Hits)
  159. scrollID = res.ScrollId
  160. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  161. log.Println("current count:", total)
  162. if err != nil {
  163. if err == io.EOF {
  164. // 滚动到最后一批数据,退出循环
  165. break
  166. }
  167. log.Println("滚动搜索失败:", err, res)
  168. break // 处理错误时退出循环
  169. }
  170. }
  171. // 在循环外调用 ClearScroll
  172. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  173. if err != nil {
  174. log.Printf("清理滚动搜索失败:%s", err)
  175. }
  176. fmt.Println("结束~~~~~~~~~~~~~~~")
  177. }