forecast.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "io"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "log"
  11. "sync"
  12. )
  13. // getForecast 获取测试环境数据
  14. func getForecast() {
  15. url := "http://192.168.3.149:9201"
  16. //url := "http://127.0.0.1:19805"
  17. username := ""
  18. password := ""
  19. index := "forecast" //索引名称
  20. // 创建 Elasticsearch 客户端
  21. client, err := elastic.NewClient(
  22. elastic.SetURL(url),
  23. elastic.SetBasicAuth(username, password),
  24. elastic.SetSniff(false),
  25. )
  26. if err != nil {
  27. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  28. }
  29. //测试环境
  30. Mgo := &mongodb.MongodbSim{
  31. //MongodbAddr: "127.0.0.1:27080",
  32. MongodbAddr: "192.168.3.206:27002",
  33. DbName: "qfw_data",
  34. Size: 10,
  35. UserName: "root",
  36. Password: "root",
  37. //Direct: true,
  38. }
  39. Mgo.InitPool()
  40. MgoB := &mongodb.MongodbSim{
  41. //MongodbAddr: "172.17.189.140:27080",
  42. MongodbAddr: "127.0.0.1:27083",
  43. Size: 10,
  44. DbName: "qfw",
  45. UserName: "SJZY_RWbid_ES",
  46. Password: "SJZY@B4i4D5e6S",
  47. Direct: true,
  48. }
  49. MgoB.InitPool()
  50. //2023年01-01 2023-10-01,,1-3季度
  51. //areaTermsQuery := elastic.NewTermsQuery("area", "江苏", "安徽", "上海", "天津", "河北", "浙江", "天津市", "上海市", "河北省", "安徽省", "江苏省", "浙江省")
  52. //rangeQuery := elastic.NewRangeQuery("firsttime").Gte(1696089600).Lt(1704038400)
  53. //query := elastic.NewBoolQuery().
  54. // Must(areaTermsQuery).
  55. // Must(rangeQuery)
  56. ctx := context.Background()
  57. //开始滚动搜索
  58. scrollID := ""
  59. scroll := "10m"
  60. searchSource := elastic.NewSearchSource().
  61. Query(nil).
  62. Size(10000).
  63. Sort("_doc", true) //升序排序
  64. //Sort("_doc", false) //降序排序
  65. searchService := client.Scroll(index).
  66. Size(10000).
  67. Scroll(scroll).
  68. SearchSource(searchSource)
  69. res, err := searchService.Do(ctx)
  70. if err != nil {
  71. if err == io.EOF {
  72. fmt.Println("没有数据")
  73. } else {
  74. panic(err)
  75. }
  76. }
  77. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  78. fmt.Println("总数是:", res.TotalHits())
  79. total := 0
  80. for len(res.Hits.Hits) > 0 {
  81. for _, hit := range res.Hits.Hits {
  82. var doc map[string]interface{}
  83. err := json.Unmarshal(hit.Source, &doc)
  84. if err != nil {
  85. log.Printf("解析文档失败:%s", err)
  86. continue
  87. }
  88. //infoid := util.ObjToString(doc["infoid"])
  89. //
  90. //bidding, _ := MgoB.FindById("bidding", infoid, nil)
  91. ////存入新表
  92. //rs := Mgo.SaveByOriID("bidding", bidding)
  93. //if !rs {
  94. // log.Println("保存出错", infoid)
  95. //}
  96. Mgo.Save("wcc_forecast", doc)
  97. }
  98. total = total + len(res.Hits.Hits)
  99. scrollID = res.ScrollId
  100. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  101. log.Println("current count:", total)
  102. if err != nil {
  103. if err == io.EOF {
  104. // 滚动到最后一批数据,退出循环
  105. break
  106. }
  107. log.Println("滚动搜索失败:", err, res)
  108. break // 处理错误时退出循环
  109. }
  110. }
  111. // 在循环外调用 ClearScroll
  112. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  113. if err != nil {
  114. log.Printf("清理滚动搜索失败:%s", err)
  115. }
  116. fmt.Println("结束~~~~~~~~~~~~~~~")
  117. }
  118. func dealData() {
  119. Mgo := &mongodb.MongodbSim{
  120. //MongodbAddr: "127.0.0.1:27080",
  121. MongodbAddr: "192.168.3.206:27002",
  122. DbName: "qfw_data",
  123. Size: 10,
  124. UserName: "root",
  125. Password: "root",
  126. //Direct: true,
  127. }
  128. Mgo.InitPool()
  129. // 163 正式环境
  130. MgoB := &mongodb.MongodbSim{
  131. //MongodbAddr: "172.17.189.140:27080",
  132. MongodbAddr: "127.0.0.1:27083",
  133. Size: 10,
  134. DbName: "qfw",
  135. UserName: "SJZY_RWbid_ES",
  136. Password: "SJZY@B4i4D5e6S",
  137. Direct: true,
  138. }
  139. MgoB.InitPool()
  140. sess := Mgo.GetMgoConn()
  141. defer Mgo.DestoryMongoConn(sess)
  142. query := sess.DB("qfw_data").C("wcc_forecast").Find(nil).Select(map[string]interface{}{"infoid": 1}).Iter()
  143. count := 0
  144. ch := make(chan bool, 10)
  145. wg := &sync.WaitGroup{}
  146. for tmp := make(map[string]interface{}); query.Next(tmp); count++ {
  147. if count%1000 == 0 {
  148. log.Println("current:", count)
  149. }
  150. ch <- true
  151. wg.Add(1)
  152. go func(tmp map[string]interface{}) {
  153. defer func() {
  154. <-ch
  155. wg.Done()
  156. }()
  157. infoid := util.ObjToString(tmp["infoid"])
  158. bidding, _ := MgoB.FindById("bidding", infoid, nil)
  159. //存入新表
  160. rs := Mgo.SaveByOriID("bidding", bidding)
  161. if !rs {
  162. log.Println("保存出错", infoid)
  163. }
  164. }(tmp)
  165. tmp = make(map[string]interface{})
  166. }
  167. wg.Wait()
  168. log.Println("over")
  169. }