12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755 |
- package main
- import (
- "context"
- "encoding/json"
- "fmt"
- "github.com/olivere/elastic/v7"
- "github.com/xuri/excelize/v2"
- "io"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "log"
- "math"
- "os"
- "regexp"
- )
// Company holds the subset of a qyxy document's _source that the exporters
// decode from search hits; the json tags name the source fields.
type Company struct {
	// CompanyName is the registered company name.
	CompanyName string `json:"company_name"`
	// Capital is the value the exports sort on (descending).
	Capital float64 `json:"capital"`
	// CompanyType is compared against "个体工商户" by the export queries.
	CompanyType string `json:"company_type"`
	// CompanyStatus is compared against "存续" by the export queries.
	CompanyStatus string `json:"company_status"`
}
// indexName is the Elasticsearch index that every query in this file targets.
var indexName = "qyxy"
// getQyxyNationToFiles connects to Elasticsearch and, for each selected
// national_sub value, exports the top companies by capital into .xlsx files
// under ./output (see exportSelectedGroupsToFiles). It terminates the process
// via log.Fatalf if the client cannot be created, the output directory cannot
// be created, or the export fails.
func getQyxyNationToFiles() {
	// Connection settings can be overridden through the environment so the
	// credentials do not have to live in source. The previous hard-coded
	// values stay as defaults for backward compatibility.
	// TODO: drop the hard-coded password once deployments set QYXY_ES_PASSWORD.
	envOr := func(key, def string) string {
		if v := os.Getenv(key); v != "" {
			return v
		}
		return def
	}
	url := envOr("QYXY_ES_URL", "http://172.17.4.184:19908")
	username := envOr("QYXY_ES_USERNAME", "jybid")
	password := envOr("QYXY_ES_PASSWORD", "Top2023_JEB01i@31")

	// Create the Elasticsearch client. Sniffing is disabled so the client only
	// talks to the configured URL.
	client, err := elastic.NewClient(
		elastic.SetURL(url),
		elastic.SetBasicAuth(username, password),
		elastic.SetSniff(false),
	)
	if err != nil {
		log.Fatalf("Elasticsearch 连接失败: %v", err)
	}

	// Full candidate list, kept for reference (duplicate "其他金融业" removed):
	// "石油和天然气开采业", "文教、工美、体育和娱乐用品制造业", "石油、煤炭及其他燃料加工业", "化学原料和化学制品制造业",
	// "医药制造业", "通用设备制造业", "专用设备制造业", "汽车制造业", "电信、广播电视和卫星传输服务", "互联网和相关服务",
	// "软件和信息技术服务业", "货币金融服务", "资本市场服务", "保险业", "其他金融业", "教育", "新闻和出版业", "生态保护和环境治理业"
	nationalSubs := []string{"生态保护和环境治理业"}

	outputDir := "output"
	// Make sure the output directory exists. Every later SaveAs would fail
	// without it, so a failure here is fatal instead of being ignored.
	if err := os.MkdirAll(outputDir, 0755); err != nil {
		log.Fatalf("创建输出目录失败: %v", err)
	}

	if err := exportSelectedGroupsToFiles(client, "national_sub", nationalSubs, outputDir); err != nil {
		log.Fatalf("导出失败: %v", err)
	}
	log.Println("🎉 所有分组导出完成")
}
// getQyxyNation connects to Elasticsearch and writes, per national_sub value,
// the companies selected by exportGroupToSheet2 into the sheet "按二级分类" of
// top_companies_by_industry_levels.xlsx. It terminates the process via
// log.Fatalf on any connection, export, or save failure.
func getQyxyNation() {
	// Connection settings can be overridden through the environment so the
	// credentials do not have to live in source. The previous hard-coded
	// values stay as defaults for backward compatibility.
	// TODO: drop the hard-coded password once deployments set QYXY_ES_PASSWORD.
	envOr := func(key, def string) string {
		if v := os.Getenv(key); v != "" {
			return v
		}
		return def
	}
	url := envOr("QYXY_ES_URL", "http://172.17.4.184:19908")
	username := envOr("QYXY_ES_USERNAME", "jybid")
	password := envOr("QYXY_ES_PASSWORD", "Top2023_JEB01i@31")

	// Create the Elasticsearch client (no sniffing: use the configured URL only).
	client, err := elastic.NewClient(
		elastic.SetURL(url),
		elastic.SetBasicAuth(username, password),
		elastic.SetSniff(false),
	)
	if err != nil {
		log.Fatalf("Elasticsearch 连接失败: %v", err)
	}

	// 1. Create the workbook. Close releases excelize's temporary resources
	//    once we are done with it.
	f := excelize.NewFile()
	defer f.Close()

	// 2. Group fields and the sheet each one is written to (parallel slices).
	groupFields := []string{"national_sub"}
	sheetNames := []string{"按二级分类"}
	for i, field := range groupFields {
		log.Printf("查询并导出:%s\n", field)
		if err := exportGroupToSheet2(client, f, field, sheetNames[i]); err != nil {
			log.Fatalf("导出失败(%s): %v", field, err)
		}
	}

	// Drop the default sheet that excelize.NewFile creates.
	f.DeleteSheet("Sheet1")

	if err := f.SaveAs("top_companies_by_industry_levels.xlsx"); err != nil {
		log.Fatalf("保存失败: %v", err)
	}
	log.Println("✅ 所有分组已导出完成: top_companies_by_industry_levels.xlsx")
}
// exportGroupToSheet writes, for every value of groupField (up to 500 terms),
// the 5 companies with the largest capital into a new sheet named sheetName.
// Only companies whose company_status is "存续" and whose company_type is not
// "个体工商户" are considered. The sheet is made active only when sheetName is
// "按一级分类".
//
// It returns an error if the search fails, the sheet cannot be created, or the
// response lacks the "by_group" aggregation. Hits whose source cannot be
// decoded into Company are skipped.
func exportGroupToSheet(client *elastic.Client, f *excelize.File, groupField, sheetName string) error {
	ctx := context.Background()

	// Filter: active companies that are not individual businesses.
	query := elastic.NewBoolQuery().
		Must(elastic.NewTermQuery("company_status", "存续")).
		MustNot(elastic.NewTermQuery("company_type", "个体工商户"))

	// Only fetch the fields that are written to the sheet.
	sourceCtx := elastic.NewFetchSourceContext(true).Include(
		"company_name", "capital", "company_type", "company_status", groupField,
	)

	// terms(groupField) -> top_hits(5, sorted by capital desc)
	topHitsAgg := elastic.NewTopHitsAggregation().
		Size(5).
		Sort("capital", false).
		FetchSourceContext(sourceCtx)
	termsAgg := elastic.NewTermsAggregation().Field(groupField).Size(500).
		SubAggregation("top_capital_companies", topHitsAgg)

	res, err := client.Search().
		Index(indexName).
		Query(query).
		Size(0). // only the aggregation result is needed
		Aggregation("by_group", termsAgg).
		Do(ctx)
	if err != nil {
		return fmt.Errorf("Elasticsearch 查询失败: %w", err)
	}

	// Without a valid sheet every later SetCellValue would fail silently, so
	// report the failure instead of discarding it.
	index, err := f.NewSheet(sheetName)
	if err != nil {
		return fmt.Errorf("创建工作表失败(%s): %w", sheetName, err)
	}
	writeHeaders(f, sheetName, []string{groupField, "company_name", "capital", "company_type", "company_status"})

	agg, ok := res.Aggregations.Terms("by_group")
	if !ok {
		return fmt.Errorf("未找到分组聚合结果")
	}

	row := 2 // row 1 holds the headers
	for _, bucket := range agg.Buckets {
		topHits, ok := bucket.TopHits("top_capital_companies")
		if !ok {
			continue
		}
		for _, hit := range topHits.Hits.Hits {
			var c Company
			if err := json.Unmarshal(hit.Source, &c); err != nil {
				continue // best effort: skip undecodable documents
			}
			data := []interface{}{bucket.Key, c.CompanyName, c.Capital, c.CompanyType, c.CompanyStatus}
			for col, val := range data {
				cell, _ := excelize.CoordinatesToCellName(col+1, row)
				f.SetCellValue(sheetName, cell, val)
			}
			row++
		}
	}

	if sheetName == "按一级分类" {
		f.SetActiveSheet(index)
	}
	return nil
}
// exportGroupToSheet3 is an unfinished variant of the group exporters.
//
// It fetches the per-group document counts (company_status "存续", company_type
// not "个体工商户", up to 500 terms of groupField), creates sheetName, and writes
// the header row. The second stage, exporting each group's top 5%, is not
// implemented: every bucket is only logged.
//
// It returns an error if the count query fails, the aggregation is missing, or
// the sheet cannot be created.
func exportGroupToSheet3(client *elastic.Client, f *excelize.File, groupField, sheetName string) error {
	ctx := context.Background()

	query := elastic.NewBoolQuery().
		Must(elastic.NewTermQuery("company_status", "存续")).
		MustNot(elastic.NewTermQuery("company_type", "个体工商户"))

	// Stage 1: document count per group.
	countAgg := elastic.NewTermsAggregation().Field(groupField).Size(500)
	countRes, err := client.Search().
		Index(indexName).
		Query(query).
		Size(0).
		Aggregation("by_group", countAgg).
		Do(ctx)
	if err != nil {
		return fmt.Errorf("获取分组总数失败: %w", err)
	}
	agg, ok := countRes.Aggregations.Terms("by_group")
	if !ok {
		return fmt.Errorf("未找到分组聚合结果")
	}

	// The sheet must exist before cells are written. Previously NewSheet was
	// commented out and the header writes were silently dropped.
	if _, err := f.NewSheet(sheetName); err != nil {
		return fmt.Errorf("创建工作表失败(%s): %w", sheetName, err)
	}
	headers := []string{groupField, "company_name", "capital", "company_type", "company_status"}
	for i, h := range headers {
		cell, _ := excelize.CoordinatesToCellName(i+1, 1)
		f.SetCellValue(sheetName, cell, h)
	}

	// Stage 2 (TODO): query the top 5% (at least 1) of each group, e.g.
	// size := int(math.Max(1, float64(bucket.DocCount)*0.05)), and write rows
	// starting at row 2. For now the buckets are only logged.
	for _, bucket := range agg.Buckets {
		log.Println(bucket)
	}
	return nil
}
// exportGroupToSheet21 writes, for every value of groupField (up to 500 terms),
// the top 5% (at least one) companies by capital into a new sheet sheetName,
// using one plain search per group.
//
// Only companies whose company_status is "存续" and whose company_type is not
// "个体工商户" are counted and exported. NOTE: Elasticsearch rejects searches
// whose size exceeds index.max_result_window (10000 by default). Such groups
// are logged and skipped here; exportGroupToSheet2 uses scrolling instead.
//
// It returns an error if the count query fails, the aggregation is missing, or
// the sheet cannot be created. Per-group search failures are logged and skipped.
func exportGroupToSheet21(client *elastic.Client, f *excelize.File, groupField, sheetName string) error {
	ctx := context.Background()

	query := elastic.NewBoolQuery().
		Must(elastic.NewTermQuery("company_status", "存续")).
		MustNot(elastic.NewTermQuery("company_type", "个体工商户"))

	// Step 1: groups and their document counts.
	countAgg := elastic.NewTermsAggregation().Field(groupField).Size(500)
	countRes, err := client.Search().
		Index(indexName).
		Query(query).
		Size(0).
		Aggregation("by_group", countAgg).
		Do(ctx)
	if err != nil {
		return fmt.Errorf("获取分组总数失败: %w", err)
	}
	agg, ok := countRes.Aggregations.Terms("by_group")
	if !ok {
		return fmt.Errorf("未找到分组聚合结果")
	}

	index, err := f.NewSheet(sheetName)
	if err != nil {
		return fmt.Errorf("创建工作表失败(%s): %w", sheetName, err)
	}
	writeHeaders(f, sheetName, []string{groupField, "company_name", "capital", "company_type", "company_status"})

	sourceCtx := elastic.NewFetchSourceContext(true).Include(
		"company_name", "capital", "company_type", "company_status", groupField,
	)

	// Step 2: for each group, fetch its top 5% by capital.
	row := 2
	for _, bucket := range agg.Buckets {
		groupVal := bucket.Key
		size := int(math.Max(1, float64(bucket.DocCount)*0.05))
		groupStr := fmt.Sprintf("%v", groupVal)

		// Same population as the count (status/type filters) restricted to
		// this group; previously only the group term was applied.
		groupQuery := elastic.NewBoolQuery().
			Filter(
				elastic.NewTermQuery("company_status", "存续"),
				elastic.NewTermQuery(groupField, groupStr),
			).
			MustNot(elastic.NewTermQuery("company_type", "个体工商户"))

		res, err := client.Search().
			Index(indexName).
			Query(groupQuery).
			Sort("capital", false).
			Size(size).
			FetchSourceContext(sourceCtx).
			Do(ctx)
		if err != nil {
			fmt.Printf("查询失败 group=%v, err=%v, type=%T\n", groupVal, err, err)
			continue
		}
		if res == nil || res.Hits == nil {
			fmt.Println("查询结果为空(res=nil)")
			continue
		}
		if res.Hits.TotalHits != nil {
			fmt.Printf("查询成功: 命中 %d 条\n", res.Hits.TotalHits.Value)
		}

		// This search carries no aggregations; the sorted documents are the
		// hits themselves. Previously they were looked up in a non-existent
		// "by_group" aggregation, so no row was ever written.
		for _, hit := range res.Hits.Hits {
			var c Company
			if err := json.Unmarshal(hit.Source, &c); err != nil {
				continue
			}
			data := []interface{}{groupVal, c.CompanyName, c.Capital, c.CompanyType, c.CompanyStatus}
			for col, val := range data {
				cell, _ := excelize.CoordinatesToCellName(col+1, row)
				f.SetCellValue(sheetName, cell, val)
			}
			row++
		}
	}

	if sheetName == "按一级分类" {
		f.SetActiveSheet(index)
	}
	return nil
}
// exportGroupToSheet22 is a scroll-based group exporter. For every value of
// groupField (up to 500 terms) it writes the top 5% (at least one) companies by
// capital into a new sheet sheetName.
//
// Only companies whose company_status is "存续", whose company_type is not
// "个体工商户", and which have a capital field are exported. This variant does
// not check the worksheet row limit; exportGroupToSheet2 splits across sheets.
//
// It returns an error if the count query fails, the aggregation is missing, or
// the sheet cannot be created. Per-group scroll failures are printed, with
// Elasticsearch error details when available, and the group is skipped. Rows
// already written for that group are kept.
func exportGroupToSheet22(client *elastic.Client, f *excelize.File, groupField, sheetName string) error {
	ctx := context.Background()

	// Population filter: active companies that are not individual businesses.
	query := elastic.NewBoolQuery().
		Must(elastic.NewTermQuery("company_status", "存续")).
		MustNot(elastic.NewTermQuery("company_type", "个体工商户"))

	countAgg := elastic.NewTermsAggregation().Field(groupField).Size(500)
	countRes, err := client.Search().
		Index(indexName).
		Query(query).
		Size(0).
		Aggregation("by_group", countAgg).
		Do(ctx)
	if err != nil {
		return fmt.Errorf("获取分组总数失败: %w", err)
	}
	agg, ok := countRes.Aggregations.Terms("by_group")
	if !ok {
		return fmt.Errorf("未找到分组聚合结果")
	}

	index, err := f.NewSheet(sheetName)
	if err != nil {
		return fmt.Errorf("创建工作表失败(%s): %w", sheetName, err)
	}
	writeHeaders(f, sheetName, []string{groupField, "company_name", "capital", "company_type", "company_status"})

	sourceCtx := elastic.NewFetchSourceContext(true).Include(
		"company_name", "capital", "company_type", "company_status", groupField,
	)

	row := 2
	for _, bucket := range agg.Buckets {
		groupVal := bucket.Key
		docCount := bucket.DocCount
		if docCount == 0 {
			continue
		}
		size := int(math.Max(1, float64(docCount)*0.05))
		groupStr := fmt.Sprintf("%v", groupVal)

		// Same population as the count, restricted to this group. The exists
		// clause keeps documents without capital out of the sorted scroll.
		subQuery := elastic.NewBoolQuery().
			Must(
				elastic.NewTermQuery("company_status", "存续"),
				elastic.NewTermQuery(groupField, groupStr),
				elastic.NewExistsQuery("capital"),
			).
			MustNot(elastic.NewTermQuery("company_type", "个体工商户"))

		scroll := client.Scroll(indexName).
			Query(subQuery).
			SortWithInfo(elastic.SortInfo{
				Field:     "capital",
				Ascending: false,
				Missing:   "_last",
			}).
			Size(500). // 500 documents per page
			FetchSourceContext(sourceCtx)

		// Keep the scroll error outside the page loop so it can be reported
		// after cleanup. Previously the report checked an unrelated outer err
		// and could never run.
		var scrollErr error
		fetched := 0
		for fetched < size {
			res, err := scroll.Do(ctx)
			if err == io.EOF {
				break // no more pages
			}
			if err != nil {
				scrollErr = err
				break
			}
			if res == nil || res.Hits == nil || len(res.Hits.Hits) == 0 {
				break // defensive: never spin on an empty page
			}
			for _, hit := range res.Hits.Hits {
				if fetched >= size {
					break
				}
				var c map[string]interface{}
				if err := json.Unmarshal(hit.Source, &c); err != nil {
					continue
				}
				data := []interface{}{
					groupStr,
					c["company_name"],
					c["capital"],
					c["company_type"],
					c["company_status"],
				}
				for col, val := range data {
					cell, _ := excelize.CoordinatesToCellName(col+1, row)
					f.SetCellValue(sheetName, cell, val)
				}
				row++
				fetched++
			}
		}

		// Release the server-side scroll context for this group right away
		// (previously also deferred once per group until function exit).
		_ = scroll.Clear(ctx)

		if scrollErr != nil {
			fmt.Printf("scroll 查询失败 group=%v, err=%v\n", groupVal, scrollErr)
			if elasticErr, ok := scrollErr.(*elastic.Error); ok && elasticErr.Details != nil {
				fmt.Printf("详细错误: type=%v, reason=%v\n", elasticErr.Details.Type, elasticErr.Details.Reason)
				for i, cause := range elasticErr.Details.RootCause {
					fmt.Printf("RootCause[%d]: %v (%v)\n", i, cause.Reason, cause.Type)
				}
			}
		}
	}

	if sheetName == "按一级分类" {
		f.SetActiveSheet(index)
	}
	return nil
}
// exportGroupToSheet2 writes, for every value of groupField (up to 500 terms),
// the top 5% (at least one) companies by capital into sheetName. When a sheet
// reaches maxRowsPerSheet rows, it continues in sheetName_1, sheetName_2, and
// so on, each with its own header row.
//
// Only companies whose company_status is "存续", whose company_type is not
// "个体工商户", and which have a capital field are exported. The 5% share is
// computed from per-group counts over the same population.
//
// It returns an error if the count query fails, the aggregation is missing, or
// a sheet cannot be created. Per-group scroll failures are logged and that
// group is skipped; rows already written for it are kept. The sheet is made
// active only when sheetName is "按一级分类".
func exportGroupToSheet2(client *elastic.Client, f *excelize.File, groupField, sheetName string) error {
	// Per-worksheet row limit; same value as the package-level constant.
	const maxRowsPerSheet = 1048576
	ctx := context.Background()

	// Population filter: active companies that are not individual businesses.
	query := elastic.NewBoolQuery().
		Must(elastic.NewTermQuery("company_status", "存续")).
		MustNot(elastic.NewTermQuery("company_type", "个体工商户"))

	countAgg := elastic.NewTermsAggregation().Field(groupField).Size(500)
	countRes, err := client.Search().
		Index(indexName).
		Query(query).
		Size(0).
		Aggregation("by_group", countAgg).
		Do(ctx)
	if err != nil {
		return fmt.Errorf("获取分组总数失败: %w", err)
	}
	agg, ok := countRes.Aggregations.Terms("by_group")
	if !ok {
		return fmt.Errorf("未找到分组聚合结果")
	}

	headers := []string{groupField, "company_name", "capital", "company_type", "company_status"}
	sheetIndex := 0
	currSheet := sheetName
	if _, err := f.NewSheet(currSheet); err != nil {
		return fmt.Errorf("创建工作表失败(%s): %w", currSheet, err)
	}
	writeHeaders(f, currSheet, headers)
	row := 2

	sourceCtx := elastic.NewFetchSourceContext(true).Include(
		"company_name", "capital", "company_type", "company_status", groupField,
	)

	for _, bucket := range agg.Buckets {
		groupVal := bucket.Key
		docCount := bucket.DocCount
		if docCount == 0 {
			continue
		}
		size := int(math.Max(1, float64(docCount)*0.05))
		groupStr := fmt.Sprintf("%v", groupVal)

		// Apply the same status/type filters as the count. Previously only
		// status was re-checked client-side, so 个体工商户 rows were exported.
		subQuery := elastic.NewBoolQuery().
			Must(
				elastic.NewTermQuery("company_status", "存续"),
				elastic.NewTermQuery(groupField, groupStr),
				elastic.NewExistsQuery("capital"),
			).
			MustNot(elastic.NewTermQuery("company_type", "个体工商户"))

		scroll := client.Scroll(indexName).
			Query(subQuery).
			SortWithInfo(elastic.SortInfo{
				Field:     "capital",
				Ascending: false,
				Missing:   "_last",
			}).
			Size(500).
			FetchSourceContext(sourceCtx)

		fetched := 0
		for fetched < size {
			res, err := scroll.Do(ctx)
			if err == io.EOF {
				break
			}
			if err != nil {
				log.Printf("scroll 查询失败 group=%v, err=%v\n", groupVal, err)
				break
			}
			if res == nil || res.Hits == nil || len(res.Hits.Hits) == 0 {
				break // defensive: never spin on an empty page
			}
			for _, hit := range res.Hits.Hits {
				if fetched >= size {
					break
				}
				// Sheet is full: continue in a new one.
				if row > maxRowsPerSheet {
					sheetIndex++
					currSheet = fmt.Sprintf("%s_%d", sheetName, sheetIndex)
					if _, err := f.NewSheet(currSheet); err != nil {
						_ = scroll.Clear(ctx)
						return fmt.Errorf("创建工作表失败(%s): %w", currSheet, err)
					}
					writeHeaders(f, currSheet, headers)
					row = 2
				}
				var c map[string]interface{}
				if err := json.Unmarshal(hit.Source, &c); err != nil {
					continue
				}
				// Defensive re-check; the query already filters on status.
				if util.ObjToString(c["company_status"]) != "存续" {
					continue
				}
				data := []interface{}{
					groupStr,
					c["company_name"],
					c["capital"],
					c["company_type"],
					c["company_status"],
				}
				for col, val := range data {
					cell, _ := excelize.CoordinatesToCellName(col+1, row)
					f.SetCellValue(currSheet, cell, val)
				}
				row++
				fetched++
			}
		}

		// Release this group's scroll context now. A deferred Clear inside the
		// loop kept every group's scroll open until the function returned.
		_ = scroll.Clear(ctx)
	}

	if sheetName == "按一级分类" {
		if index, err := f.GetSheetIndex(sheetName); err == nil {
			f.SetActiveSheet(index)
		}
	}
	return nil
}
- func writeHeaders(f *excelize.File, sheetName string, headers []string) {
- for i, h := range headers {
- cell, _ := excelize.CoordinatesToCellName(i+1, 1)
- f.SetCellValue(sheetName, cell, h)
- }
- }
// maxRowsPerSheet is the maximum number of rows in one .xlsx worksheet (2^20).
// exportSelectedGroupsToFiles starts a new file once the next row would exceed it.
const maxRowsPerSheet = 1 << 20
- // exportSelectedGroupsToFiles 导出多文件
- func exportSelectedGroupsToFiles(client *elastic.Client, groupField string, groupValues []string, outputDir string) error {
- ctx := context.Background()
- headers := []string{groupField, "company_name", "capital", "company_type", "company_status"}
- for _, groupVal := range groupValues {
- // 构造子查询
- subQuery := elastic.NewBoolQuery().
- Must(
- elastic.NewTermQuery("company_status", "存续"),
- elastic.NewTermQuery(groupField, groupVal),
- ).
- MustNot(elastic.NewTermQuery("company_type", "个体工商户")).
- Must(elastic.NewExistsQuery("capital"))
- // 获取该分组的总数
- countRes, err := client.Count(indexName).Query(subQuery).Do(ctx)
- if err != nil || countRes == 0 {
- log.Printf("跳过分组 %s(无数据或查询失败)\n", groupVal)
- continue
- }
- size := int(math.Max(1, float64(countRes)*0.2))
- sourceCtx := elastic.NewFetchSourceContext(true).Include(
- "company_name", "capital", "company_type", "company_status", groupField,
- )
- scroll := client.Scroll(indexName).
- Query(subQuery).
- SortWithInfo(elastic.SortInfo{
- Field: "capital",
- Ascending: false,
- Missing: "_last",
- }).
- Size(500).
- FetchSourceContext(sourceCtx)
- defer scroll.Clear(ctx)
- part := 1
- row := 2
- f := excelize.NewFile()
- sheetName := "数据"
- f.NewSheet(sheetName)
- writeHeaders(f, sheetName, headers)
- fetched := 0
- for fetched < size {
- res, err := scroll.Do(ctx)
- if err == io.EOF {
- break
- }
- if err != nil {
- log.Printf("scroll 查询失败 group=%v, err=%v\n", groupVal, err)
- break
- }
- for _, hit := range res.Hits.Hits {
- if fetched >= size {
- break
- }
- if row > maxRowsPerSheet {
- // 超出最大行数,保存当前文件,开启新文件
- filename := fmt.Sprintf("%s/%s_top5%%_part%d.xlsx", outputDir, sanitizeFileName(groupVal), part)
- f.DeleteSheet("Sheet1")
- if err := f.SaveAs(filename); err != nil {
- log.Printf("❌ 保存文件失败 [%s]: %v\n", filename, err)
- } else {
- log.Printf("✅ 已保存:%s(%d 条)", filename, row-2)
- }
- // 重置
- part++
- row = 2
- f = excelize.NewFile()
- f.NewSheet(sheetName)
- writeHeaders(f, sheetName, headers)
- }
- var c map[string]interface{}
- if err := json.Unmarshal(hit.Source, &c); err != nil {
- continue
- }
- data := []interface{}{
- groupVal,
- c["company_name"],
- c["capital"],
- c["company_type"],
- c["company_status"],
- }
- for col, val := range data {
- cell, _ := excelize.CoordinatesToCellName(col+1, row)
- f.SetCellValue(sheetName, cell, val)
- }
- row++
- fetched++
- }
- }
- // 保存最后一个文件
- if row > 2 {
- filename := fmt.Sprintf("%s/%s_top5%%_part%d.xlsx", outputDir, sanitizeFileName(groupVal), part)
- f.DeleteSheet("Sheet1")
- if err := f.SaveAs(filename); err != nil {
- log.Printf("❌ 保存文件失败 [%s]: %v\n", filename, err)
- } else {
- log.Printf("✅ 完成导出:%s(%d 条)", filename, row-2)
- }
- }
- }
- return nil
- }
// invalidFileNameChars matches characters that cannot appear in a file name on
// Windows, including '/' and '\\', plus ASCII control characters. It is
// compiled once instead of on every call.
var invalidFileNameChars = regexp.MustCompile(`[\\/:*?"<>|\x00-\x1f]`)

