package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"time"

	"github.com/olivere/elastic/v7"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

// PortraitData is the per-buyer summary record that dealAllData saves to the
// "wcc_project_portrait" MongoDB collection.
type PortraitData struct {
	Buyer        string  `json:"buyer"`         // "name" field of the buyer document
	Area         string  `json:"area"`          // "area" of the buyer's most recent matching project
	City         string  `json:"city"`          // "city" of the buyer's most recent matching project
	Class        string  `json:"class"`         // tag the portrait was built for
	BusinessType string  `json:"business_type"` // derived from the buyer's "buyerclass" via getStr
	Lasttime     int64   `json:"lasttime"`      // "lasttime" of the most recent matching project
	ProjectCount int64   `json:"project_count"` // total hits of the per-buyer project search
	ProjectMoney float64 `json:"project_money"` // sum of "sortprice" over the matching projects
}

func main() {
	dealAllData()
}

// dealIncData handles incremental data. In this version it only builds a
// filter on "pici" covering (yesterday 00:00, today 00:00] in local time,
// expressed as Unix seconds, and logs it; no query is executed.
func dealIncData() {
	now := time.Now()
	// time.Date normalizes out-of-range days, so Day()-1 is safe on the 1st.
	yesterday := time.Date(now.Year(), now.Month(), now.Day()-1, 0, 0, 0, 0, now.Location())
	today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
	q := map[string]interface{}{
		"pici": map[string]interface{}{
			"$gt":  yesterday.Unix(),
			"$lte": today.Unix(),
		},
	}
	log.Println(q)
}

// dealAllData handles the existing (full) data set: it scrolls through every
// document of the "buyer" index and, for each buyer, searches the
// "projectset" index for projects tagged "情报_物业". Buyers with at least one
// matching project get a PortraitData record saved to MongoDB.
//
// It panics if the initial scroll request fails with anything other than
// io.EOF, and terminates the process (log.Fatalf) if the Elasticsearch client
// cannot be created or a per-buyer search fails.
func dealAllData() {
	// NOTE(review): endpoints and credentials are hard-coded in source;
	// consider loading them from configuration or the environment.
	url := "http://172.17.4.184:19908"
	//url := "http://127.0.0.1:19908"
	username := "jybid"
	password := "Top2023_JEB01i@31"
	index := "buyer" // index that is scrolled
	//index := "projectset"

	// Create the Elasticsearch client.
	client, err := elastic.NewClient(
		elastic.SetURL(url),
		elastic.SetBasicAuth(username, password),
		elastic.SetSniff(false),
	)
	if err != nil {
		log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
	}

	MgoB := &mongodb.MongodbSim{
		MongodbAddr: "172.17.189.140:27080",
		//MongodbAddr: "127.0.0.1:27083",
		Size:     10,
		DbName:   "qfw",
		UserName: "SJZY_RWbid_ES",
		Password: "SJZY@B4i4D5e6S",
		//Direct:      true,
	}
	MgoB.InitPool()

	ctx := context.Background()

	// Start the scroll search over the buyer index.
	scroll := "10m"
	searchSource := elastic.NewSearchSource().
		Size(10000).
		Sort("_doc", true) // ascending
	searchService := client.Scroll(index).
		Size(10000).
		Scroll(scroll).
		SearchSource(searchSource)

	res, err := searchService.Do(ctx)
	if err != nil {
		if err != io.EOF {
			panic(err)
		}
		// io.EOF on the first page means the index produced no hits; the
		// batch loop below is skipped by its own nil/empty guard.
		fmt.Println("没有数据")
	}

	// Remember the scroll ID as soon as one is known so the server-side
	// scroll context can be released even when no batch is processed.
	scrollID := ""
	if res != nil {
		scrollID = res.ScrollId
	}

	// TotalHits is nil-safe on the result.
	fmt.Println("总数是:", res.TotalHits())

	total := 0
	// Guard against a nil result or nil Hits: the client reports io.EOF for
	// both an empty and a missing hits section.
	for res != nil && res.Hits != nil && len(res.Hits.Hits) > 0 {
		for k, hit := range res.Hits.Hits {
			if k%1000 == 0 {
				log.Println("当前:", k)
			}

			var doc map[string]interface{}
			if err := json.Unmarshal(hit.Source, &doc); err != nil {
				log.Printf("解析文档失败:%s", err)
				continue
			}

			buyerName := util.ObjToString(doc["name"])
			portrait := PortraitData{
				Buyer:        buyerName,
				BusinessType: getStr(util.ObjToString(doc["buyerclass"])),
				Class:        "情报_物业",
			}

			// Match this buyer's projects carrying the target tag.
			query := elastic.NewBoolQuery().Must(
				elastic.NewTermQuery("buyer", buyerName),
				elastic.NewTermQuery("tag_topinformation", "情报_物业"),
			)

			// Only the newest project is fetched (Size(1), lasttime
			// descending); the count comes from the total hits and the money
			// from the sum aggregation.
			// NOTE(review): total hits may be capped by Elasticsearch's
			// default track_total_hits limit — confirm whether
			// TrackTotalHits(true) is needed for very large buyers.
			searchResult, err := client.Search().
				Index("projectset").
				Query(query).
				Sort("lasttime", false).
				Size(1).
				Aggregation("total_price", elastic.NewSumAggregation().Field("sortprice")).
				Do(ctx)
			if err != nil {
				log.Fatalf("Error getting response: %s", err)
			}

			// TotalHits() tolerates a missing hits section, unlike reading
			// Hits.TotalHits.Value directly.
			projectCount := searchResult.TotalHits()
			if projectCount <= 0 {
				continue
			}
			portrait.ProjectCount = projectCount

			for _, projectHit := range searchResult.Hits.Hits {
				var doc2 map[string]interface{}
				if err := json.Unmarshal(projectHit.Source, &doc2); err != nil {
					log.Printf("解析文档失败:%s", err)
					continue
				}
				portrait.Lasttime = util.Int64All(doc2["lasttime"])
				portrait.Area = util.ObjToString(doc2["area"])
				portrait.City = util.ObjToString(doc2["city"])
			}

			// Read the aggregation result; the value is a pointer and must
			// be checked before dereferencing.
			agg, found := searchResult.Aggregations.Sum("total_price")
			switch {
			case !found:
				log.Println("Aggregation not found")
			case agg.Value == nil:
				log.Println("Aggregation value is null")
			default:
				portrait.ProjectMoney = *agg.Value
			}

			// Write the portrait to MongoDB.
			MgoB.Save("wcc_project_portrait", structToMap(portrait))
		}

		total += len(res.Hits.Hits)
		scrollID = res.ScrollId
		res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
		log.Println("current count:", total)
		// Keep the most recent scroll ID for the final cleanup.
		if res != nil && res.ScrollId != "" {
			scrollID = res.ScrollId
		}
		if err != nil {
			if err == io.EOF {
				// Last batch consumed; leave the loop.
				break
			}
			log.Println("滚动搜索失败:", err, res)
			break // stop scrolling on error
		}
	}

	// Release the server-side scroll context once scrolling is over; skip the
	// call when no scroll ID was ever obtained.
	if scrollID != "" {
		if _, err := client.ClearScroll().ScrollId(scrollID).Do(ctx); err != nil {
			log.Printf("清理滚动搜索失败:%s", err)
		}
	}

	fmt.Println("结束~~~~~~~~~~~~~~~")
}