123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- package internal
- import (
- "analyze/internal/consts"
- "analyze/internal/model"
- "analyze/internal/model/do"
- "analyze/internal/model/entity"
- "analyze/utility"
- "encoding/json"
- "fmt"
- "github.com/gogf/gf/v2/frame/g"
- "github.com/gogf/gf/v2/os/gctx"
- es "github.com/olivere/elastic/v7"
- "strings"
- )
- type AnalyzeTask struct {
- StartTime int64 //数据查询开始时间
- EndTime int64 //数据查询结束时间
- UI []*UserTask //当前时间段内需要处理的用户数据
- }
- // Separation 时间分割
- func (a *AnalyzeTask) Separation() (at []*AnalyzeTask) {
- groups := make(map[string]*AnalyzeTask)
- for _, ui := range a.UI {
- found := false
- for _, task := range groups {
- if ui.FormatParam.STime <= task.EndTime && ui.FormatParam.ETime >= task.StartTime {
- task.UI = append(task.UI, ui)
- task.StartTime = utility.Min(task.StartTime, ui.FormatParam.STime)
- task.EndTime = utility.Max(task.EndTime, ui.FormatParam.ETime)
- found = true
- break
- }
- }
- if !found {
- groups[fmt.Sprintf("%d-%d", ui.FormatParam.STime, ui.FormatParam.ETime)] = &AnalyzeTask{
- StartTime: ui.FormatParam.STime,
- EndTime: ui.FormatParam.ETime,
- UI: []*UserTask{ui},
- }
- }
- }
- if len(groups) > 0 {
- for _, gp := range groups {
- at = append(at, gp)
- }
- }
- return
- }
- // GetDataByEs 数据查询
- func (a *AnalyzeTask) Run() {
- indexName := g.Cfg().MustGet(gctx.New(), "es.indexName").String()
- limit := g.Cfg().MustGet(gctx.New(), "es.countLimit").Int()
- iterationTimes := 0
- iterationSize := 200
- query := es.NewBoolQuery().Must(
- es.NewRangeQuery("jgtime").Gte(a.StartTime).Lte(a.EndTime),
- es.NewTermsQuery("bidstatus", consts.BidStatus...),
- )
- // 打印查询语句
- //source, err := query.Source()
- //if err != nil {
- // g.Log().Info(model.Ctx, "---1---", err)
- //}
- //queryJSON, err := json.MarshalIndent(source, "", " ")
- //if err != nil {
- // g.Log().Info(model.Ctx, "---2---", err)
- //}
- //g.Log().Info(model.Ctx, "---3---", string(queryJSON))
- client := do.Es.GetEsConn()
- defer do.Es.DestoryEsConn(client)
- // 创建一个搜索服务对象,并设置查询条件和排序
- // 执行搜索请求,并获取迭代器 上下文超时时间1分钟
- scrollService := client.Scroll(indexName).
- Query(query).
- Sort("jgtime", false).
- Size(iterationSize).
- KeepAlive("2m")
- results, err := scrollService.Do(model.Ctx)
- if err == nil {
- scrollId := results.ScrollId
- //迭代处理每个结果
- for {
- //超过限定数量 跳出循环
- if iterationTimes*iterationSize >= limit {
- break
- }
- //从迭代器获取下页数据
- if iterationTimes > 0 {
- results, err = scrollService.ScrollId(scrollId).Do(model.Ctx)
- if err != nil {
- g.Log().Info(model.Ctx, "Error executing scroll: %s", err)
- if err.Error() == "EOF" {
- break
- }
- }
- scrollId = results.ScrollId
- }
- if len(results.Hits.Hits) == 0 {
- //没有更多结果
- break
- }
- iterationTimes++
- for _, hit := range results.Hits.Hits {
- p := &entity.ProjectInfo{}
- if err := json.Unmarshal(hit.Source, p); err == nil {
- for _, ui := range a.UI {
- if ui.FormatParam.STime <= p.JgTime && ui.FormatParam.ETime >= p.JgTime {
- ui.Start()
- ui.Push(p)
- }
- }
- } else {
- g.Log().Info(model.Ctx, "project info json err :", err.Error())
- }
- }
- }
- } else {
- g.Log().Info(model.Ctx, "elastic search err: ", err)
- }
- return
- }
- // query
- func (a *AnalyzeTask) GetEsSql() string {
- query := fmt.Sprintf(`{"query":{"bool":{"must":[{"range":{"jgtime":{"gte":%d,"lte":%d}}},{"terms":{"bidstatus":["中标","成交","合同","单一"]}}]}},"sort": [{"jgtime": "desc"}],"from": 0,"size": %d}`, a.StartTime, a.EndTime, g.Cfg().MustGet(gctx.New(), "es.countLimit").Int())
- return query
- }
- // 企业信息
- func GetEntNameByIds(ids []string) (returnMap map[string]string) {
- returnMap = map[string]string{}
- if len(ids) == 0 {
- return
- }
- list := do.Es.Get("qyxy", "qyxy", fmt.Sprintf(`{"query":{"bool":{"must":[{"terms":{"_id":["%s"]}}]}},"_source":["_id","company_name"],"size":%d}`, strings.Join(ids, `","`), len(ids)))
- if list == nil || len(*list) == 0 {
- return
- }
- for _, item := range *list {
- id, _ := item["_id"].(string)
- name, _ := item["company_name"].(string)
- if id != "" && name != "" {
- returnMap[id] = name
- }
- }
- return
- }
|