// sanitizeFileName returns name with every character that is illegal in a file
// name replaced by "_". All other characters, including CJK text, are kept.
func sanitizeFileName(name string) string {
	return invalidFileNameChars.ReplaceAllString(name, "_")
}
- func dealXlsx() {
- log.Println("开始处理数据")
- // 打开原始文件
- f, err := excelize.OpenFile("top_companies_by_industry_levels.xlsx")
- if err != nil {
- panic(err)
- }
- // 获取第一个工作表
- sheetName := f.GetSheetName(3)
- rows, err := f.GetRows(sheetName)
- if err != nil {
- panic(err)
- }
- // 创建新文件用于保存筛选后的数据
- newFile := excelize.NewFile()
- newSheet := newFile.GetSheetName(0)
- // 遍历原始数据并筛选“存续”状态
- rowIndex := 0
- for i, row := range rows {
- if rowIndex == 0 {
- // 写入表头
- _ = newFile.SetSheetRow(newSheet, fmt.Sprintf("A%d", rowIndex+1), &row)
- rowIndex++
- continue
- }
- if i%10000 == 0 {
- log.Println("iiiiiii", row[1])
- }
- // 判断公司状态列(假设在第 5 列,即下标 4)
- if len(row) >= 5 && row[4] == "存续" {
- _ = newFile.SetSheetRow(newSheet, fmt.Sprintf("A%d", rowIndex+1), &row)
- rowIndex++
- }
- }
- // 保存新文件
- if err := newFile.SaveAs("筛选后-存续公司3.xlsx"); err != nil {
- panic(err)
- }
- log.Println("筛选完成,结果保存为 筛选后-存续公司3.xlsx")
- }
|