// main.go
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "io"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. es "jygit.jydev.jianyu360.cn/data_processing/common_utils/elastic"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  11. "log"
  12. "time"
  13. )
  14. var (
  15. saveEsPool = make(chan map[string]interface{}, 5000) //保存binding数据到es
  16. saveEsSp = make(chan bool, 5)
  17. EsBulkSize = 50
  18. Es *es.Elastic
  19. portraitIndex = "project_portrait" // 画像索引
  20. portraitMgo = "wcc_project_portrait" // MongoDB 的表名
  21. )
  // PortraitData is one buyer portrait record. The JSON tags define the field
  // names used when the record is serialised.
  type PortraitData struct {
  	// Buyer is the buyer's name.
  	Buyer string `json:"buyer"`
  	// Area is the area recorded on the buyer's most recent matching project.
  	Area string `json:"area"`
  	// City is the city recorded on the buyer's most recent matching project.
  	City string `json:"city"`
  	// Class is the portrait category label.
  	Class string `json:"class"`
  	// BusinessType is derived from the buyer's "buyerclass" value.
  	BusinessType string `json:"business_type"`
  	// Lasttime is the "lasttime" value of the most recent matching project.
  	Lasttime int64 `json:"lasttime"`
  	// ProjectCount is the number of matching projects.
  	ProjectCount int64 `json:"project_count"`
  	// ProjectMoney is the summed "sortprice" of the matching projects.
  	ProjectMoney float64 `json:"project_money"`
  }
  32. func Init() {
  33. //Es = &es.Elastic{
  34. // //S_esurl: "http://127.0.0.1:19908",
  35. // S_esurl: "http://172.17.4.184:19908",
  36. // I_size: 5,
  37. // Username: "jybid",
  38. // Password: "Top2023_JEB01i@31",
  39. //}
  40. //Es.InitElasticSize()
  41. //测试环境
  42. Es = &es.Elastic{
  43. S_esurl: "http://192.168.3.149:9201",
  44. I_size: 5,
  45. Username: "",
  46. Password: "",
  47. }
  48. Es.InitElasticSize()
  49. }
  50. func main() {
  51. Init()
  52. go SaveEsMethod()
  53. //dealAllData()
  54. allDataEs() //处理存量数据到es
  55. ch := make(chan bool, 1)
  56. <-ch
  57. }
  // dealIncData prepares the filter for incremental data: documents whose
  // "pici" value lies in the half-open window (yesterday 00:00, today 00:00],
  // expressed as Unix seconds in the local time zone. For now the filter is
  // only logged; no query is executed.
  func dealIncData() {
  	current := time.Now()

  	// dayStart returns local midnight of the day `offset` days from today.
  	dayStart := func(offset int) time.Time {
  		return time.Date(current.Year(), current.Month(), current.Day()+offset, 0, 0, 0, 0, current.Location())
  	}

  	window := map[string]interface{}{
  		"$gt":  dayStart(-1).Unix(),
  		"$lte": dayStart(0).Unix(),
  	}
  	filter := map[string]interface{}{"pici": window}
  	log.Println(filter)
  }
  71. // dealAllData 处理存量数据,
  72. func dealAllData() {
  73. /**
  74. 循环采购单位存量数据,
  75. */
  76. url := "http://172.17.4.184:19908"
  77. //url := "http://127.0.0.1:19908"
  78. username := "jybid"
  79. password := "Top2023_JEB01i@31"
  80. index := "buyer" //索引名称
  81. //index := "projectset" //索引名称
  82. // 创建 Elasticsearch 客户端
  83. client, err := elastic.NewClient(
  84. elastic.SetURL(url),
  85. elastic.SetBasicAuth(username, password),
  86. elastic.SetSniff(false),
  87. )
  88. if err != nil {
  89. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  90. }
  91. MgoB := &mongodb.MongodbSim{
  92. MongodbAddr: "172.17.189.140:27080",
  93. //MongodbAddr: "127.0.0.1:27083",
  94. Size: 10,
  95. DbName: "qfw",
  96. UserName: "SJZY_RWbid_ES",
  97. Password: "SJZY@B4i4D5e6S",
  98. //Direct: true,
  99. }
  100. MgoB.InitPool()
  101. //query := elastic.NewBoolQuery().
  102. // Must(elastic.NewTermQuery("toptype", "结果")).
  103. // Must(elastic.NewTermQuery("subtype", "招标"))
  104. ctx := context.Background()
  105. //开始滚动搜索
  106. scrollID := ""
  107. scroll := "10m"
  108. searchSource := elastic.NewSearchSource().
  109. //Query(query).
  110. Size(10000).
  111. Sort("_doc", true) //升序排序
  112. //Sort("_doc", false) //降序排序
  113. searchService := client.Scroll(index).
  114. Size(10000).
  115. Scroll(scroll).
  116. SearchSource(searchSource)
  117. res, err := searchService.Do(ctx)
  118. if err != nil {
  119. if err == io.EOF {
  120. fmt.Println("没有数据")
  121. } else {
  122. panic(err)
  123. }
  124. }
  125. //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  126. fmt.Println("总数是:", res.TotalHits())
  127. total := 0
  128. for len(res.Hits.Hits) > 0 {
  129. for k, hit := range res.Hits.Hits {
  130. if k%1000 == 0 {
  131. log.Println("当前:", k)
  132. }
  133. var doc map[string]interface{}
  134. err := json.Unmarshal(hit.Source, &doc)
  135. if err != nil {
  136. log.Printf("解析文档失败:%s", err)
  137. continue
  138. }
  139. //处理查询结果
  140. portrait := PortraitData{
  141. Buyer: util.ObjToString(doc["name"]),
  142. BusinessType: getStr(util.ObjToString(doc["buyerclass"])),
  143. Class: "情报_物业",
  144. }
  145. // 构建查询
  146. query := elastic.NewBoolQuery().
  147. Must(
  148. //elastic.NewTermQuery("buyer", "泸州市龙马潭区人民医院"),
  149. elastic.NewTermQuery("buyer", util.ObjToString(doc["name"])),
  150. elastic.NewTermQuery("tag_topinformation", "情报_物业"),
  151. )
  152. // 创建搜索服务
  153. searchService2 := client.Search().
  154. Index("projectset"). // 替换为你的索引名称
  155. Query(query).
  156. Sort("lasttime", false). // false表示降序
  157. Size(1).
  158. Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice"))
  159. // 执行查询
  160. searchResult, err := searchService2.Do(context.Background())
  161. if err != nil {
  162. log.Fatalf("Error getting response: %s", err)
  163. }
  164. // 处理结果
  165. if searchResult.Hits.TotalHits.Value > 0 {
  166. portrait.ProjectCount = searchResult.TotalHits()
  167. for _, hit := range searchResult.Hits.Hits {
  168. var doc2 map[string]interface{}
  169. err := json.Unmarshal(hit.Source, &doc2)
  170. if err != nil {
  171. log.Printf("解析文档失败:%s", err)
  172. continue
  173. }
  174. portrait.Lasttime = util.Int64All(doc2["lasttime"])
  175. portrait.Area = util.ObjToString(doc2["area"])
  176. portrait.City = util.ObjToString(doc2["city"])
  177. }
  178. // 处理聚合结果
  179. if agg, found := searchResult.Aggregations.Sum("total_price"); found {
  180. portrait.ProjectMoney = *agg.Value
  181. } else {
  182. log.Println("Aggregation not found")
  183. }
  184. //写入MongoDB
  185. MgoB.Save("wcc_project_portrait", structToMap(portrait))
  186. } else {
  187. continue
  188. }
  189. }
  190. total = total + len(res.Hits.Hits)
  191. scrollID = res.ScrollId
  192. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  193. log.Println("current count:", total)
  194. if err != nil {
  195. if err == io.EOF {
  196. // 滚动到最后一批数据,退出循环
  197. break
  198. }
  199. log.Println("滚动搜索失败:", err, res)
  200. break // 处理错误时退出循环
  201. }
  202. }
  203. // 在循环外调用 ClearScroll
  204. _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
  205. if err != nil {
  206. log.Printf("清理滚动搜索失败:%s", err)
  207. }
  208. fmt.Println("结束~~~~~~~~~~~~~~~")
  209. }
  210. // allDataEs 处理存量数据到es
  211. func allDataEs() {
  212. //MgoB := &mongodb.MongodbSim{
  213. // MongodbAddr: "172.17.189.140:27080",
  214. // //MongodbAddr: "127.0.0.1:27083",
  215. // Size: 10,
  216. // DbName: "qfw",
  217. // UserName: "SJZY_RWbid_ES",
  218. // Password: "SJZY@B4i4D5e6S",
  219. // //Direct: true,
  220. //}
  221. //MgoB.InitPool()
  222. //
  223. MgoB := &mongodb.MongodbSim{
  224. MongodbAddr: "192.168.3.206:27002",
  225. Size: 10,
  226. DbName: "qfw_data",
  227. UserName: "root",
  228. Password: "root",
  229. //Direct: true,
  230. }
  231. MgoB.InitPool()
  232. defer util.Catch()
  233. sess := MgoB.GetMgoConn()
  234. defer MgoB.DestoryMongoConn(sess)
  235. count := 0
  236. it := sess.DB("qfw_data").C(portraitMgo).Find(nil).Select(nil).Iter()
  237. for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
  238. if count%5000 == 0 {
  239. log.Println("current:", count, tmp["_id"])
  240. }
  241. id := mongodb.BsonIdToSId(tmp["_id"])
  242. tmp["id"] = id
  243. tmp["_id"] = id
  244. saveEsPool <- tmp
  245. //Es.Save("project_portrait", tmp)
  246. }
  247. log.Println("数据处理完毕")
  248. }
  249. func SaveEsMethod() {
  250. arru := make([]map[string]interface{}, EsBulkSize)
  251. indexu := 0
  252. for {
  253. select {
  254. case v := <-saveEsPool:
  255. arru[indexu] = v
  256. indexu++
  257. if indexu == EsBulkSize {
  258. saveEsSp <- true
  259. go func(arru []map[string]interface{}) {
  260. defer func() {
  261. <-saveEsSp
  262. }()
  263. Es.BulkSave(portraitIndex, arru)
  264. }(arru)
  265. arru = make([]map[string]interface{}, EsBulkSize)
  266. indexu = 0
  267. }
  268. case <-time.After(1000 * time.Millisecond):
  269. if indexu > 0 {
  270. saveEsSp <- true
  271. go func(arru []map[string]interface{}) {
  272. defer func() {
  273. <-saveEsSp
  274. }()
  275. Es.BulkSave(portraitIndex, arru)
  276. }(arru[:indexu])
  277. arru = make([]map[string]interface{}, EsBulkSize)
  278. indexu = 0
  279. }
  280. }
  281. }
  282. }