package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"math"
	"os"
	"regexp"

	"github.com/olivere/elastic/v7"
	"github.com/xuri/excelize/v2"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
)

// Company is the subset of the qyxy document used for the export.
type Company struct {
	CompanyName   string  `json:"company_name"`
	Capital       float64 `json:"capital"`
	CompanyType   string  `json:"company_type"`
	CompanyStatus string  `json:"company_status"`
}

var (
	indexName = "qyxy"
)

// getQyxyNationToFiles exports the selected national_sub groups into one or more xlsx files per group.
func getQyxyNationToFiles() {
	url := "http://172.17.4.184:19908"
	//url := "http://127.0.0.1:19908"
	username := "jybid"
	password := "Top2023_JEB01i@31"

	// Create the Elasticsearch client
	client, err := elastic.NewClient(
		elastic.SetURL(url),
		elastic.SetBasicAuth(username, password),
		elastic.SetSniff(false),
	)
	if err != nil {
		log.Fatalf("failed to connect to Elasticsearch: %v", err)
	}

	//nationalSubs := []string{"石油和天然气开采业", "文教、工美、体育和娱乐用品制造业", "石油、煤炭及其他燃料加工业", "化学原料和化学制品制造业",
	//	"医药制造业", "通用设备制造业", "专用设备制造业", "汽车制造业", "电信、广播电视和卫星传输服务", "互联网和相关服务", "软件和信息技术服务业", "货币金融服务",
	//	"资本市场服务", "保险业", "其他金融业", "教育", "新闻和出版业", "生态保护和环境治理业"}
	nationalSubs := []string{"生态保护和环境治理业"}

	outputDir := "output"
	// Make sure the output directory exists
	if err := os.MkdirAll(outputDir, 0755); err != nil {
		log.Fatalf("failed to create output directory: %v", err)
	}

	err = exportSelectedGroupsToFiles(client, "national_sub", nationalSubs, outputDir)
	if err != nil {
		log.Fatalf("export failed: %v", err)
	}
	log.Println("🎉 all groups exported")
}

// getQyxyNation exports the top companies per industry group into a single workbook.
func getQyxyNation() {
	url := "http://172.17.4.184:19908"
	//url := "http://127.0.0.1:19908"
	username := "jybid"
	password := "Top2023_JEB01i@31"

	// Create the Elasticsearch client
	client, err := elastic.NewClient(
		elastic.SetURL(url),
		elastic.SetBasicAuth(username, password),
		elastic.SetSniff(false),
	)
	if err != nil {
		log.Fatalf("failed to connect to Elasticsearch: %v", err)
	}

	// 1. Create the Excel workbook
	f := excelize.NewFile()

	// 2. Define the grouping fields
	groupFields := []string{"national_sub"}
	sheetNames := []string{"按二级分类"}

	for i, field := range groupFields {
		log.Printf("querying and exporting: %s\n", field)
		err := exportGroupToSheet2(client, f, field, sheetNames[i])
		if err != nil {
			log.Fatalf("export failed (%s): %v", field, err)
		}
	}

	// Drop the default sheet
	f.DeleteSheet("Sheet1")

	// Save the workbook
	if err := f.SaveAs("top_companies_by_industry_levels.xlsx"); err != nil {
		log.Fatalf("save failed: %v", err)
	}
	log.Println("✅ all groups exported: top_companies_by_industry_levels.xlsx")
}
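
// newESClient is a small sketch, not called by the functions above, showing how the
// duplicated client setup in getQyxyNationToFiles and getQyxyNation could be factored
// out. The option set simply mirrors the existing code; the helper name and signature
// are assumptions, not part of the original program.
func newESClient(url, username, password string) (*elastic.Client, error) {
	// Sniffing stays disabled because the cluster is reached through a fixed address,
	// matching the setup used above.
	return elastic.NewClient(
		elastic.SetURL(url),
		elastic.SetBasicAuth(username, password),
		elastic.SetSniff(false),
	)
}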

// exportGroupToSheet writes, for every bucket of groupField, the top 5 companies by
// registered capital (terms aggregation + top_hits) into sheetName.
func exportGroupToSheet(client *elastic.Client, f *excelize.File, groupField, sheetName string) error {
	ctx := context.Background()

	// Query filter: status "存续", excluding "个体工商户"
	query := elastic.NewBoolQuery().
		Must(elastic.NewTermQuery("company_status", "存续")).
		MustNot(elastic.NewTermQuery("company_type", "个体工商户"))

	// Source filter
	sourceCtx := elastic.NewFetchSourceContext(true).Include(
		"company_name", "capital", "company_type", "company_status", groupField,
	)

	// Aggregations
	topHitsAgg := elastic.NewTopHitsAggregation().
		Size(5).
		Sort("capital", false).
		FetchSourceContext(sourceCtx)

	termsAgg := elastic.NewTermsAggregation().Field(groupField).Size(500).
		SubAggregation("top_capital_companies", topHitsAgg)

	// Execute the search
	res, err := client.Search().
		Index(indexName).
		Query(query).
		Size(0).
		Aggregation("by_group", termsAgg).
		Do(ctx)
	if err != nil {
		return fmt.Errorf("Elasticsearch query failed: %v", err)
	}

	// Create the worksheet
	index, _ := f.NewSheet(sheetName)
	headers := []string{groupField, "company_name", "capital", "company_type", "company_status"}
	for i, h := range headers {
		cell, _ := excelize.CoordinatesToCellName(i+1, 1)
		f.SetCellValue(sheetName, cell, h)
	}

	// Parse the aggregation result and write the rows
	agg, ok := res.Aggregations.Terms("by_group")
	if !ok {
		return fmt.Errorf("group aggregation not found in response")
	}

	row := 2
	for _, bucket := range agg.Buckets {
		topHits, ok := bucket.TopHits("top_capital_companies")
		if !ok {
			continue
		}
		for _, hit := range topHits.Hits.Hits {
			var c Company
			if err := json.Unmarshal(hit.Source, &c); err != nil {
				continue
			}
			data := []interface{}{bucket.Key, c.CompanyName, c.Capital, c.CompanyType, c.CompanyStatus}
			for col, val := range data {
				cell, _ := excelize.CoordinatesToCellName(col+1, row)
				f.SetCellValue(sheetName, cell, val)
			}
			row++
		}
	}

	// Activate the first sheet
	if sheetName == "按一级分类" {
		f.SetActiveSheet(index)
	}
	return nil
}

// exportGroupToSheet3 is an unfinished draft: it only fetches the per-group document
// counts; the second stage (pulling the top 5% per group) was never implemented.
// Note that the target sheet is never created, so the header writes are no-ops.
func exportGroupToSheet3(client *elastic.Client, f *excelize.File, groupField, sheetName string) error {
	ctx := context.Background()

	// Query filter
	query := elastic.NewBoolQuery().
		Must(elastic.NewTermQuery("company_status", "存续")).
		MustNot(elastic.NewTermQuery("company_type", "个体工商户"))

	// Stage 1: per-group document counts
	countAgg := elastic.NewTermsAggregation().Field(groupField).Size(500)
	countRes, err := client.Search().
		Index(indexName).
		Query(query).
		Size(0).
		Aggregation("by_group", countAgg).
		Do(ctx)
	if err != nil {
		return fmt.Errorf("failed to fetch group counts: %v", err)
	}

	// Parse the group buckets
	agg, ok := countRes.Aggregations.Terms("by_group")
	if !ok {
		return fmt.Errorf("group aggregation not found in response")
	}

	// Create the worksheet
	//index, _ := f.NewSheet(sheetName)
	headers := []string{groupField, "company_name", "capital", "company_type", "company_status"}
	for i, h := range headers {
		cell, _ := excelize.CoordinatesToCellName(i+1, 1)
		f.SetCellValue(sheetName, cell, h)
	}

	// Stage 2: fetch 5% of each group in batches (not implemented)
	//row := 2
	for _, bucket := range agg.Buckets {
		log.Println(bucket)
		// Take 5% of the group, but at least one document
		//size := int(math.Max(1, float64(bucket.DocCount)*0.05))
	}
	return nil
}

// exportGroupToSheet21 queries each group separately and writes the top 5% of companies
// by capital into sheetName.
func exportGroupToSheet21(client *elastic.Client, f *excelize.File, groupField, sheetName string) error {
	ctx := context.Background()

	// Query filter
	query := elastic.NewBoolQuery().
		Must(elastic.NewTermQuery("company_status", "存续")).
		MustNot(elastic.NewTermQuery("company_type", "个体工商户"))

	// Step 1: groups and their document counts
	countAgg := elastic.NewTermsAggregation().Field(groupField).Size(500)
	countRes, err := client.Search().
		Index(indexName).
		Query(query).
		Size(0).
		Aggregation("by_group", countAgg).
		Do(ctx)
	if err != nil {
		return fmt.Errorf("failed to fetch group counts: %v", err)
	}

	agg, ok := countRes.Aggregations.Terms("by_group")
	if !ok {
		return fmt.Errorf("group aggregation not found in response")
	}

	// Create the worksheet
	index, _ := f.NewSheet(sheetName)
	headers := []string{groupField, "company_name", "capital", "company_type", "company_status"}
	for i, h := range headers {
		cell, _ := excelize.CoordinatesToCellName(i+1, 1)
		f.SetCellValue(sheetName, cell, h)
	}

	row := 2

	// Step 2: query each group and keep the top 5% by capital
	sourceCtx := elastic.NewFetchSourceContext(true).Include(
		"company_name", "capital", "company_type", "company_status", groupField,
	)

	for _, bucket := range agg.Buckets {
		groupVal := bucket.Key
		docCount := bucket.DocCount
		size := int(math.Max(1, float64(docCount)*0.05))
		groupStr := fmt.Sprintf("%v", groupVal)

		// Note: this per-group query only filters on the group value; the
		// status/type filters of the first stage are not re-applied here.
		query := elastic.NewBoolQuery().Filter(
			elastic.NewTermQuery(groupField, groupStr),
		)
		src, _ := query.Source()
		b, _ := json.MarshalIndent(src, "", "  ")
		fmt.Println("Query body:\n", string(b))

		//sourceCtx := elastic.NewFetchSourceContext(true).Include("company_name", "capital") // make sure it is not nil
		res, err := client.Search().
			Index(indexName).
			Query(query).
			Sort("capital", false).
			Size(size).
			FetchSourceContext(sourceCtx).
			Do(ctx)
		if err != nil {
			fmt.Printf("query failed group=%v, err=%v, type=%T\n", groupVal, err, err)
			continue
		}
		if res == nil {
			fmt.Println("empty result (res=nil)")
			continue
		}
		fmt.Printf("query succeeded: %d hits\n", res.Hits.TotalHits.Value)

		// The request carries no aggregations, so read the hits directly
		// (the original lookup of a "by_group" sub-aggregation here could never succeed).
		for _, hit := range res.Hits.Hits {
			var c Company
			if err := json.Unmarshal(hit.Source, &c); err != nil {
				continue
			}
			data := []interface{}{groupVal, c.CompanyName, c.Capital, c.CompanyType, c.CompanyStatus}
			for col, val := range data {
				cell, _ := excelize.CoordinatesToCellName(col+1, row)
				f.SetCellValue(sheetName, cell, val)
			}
			row++
		}
	}

	// Activate the sheet
	if sheetName == "按一级分类" {
		f.SetActiveSheet(index)
	}
	return nil
}
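
// scrollAll is a minimal sketch, not called by the exporters in this file, of the scroll
// loop that exportGroupToSheet22, exportGroupToSheet2 and exportSelectedGroupsToFiles
// repeat inline below: call Do until io.EOF, hand each hit to a callback, and clear the
// scroll context at the end. The callback returns false to stop early (for example once
// the top-N% quota is reached). The helper name and signature are assumptions, not an
// API of this project.
func scrollAll(ctx context.Context, scroll *elastic.ScrollService, handle func(hit *elastic.SearchHit) bool) error {
	// Always release the server-side scroll context, even on early exit.
	defer scroll.Clear(ctx)

	for {
		res, err := scroll.Do(ctx)
		if err == io.EOF {
			return nil // no more batches
		}
		if err != nil {
			return err
		}
		for _, hit := range res.Hits.Hits {
			if !handle(hit) {
				return nil // the caller has seen enough rows
			}
		}
	}
}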

// exportGroupToSheet22 pulls the top 5% of each group via the scroll API and writes
// the rows into sheetName.
func exportGroupToSheet22(client *elastic.Client, f *excelize.File, groupField, sheetName string) error {
	ctx := context.Background()

	// Query filter: status "存续", excluding "个体工商户"
	query := elastic.NewBoolQuery().
		Must(elastic.NewTermQuery("company_status", "存续")).
		MustNot(elastic.NewTermQuery("company_type", "个体工商户"))

	// Aggregation: bucket by groupField
	countAgg := elastic.NewTermsAggregation().Field(groupField).Size(500)
	countRes, err := client.Search().
		Index(indexName).
		Query(query).
		Size(0).
		Aggregation("by_group", countAgg).
		Do(ctx)
	if err != nil {
		return fmt.Errorf("failed to fetch group counts: %v", err)
	}

	agg, ok := countRes.Aggregations.Terms("by_group")
	if !ok {
		return fmt.Errorf("group aggregation not found in response")
	}

	// Create the sheet and headers
	index, _ := f.NewSheet(sheetName)
	headers := []string{groupField, "company_name", "capital", "company_type", "company_status"}
	for i, h := range headers {
		cell, _ := excelize.CoordinatesToCellName(i+1, 1)
		f.SetCellValue(sheetName, cell, h)
	}

	row := 2

	// For every group, fetch the top 5% of companies by capital
	for _, bucket := range agg.Buckets {
		groupVal := bucket.Key
		docCount := bucket.DocCount
		if docCount == 0 {
			continue
		}
		size := int(math.Max(1, float64(docCount)*0.05))
		groupStr := fmt.Sprintf("%v", groupVal)

		// Per-group query (the exists filter avoids sort failures on missing capital)
		subQuery := elastic.NewBoolQuery().
			Must(elastic.NewTermQuery(groupField, groupStr)).
			Must(elastic.NewExistsQuery("capital"))

		sourceCtx := elastic.NewFetchSourceContext(true).Include(
			"company_name", "capital", "company_type", "company_status", groupField,
		)

		// Scroll query
		scroll := client.Scroll(indexName).
			Query(subQuery).
			SortWithInfo(elastic.SortInfo{
				Field:     "capital",
				Ascending: false,
				Missing:   "_last",
			}).
			Size(500). // 500 documents per batch
			FetchSourceContext(sourceCtx)

		var fetched int
		for fetched < size {
			res, err := scroll.Do(ctx)
			if err == io.EOF {
				break // no more data
			}
			if err != nil {
				fmt.Printf("scroll query failed group=%v, err=%v, type=%T\n", groupVal, err, err)
				// Print the detailed Elasticsearch error, if available
				if elasticErr, ok := err.(*elastic.Error); ok && elasticErr.Details != nil {
					fmt.Printf("error details: type=%v, reason=%v\n", elasticErr.Details.Type, elasticErr.Details.Reason)
					for i, cause := range elasticErr.Details.RootCause {
						fmt.Printf("RootCause[%d]: %v (%v)\n", i, cause.Reason, cause.Type)
					}
				}
				break
			}

			for _, hit := range res.Hits.Hits {
				if fetched >= size {
					break
				}
				var c map[string]interface{}
				if err := json.Unmarshal(hit.Source, &c); err != nil {
					continue
				}
				data := []interface{}{
					groupStr,
					c["company_name"],
					c["capital"],
					c["company_type"],
					c["company_status"],
				}
				for col, val := range data {
					cell, _ := excelize.CoordinatesToCellName(col+1, row)
					f.SetCellValue(sheetName, cell, val)
				}
				row++
				fetched++
			}
		}

		// Release the scroll context for this group (a deferred Clear inside the loop
		// would only run when the function returns).
		_ = scroll.Clear(ctx)
	}

	// Activate the sheet
	if sheetName == "按一级分类" {
		f.SetActiveSheet(index)
	}
	return nil
}

// exportGroupToSheet2 is the variant used by getQyxyNation: top 5% per group via scroll,
// with automatic sheet roll-over once the xlsx row limit is reached.
func exportGroupToSheet2(client *elastic.Client, f *excelize.File, groupField, sheetName string) error {
	ctx := context.Background()

	// Query filter: status "存续", excluding "个体工商户"
	query := elastic.NewBoolQuery().
		Must(elastic.NewTermQuery("company_status", "存续")).
		MustNot(elastic.NewTermQuery("company_type", "个体工商户"))

	// Aggregation: bucket by groupField
	countAgg := elastic.NewTermsAggregation().Field(groupField).Size(500)
	countRes, err := client.Search().
		Index(indexName).
		Query(query).
		Size(0).
		Aggregation("by_group", countAgg).
		Do(ctx)
	if err != nil {
		return fmt.Errorf("failed to fetch group counts: %v", err)
	}

	agg, ok := countRes.Aggregations.Terms("by_group")
	if !ok {
		return fmt.Errorf("group aggregation not found in response")
	}

	headers := []string{groupField, "company_name", "capital", "company_type", "company_status"}

	sheetIndex := 0
	currSheet := sheetName
	f.NewSheet(currSheet)
	writeHeaders(f, currSheet, headers)
	row := 2

	for _, bucket := range agg.Buckets {
		groupVal := bucket.Key
		docCount := bucket.DocCount
		if docCount == 0 {
			continue
		}
		size := int(math.Max(1, float64(docCount)*0.05))
		groupStr := fmt.Sprintf("%v", groupVal)

		subQuery := elastic.NewBoolQuery().
			Must(elastic.NewTermQuery(groupField, groupStr)).
			Must(elastic.NewExistsQuery("capital"))

		sourceCtx := elastic.NewFetchSourceContext(true).Include(
			"company_name", "capital", "company_type", "company_status", groupField,
		)

		scroll := client.Scroll(indexName).
			Query(subQuery).
			SortWithInfo(elastic.SortInfo{
				Field:     "capital",
				Ascending: false,
				Missing:   "_last",
			}).
			Size(500).
			FetchSourceContext(sourceCtx)

		var fetched int
		for fetched < size {
			res, err := scroll.Do(ctx)
			if err == io.EOF {
				break
			}
			if err != nil {
				log.Printf("scroll query failed group=%v, err=%v\n", groupVal, err)
				break
			}

			for _, hit := range res.Hits.Hits {
				if fetched >= size {
					break
				}

				// Roll over to a new sheet once the xlsx row limit is reached
				if row > maxRowsPerSheet {
					sheetIndex++
					currSheet = fmt.Sprintf("%s_%d", sheetName, sheetIndex)
					f.NewSheet(currSheet)
					writeHeaders(f, currSheet, headers)
					row = 2
				}

				var c map[string]interface{}
				if err := json.Unmarshal(hit.Source, &c); err != nil {
					continue
				}
				if util.ObjToString(c["company_status"]) != "存续" {
					continue
				}

				data := []interface{}{
					groupStr,
					c["company_name"],
					c["capital"],
					c["company_type"],
					c["company_status"],
				}
				for col, val := range data {
					cell, _ := excelize.CoordinatesToCellName(col+1, row)
					f.SetCellValue(currSheet, cell, val)
				}
				row++
				fetched++
			}
		}

		// Release the scroll context for this group (a deferred Clear inside the loop
		// would only run when the function returns).
		_ = scroll.Clear(ctx)
	}

	// Activate the sheet
	if sheetName == "按一级分类" {
		if index, err := f.GetSheetIndex(sheetName); err == nil {
			f.SetActiveSheet(index)
		}
	}
	return nil
}

// writeHeaders writes the header row into row 1 of sheetName.
func writeHeaders(f *excelize.File, sheetName string, headers []string) {
	for i, h := range headers {
		cell, _ := excelize.CoordinatesToCellName(i+1, 1)
		f.SetCellValue(sheetName, cell, h)
	}
}

// maxRowsPerSheet is the xlsx worksheet row limit.
const maxRowsPerSheet = 1048576
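
// sampleSize is a small sketch of the "top N%" size calculation that the exporters in
// this file inline as int(math.Max(1, float64(docCount)*ratio)): take ratio of the group,
// but always at least one document. It is illustrative only and not called elsewhere
// here; exportGroupToSheet2 uses a ratio of 0.05, exportSelectedGroupsToFiles uses 0.2.
func sampleSize(docCount int64, ratio float64) int {
	return int(math.Max(1, float64(docCount)*ratio))
}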

// exportSelectedGroupsToFiles exports the given groupField values into separate xlsx files,
// splitting into numbered part files once the worksheet row limit is reached.
func exportSelectedGroupsToFiles(client *elastic.Client, groupField string, groupValues []string, outputDir string) error {
	ctx := context.Background()
	headers := []string{groupField, "company_name", "capital", "company_type", "company_status"}

	for _, groupVal := range groupValues {
		// Per-group query
		subQuery := elastic.NewBoolQuery().
			Must(
				elastic.NewTermQuery("company_status", "存续"),
				elastic.NewTermQuery(groupField, groupVal),
			).
			MustNot(elastic.NewTermQuery("company_type", "个体工商户")).
			Must(elastic.NewExistsQuery("capital"))

		// Total document count for this group
		countRes, err := client.Count(indexName).Query(subQuery).Do(ctx)
		if err != nil || countRes == 0 {
			log.Printf("skipping group %s (no data or query failed)\n", groupVal)
			continue
		}

		// NOTE: the sample ratio here is 20%, although the output file name still says "top5%".
		size := int(math.Max(1, float64(countRes)*0.2))

		sourceCtx := elastic.NewFetchSourceContext(true).Include(
			"company_name", "capital", "company_type", "company_status", groupField,
		)

		scroll := client.Scroll(indexName).
			Query(subQuery).
			SortWithInfo(elastic.SortInfo{
				Field:     "capital",
				Ascending: false,
				Missing:   "_last",
			}).
			Size(500).
			FetchSourceContext(sourceCtx)

		part := 1
		row := 2
		f := excelize.NewFile()
		sheetName := "数据"
		f.NewSheet(sheetName)
		writeHeaders(f, sheetName, headers)

		fetched := 0
		for fetched < size {
			res, err := scroll.Do(ctx)
			if err == io.EOF {
				break
			}
			if err != nil {
				log.Printf("scroll query failed group=%v, err=%v\n", groupVal, err)
				break
			}

			for _, hit := range res.Hits.Hits {
				if fetched >= size {
					break
				}

				if row > maxRowsPerSheet {
					// Row limit reached: save the current file and start a new one
					filename := fmt.Sprintf("%s/%s_top5%%_part%d.xlsx", outputDir, sanitizeFileName(groupVal), part)
					f.DeleteSheet("Sheet1")
					if err := f.SaveAs(filename); err != nil {
						log.Printf("❌ failed to save file [%s]: %v\n", filename, err)
					} else {
						log.Printf("✅ saved: %s (%d rows)", filename, row-2)
					}

					// Reset for the next part
					part++
					row = 2
					f = excelize.NewFile()
					f.NewSheet(sheetName)
					writeHeaders(f, sheetName, headers)
				}

				var c map[string]interface{}
				if err := json.Unmarshal(hit.Source, &c); err != nil {
					continue
				}

				data := []interface{}{
					groupVal,
					c["company_name"],
					c["capital"],
					c["company_type"],
					c["company_status"],
				}
				for col, val := range data {
					cell, _ := excelize.CoordinatesToCellName(col+1, row)
					f.SetCellValue(sheetName, cell, val)
				}
				row++
				fetched++
			}
		}

		// Release the scroll context for this group (a deferred Clear inside the loop
		// would only run when the function returns).
		_ = scroll.Clear(ctx)

		// Save the last file
		if row > 2 {
			filename := fmt.Sprintf("%s/%s_top5%%_part%d.xlsx", outputDir, sanitizeFileName(groupVal), part)
			f.DeleteSheet("Sheet1")
			if err := f.SaveAs(filename); err != nil {
				log.Printf("❌ failed to save file [%s]: %v\n", filename, err)
			} else {
				log.Printf("✅ export finished: %s (%d rows)", filename, row-2)
			}
		}
	}
	return nil
}

// sanitizeFileName replaces characters that are not allowed in file names.
func sanitizeFileName(name string) string {
	re := regexp.MustCompile(`[\\/:*?"<>|]`)
	return re.ReplaceAllString(name, "_")
}

// dealXlsx filters the exported workbook, keeping only rows whose company status is "存续".
func dealXlsx() {
	log.Println("start processing data")

	// Open the source workbook
	f, err := excelize.OpenFile("top_companies_by_industry_levels.xlsx")
	if err != nil {
		panic(err)
	}

	// Read the sheet at index 3 (the fourth sheet)
	sheetName := f.GetSheetName(3)
	rows, err := f.GetRows(sheetName)
	if err != nil {
		panic(err)
	}

	// New workbook for the filtered rows
	newFile := excelize.NewFile()
	newSheet := newFile.GetSheetName(0)

	// Walk the source rows and keep only the "存续" ones
	rowIndex := 0
	for i, row := range rows {
		if rowIndex == 0 {
			// Copy the header row
			_ = newFile.SetSheetRow(newSheet, fmt.Sprintf("A%d", rowIndex+1), &row)
			rowIndex++
			continue
		}
		if i%10000 == 0 && len(row) > 1 {
			log.Println("progress:", i, row[1])
		}
		// Company status is expected in column 5 (index 4)
		if len(row) >= 5 && row[4] == "存续" {
			_ = newFile.SetSheetRow(newSheet, fmt.Sprintf("A%d", rowIndex+1), &row)
			rowIndex++
		}
	}

	// Save the filtered workbook
	if err := newFile.SaveAs("筛选后-存续公司3.xlsx"); err != nil {
		panic(err)
	}
	log.Println("filtering finished, result saved to 筛选后-存续公司3.xlsx")
}
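
// main is a minimal sketch of an entry point: this file is package main but does not
// define one, so this assumes no other file in the package provides main. Which exporter
// to run (and whether dealXlsx should follow) is an assumption, not something the
// original code specifies.
func main() {
	// Export the selected national_sub groups to per-group xlsx files under ./output.
	getQyxyNationToFiles()

	// Alternatively, build the single workbook and then filter it:
	// getQyxyNation()
	// dealXlsx()
}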