package main import ( "context" "encoding/json" "fmt" "github.com/olivere/elastic/v7" "io" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "log" "strings" ) func main() { //getProjectDataFromEs() //整理数据,然后才能写Clickhouse click() log.Println("数据处理完毕") } // getProjectDataFromEs 获取项目 中标成交数据 func getProjectDataFromEs() { url := "http://172.17.4.184:19908" //url := "http://127.0.0.1:19908" username := "jybid" password := "Top2023_JEB01i@31" index := "projectset" //索引名称 // 创建 Elasticsearch 客户端 client, err := elastic.NewClient( elastic.SetURL(url), elastic.SetBasicAuth(username, password), elastic.SetSniff(false), ) if err != nil { log.Fatalf("创建 Elasticsearch 客户端失败:%s", err) } //85 抽取库 Mgo := &mongodb.MongodbSim{ //MongodbAddr: "127.0.0.1:27080", MongodbAddr: "172.17.4.85:27080", DbName: "top", Size: 10, //Direct: true, } Mgo.InitPool() //MgoB := &mongodb.MongodbSim{ // MongodbAddr: "172.17.189.140:27080", // //MongodbAddr: "127.0.0.1:27083", // Size: 10, // DbName: "qfw", // UserName: "SJZY_RWbid_ES", // Password: "SJZY@B4i4D5e6S", // //Direct: true, //} //MgoB.InitPool() //2023年01-01 2023-10-01,,1-3季度 //2024-1 - 2024-4;1704038400-1711900800 //2023-10-1 2024-1-1;1696089600-1704038400 //areaTermsQuery := elastic.NewTermsQuery("area", "江苏", "安徽", "上海", "天津", "河北", "浙江", "天津市", "上海市", "河北省", "安徽省", "江苏省", "浙江省", "北京", "北京市") //rangeQuery := elastic.NewRangeQuery("jgtime").Gte(1711900800).Lt(1719763200) //2024年4-7月 //rangeQuery := elastic.NewRangeQuery("jgtime").Gte(1735660800).Lt(1743436800) //2025年1-3月 rangeQuery := elastic.NewRangeQuery("jgtime").Gte(1743436800).Lt(1751299200) //2025年4.1-7.1 ;25年第二季度数据 query := elastic.NewBoolQuery(). //Must(areaTermsQuery). Must(elastic.NewTermsQuery("bidstatus", "中标", "单一", "成交", "合同")). Must(rangeQuery) //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000") //query := elastic.NewBoolQuery(). // //北京,天津,河北,上海,江苏,浙江,安徽 // //Must(elastic.NewTermQuery("area", "北京市")). // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")). // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")). // Must(rangeQuery) ctx := context.Background() //开始滚动搜索 scrollID := "" scroll := "10m" searchSource := elastic.NewSearchSource(). Query(query). Size(10000). Sort("_doc", true) //升序排序 //Sort("_doc", false) //降序排序 searchService := client.Scroll(index). Size(10000). Scroll(scroll). SearchSource(searchSource) res, err := searchService.Do(ctx) if err != nil { if err == io.EOF { fmt.Println("没有数据") } else { panic(err) } } //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源 fmt.Println("总数是:", res.TotalHits()) total := 0 for len(res.Hits.Hits) > 0 { for _, hit := range res.Hits.Hits { var doc map[string]interface{} err := json.Unmarshal(hit.Source, &doc) if err != nil { log.Printf("解析文档失败:%s", err) continue } delete(doc, "filetext") delete(doc, "detail") sWinner := util.ObjToString(doc["s_winner"]) winners := strings.Split(sWinner, ",") for _, v := range winners { insert := doc insert["s_winner"] = v //存入新表 err = Mgo.InsertOrUpdate("top", "wcc_allcity_2025Q2", insert) if err != nil { log.Println("error", doc["id"]) } } } total = total + len(res.Hits.Hits) scrollID = res.ScrollId res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx) log.Println("current count:", total) if err != nil { if err == io.EOF { // 滚动到最后一批数据,退出循环 break } log.Println("滚动搜索失败:", err, res) break // 处理错误时退出循环 } } // 在循环外调用 ClearScroll _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx) if err != nil { log.Printf("清理滚动搜索失败:%s", err) } fmt.Println("结束~~~~~~~~~~~~~~~") }