123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- package main
- import (
- "context"
- "encoding/json"
- "fmt"
- "github.com/olivere/elastic/v7"
- "io"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "log"
- "time"
- )
- type PortraitData struct {
- Buyer string `json:"buyer"`
- Area string `json:"area"`
- City string `json:"city"`
- Class string `json:"class"`
- BusinessType string `json:"business_type"`
- Lasttime int64 `json:"lasttime"`
- ProjectCount int64 `json:"project_count"`
- ProjectMoney float64 `json:"project_money"`
- }
- func main() {
- dealAllData()
- }
- // dealIncData 处理增量数据
- func dealIncData() {
- now := time.Now()
- yesterday := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location())
- today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
- q := map[string]interface{}{
- "pici": map[string]interface{}{
- "$gt": yesterday.Unix(),
- "$lte": today.Unix(),
- },
- }
- log.Println(q)
- }
- // dealAllData 处理存量数据,
- func dealAllData() {
- /**
- 循环采购单位存量数据,
- */
- url := "http://172.17.4.184:19908"
- //url := "http://127.0.0.1:19908"
- username := "jybid"
- password := "Top2023_JEB01i@31"
- index := "buyer" //索引名称
- //index := "projectset" //索引名称
- // 创建 Elasticsearch 客户端
- client, err := elastic.NewClient(
- elastic.SetURL(url),
- elastic.SetBasicAuth(username, password),
- elastic.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- MgoB := &mongodb.MongodbSim{
- MongodbAddr: "172.17.189.140:27080",
- //MongodbAddr: "127.0.0.1:27083",
- Size: 10,
- DbName: "qfw",
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- MgoB.InitPool()
- //query := elastic.NewBoolQuery().
- // Must(elastic.NewTermQuery("toptype", "结果")).
- // Must(elastic.NewTermQuery("subtype", "招标"))
- ctx := context.Background()
- //开始滚动搜索
- scrollID := ""
- scroll := "10m"
- searchSource := elastic.NewSearchSource().
- //Query(query).
- Size(10000).
- Sort("_doc", true) //升序排序
- //Sort("_doc", false) //降序排序
- searchService := client.Scroll(index).
- Size(10000).
- Scroll(scroll).
- SearchSource(searchSource)
- res, err := searchService.Do(ctx)
- if err != nil {
- if err == io.EOF {
- fmt.Println("没有数据")
- } else {
- panic(err)
- }
- }
- //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
- fmt.Println("总数是:", res.TotalHits())
- total := 0
- for len(res.Hits.Hits) > 0 {
- for k, hit := range res.Hits.Hits {
- if k%1000 == 0 {
- log.Println("当前:", k)
- }
- var doc map[string]interface{}
- err := json.Unmarshal(hit.Source, &doc)
- if err != nil {
- log.Printf("解析文档失败:%s", err)
- continue
- }
- //处理查询结果
- portrait := PortraitData{
- Buyer: util.ObjToString(doc["name"]),
- BusinessType: getStr(util.ObjToString(doc["buyerclass"])),
- Class: "情报_物业",
- }
- // 构建查询
- query := elastic.NewBoolQuery().
- Must(
- //elastic.NewTermQuery("buyer", "泸州市龙马潭区人民医院"),
- elastic.NewTermQuery("buyer", util.ObjToString(doc["name"])),
- elastic.NewTermQuery("tag_topinformation", "情报_物业"),
- )
- // 创建搜索服务
- searchService2 := client.Search().
- Index("projectset"). // 替换为你的索引名称
- Query(query).
- Sort("lasttime", false). // false表示降序
- Size(1).
- Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice"))
- // 执行查询
- searchResult, err := searchService2.Do(context.Background())
- if err != nil {
- log.Fatalf("Error getting response: %s", err)
- }
- // 处理结果
- if searchResult.Hits.TotalHits.Value > 0 {
- portrait.ProjectCount = searchResult.TotalHits()
- for _, hit := range searchResult.Hits.Hits {
- var doc2 map[string]interface{}
- err := json.Unmarshal(hit.Source, &doc2)
- if err != nil {
- log.Printf("解析文档失败:%s", err)
- continue
- }
- portrait.Lasttime = util.Int64All(doc2["lasttime"])
- portrait.Area = util.ObjToString(doc2["area"])
- portrait.City = util.ObjToString(doc2["city"])
- }
- // 处理聚合结果
- if agg, found := searchResult.Aggregations.Sum("total_price"); found {
- portrait.ProjectMoney = *agg.Value
- } else {
- log.Println("Aggregation not found")
- }
- //写入MongoDB
- MgoB.Save("wcc_project_portrait", structToMap(portrait))
- } else {
- continue
- }
- }
- total = total + len(res.Hits.Hits)
- scrollID = res.ScrollId
- res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
- log.Println("current count:", total)
- if err != nil {
- if err == io.EOF {
- // 滚动到最后一批数据,退出循环
- break
- }
- log.Println("滚动搜索失败:", err, res)
- break // 处理错误时退出循环
- }
- }
- // 在循环外调用 ClearScroll
- _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
- if err != nil {
- log.Printf("清理滚动搜索失败:%s", err)
- }
- fmt.Println("结束~~~~~~~~~~~~~~~")
- }
|