// analyzeTask.go (4.2 KB)
  1. package internal
  2. import (
  3. "analyze/internal/consts"
  4. "analyze/internal/model"
  5. "analyze/internal/model/do"
  6. "analyze/internal/model/entity"
  7. "analyze/utility"
  8. "encoding/json"
  9. "fmt"
  10. "github.com/gogf/gf/v2/frame/g"
  11. "github.com/gogf/gf/v2/os/gctx"
  12. es "github.com/olivere/elastic/v7"
  13. "strings"
  14. )
  15. type AnalyzeTask struct {
  16. StartTime int64 //数据查询开始时间
  17. EndTime int64 //数据查询结束时间
  18. UI []*UserTask //当前时间段内需要处理的用户数据
  19. }
  20. // Separation 时间分割
  21. func (a *AnalyzeTask) Separation() (at []*AnalyzeTask) {
  22. groups := make(map[string]*AnalyzeTask)
  23. for _, ui := range a.UI {
  24. found := false
  25. for _, task := range groups {
  26. if ui.FormatParam.STime <= task.EndTime && ui.FormatParam.ETime >= task.StartTime {
  27. task.UI = append(task.UI, ui)
  28. task.StartTime = utility.Min(task.StartTime, ui.FormatParam.STime)
  29. task.EndTime = utility.Max(task.EndTime, ui.FormatParam.ETime)
  30. found = true
  31. break
  32. }
  33. }
  34. if !found {
  35. groups[fmt.Sprintf("%d-%d", ui.FormatParam.STime, ui.FormatParam.ETime)] = &AnalyzeTask{
  36. StartTime: ui.FormatParam.STime,
  37. EndTime: ui.FormatParam.ETime,
  38. UI: []*UserTask{ui},
  39. }
  40. }
  41. }
  42. if len(groups) > 0 {
  43. for _, gp := range groups {
  44. at = append(at, gp)
  45. }
  46. }
  47. return
  48. }
  49. // GetDataByEs 数据查询
  50. func (a *AnalyzeTask) Run() {
  51. indexName := g.Cfg().MustGet(gctx.New(), "es.indexName").String()
  52. limit := g.Cfg().MustGet(gctx.New(), "es.countLimit").Int()
  53. iterationTimes := 0
  54. iterationSize := 200
  55. query := es.NewBoolQuery().Must(
  56. es.NewRangeQuery("jgtime").Gte(a.StartTime).Lte(a.EndTime),
  57. es.NewTermsQuery("bidstatus", consts.BidStatus...),
  58. )
  59. // 打印查询语句
  60. //source, err := query.Source()
  61. //if err != nil {
  62. // g.Log().Info(model.Ctx, "---1---", err)
  63. //}
  64. //queryJSON, err := json.MarshalIndent(source, "", " ")
  65. //if err != nil {
  66. // g.Log().Info(model.Ctx, "---2---", err)
  67. //}
  68. //g.Log().Info(model.Ctx, "---3---", string(queryJSON))
  69. client := do.Es.GetEsConn()
  70. defer do.Es.DestoryEsConn(client)
  71. // 创建一个搜索服务对象,并设置查询条件和排序
  72. // 执行搜索请求,并获取迭代器 上下文超时时间1分钟
  73. scrollService := client.Scroll(indexName).
  74. Query(query).
  75. Sort("jgtime", false).
  76. Size(iterationSize).
  77. KeepAlive("2m")
  78. results, err := scrollService.Do(model.Ctx)
  79. if err == nil {
  80. scrollId := results.ScrollId
  81. //迭代处理每个结果
  82. for {
  83. //超过限定数量 跳出循环
  84. if iterationTimes*iterationSize >= limit {
  85. break
  86. }
  87. //从迭代器获取下页数据
  88. if iterationTimes > 0 {
  89. results, err = scrollService.ScrollId(scrollId).Do(model.Ctx)
  90. if err != nil {
  91. g.Log().Info(model.Ctx, "Error executing scroll: %s", err)
  92. if err.Error() == "EOF" {
  93. break
  94. }
  95. }
  96. scrollId = results.ScrollId
  97. }
  98. if len(results.Hits.Hits) == 0 {
  99. //没有更多结果
  100. break
  101. }
  102. iterationTimes++
  103. for _, hit := range results.Hits.Hits {
  104. p := &entity.ProjectInfo{}
  105. if err := json.Unmarshal(hit.Source, p); err == nil {
  106. for _, ui := range a.UI {
  107. if ui.FormatParam.STime <= p.JgTime && ui.FormatParam.ETime >= p.JgTime {
  108. ui.Start()
  109. ui.Push(p)
  110. }
  111. }
  112. } else {
  113. g.Log().Info(model.Ctx, "project info json err :", err.Error())
  114. }
  115. }
  116. }
  117. } else {
  118. g.Log().Info(model.Ctx, "elastic search err: ", err)
  119. }
  120. return
  121. }
  122. // query
  123. func (a *AnalyzeTask) GetEsSql() string {
  124. query := fmt.Sprintf(`{"query":{"bool":{"must":[{"range":{"jgtime":{"gte":%d,"lte":%d}}},{"terms":{"bidstatus":["中标","成交","合同","单一"]}}]}},"sort": [{"jgtime": "desc"}],"from": 0,"size": %d}`, a.StartTime, a.EndTime, g.Cfg().MustGet(gctx.New(), "es.countLimit").Int())
  125. return query
  126. }
  127. // 企业信息
  128. func GetEntNameByIds(ids []string) (returnMap map[string]string) {
  129. returnMap = map[string]string{}
  130. if len(ids) == 0 {
  131. return
  132. }
  133. list := do.Es.Get("qyxy", "qyxy", fmt.Sprintf(`{"query":{"bool":{"must":[{"terms":{"_id":["%s"]}}]}},"_source":["_id","company_name"],"size":%d}`, strings.Join(ids, `","`), len(ids)))
  134. if list == nil || len(*list) == 0 {
  135. return
  136. }
  137. for _, item := range *list {
  138. id, _ := item["_id"].(string)
  139. name, _ := item["company_name"].(string)
  140. if id != "" && name != "" {
  141. returnMap[id] = name
  142. }
  143. }
  144. return
  145. }