123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517 |
- package main
- import (
- "bytes"
- "context"
- "encoding/json"
- "fmt"
- "github.com/elastic/go-elasticsearch/v7"
- "github.com/olivere/elastic/v7"
- "github.com/xuri/excelize/v2"
- "io"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "log"
- "strings"
- "time"
- )
- // exportQyxy 导出企业数据
- func exportQyxy() {
- type Hit struct {
- ID string `json:"_id"`
- Source map[string]interface{} `json:"_source"`
- }
- type ESResponse struct {
- ScrollID string `json:"_scroll_id"`
- Hits struct {
- Hits []Hit `json:"hits"`
- } `json:"hits"`
- }
- // 1. 配置 ES 连接信息
- url := "http://172.17.4.184:19908"
- username := "jybid"
- password := "Top2023_JEB01i@31"
- cfg := elasticsearch.Config{
- Addresses: []string{
- url, // 替换为你的 ES 地址
- },
- Username: username, // 替换为你的用户名
- Password: password, // 替换为你的密码
- }
- es, err := elasticsearch.NewClient(cfg)
- if err != nil {
- log.Fatalf("Error creating the client: %s", err)
- }
- scrollTime := 2 * time.Minute
- indexName := "qyxy"
- batchSize := 5000
- maxRowsPerFile := 500000
- headers := []string{
- "_id", "cancel_date", "company_name", "history_name", "cancel_reason",
- "company_type", "business_scope", "legal_person", "capital", "credit_no",
- "tax_code", "company_code", "org_code", "establish_date", "authority",
- "issue_date", "company_area", "company_city", "company_district",
- "company_phone", "company_address", "company_email", "employee_num",
- }
- query := map[string]interface{}{
- "size": batchSize,
- "track_total_hits": true,
- "query": map[string]interface{}{
- "bool": map[string]interface{}{
- "must": []interface{}{
- map[string]interface{}{
- "range": map[string]interface{}{
- "cancel_date_unix": map[string]interface{}{
- "gte": 1704038400,
- },
- },
- },
- map[string]interface{}{
- "terms": map[string]interface{}{
- "company_status": []string{"注销", "吊销"},
- },
- },
- map[string]interface{}{
- "term": map[string]interface{}{
- "company_area": "河南",
- },
- },
- },
- },
- },
- "_source": headers,
- }
- var buf bytes.Buffer
- if err := json.NewEncoder(&buf).Encode(query); err != nil {
- log.Fatalf("query encode failed: %v", err)
- }
- res, err := es.Search(
- es.Search.WithContext(context.Background()),
- es.Search.WithIndex(indexName),
- es.Search.WithBody(&buf),
- es.Search.WithScroll(scrollTime),
- )
- if err != nil {
- log.Fatalf("initial scroll search error: %v", err)
- }
- defer res.Body.Close()
- // 状态追踪变量
- scrollID := ""
- fileIndex := 1
- rowNum := 2
- totalCount := 0
- f := excelize.NewFile()
- sheet := "Sheet1"
- writeHeader := func(f *excelize.File) {
- for i, h := range headers {
- cell, _ := excelize.CoordinatesToCellName(i+1, 1)
- f.SetCellValue(sheet, cell, h)
- }
- }
- writeHeader(f)
- for {
- var r ESResponse
- if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
- log.Fatalf("decode error: %v", err)
- }
- if len(r.Hits.Hits) == 0 {
- break
- }
- scrollID = r.ScrollID
- for _, hit := range r.Hits.Hits {
- for col, h := range headers {
- cell, _ := excelize.CoordinatesToCellName(col+1, rowNum)
- val := hit.Source[h]
- if h == "_id" {
- val = hit.ID
- }
- f.SetCellValue(sheet, cell, val)
- }
- rowNum++
- totalCount++
- // 满 50 万行,保存并开启新文件
- if rowNum > maxRowsPerFile {
- filename := fmt.Sprintf("qyxy_export_%d.xlsx", fileIndex)
- if err := f.SaveAs(filename); err != nil {
- log.Fatalf("failed to save file %s: %v", filename, err)
- }
- fmt.Printf("✅ 导出文件 [%s] 完成,累计 %d 条\n", filename, totalCount)
- fileIndex++
- rowNum = 2
- f = excelize.NewFile()
- writeHeader(f)
- }
- }
- fmt.Printf("当前已处理 %d 条...\n", totalCount)
- // 拉下一页
- res, err = es.Scroll(
- es.Scroll.WithScrollID(scrollID),
- es.Scroll.WithScroll(scrollTime),
- )
- if err != nil {
- log.Fatalf("scroll next error: %v", err)
- }
- defer res.Body.Close()
- }
- // 保存最后一批文件
- if rowNum > 2 {
- filename := fmt.Sprintf("qyxy_export_%d.xlsx", fileIndex)
- if err := f.SaveAs(filename); err != nil {
- log.Fatalf("failed to save final file %s: %v", filename, err)
- }
- fmt.Printf("✅ 最后文件 [%s] 完成,累计 %d 条\n", filename, totalCount)
- }
- // 清理 scroll
- if scrollID != "" {
- _, _ = es.ClearScroll(es.ClearScroll.WithScrollID(scrollID))
- }
- fmt.Println("🎉 全部导出完成,总数:", totalCount)
- }
- // getGD 获取广东企业数据
- func getGD() {
- url := "http://172.17.4.184:19908"
- //url := "http://127.0.0.1:19908"
- username := "jybid"
- password := "Top2023_JEB01i@31"
- index := "qyxy" //索引名称
- // 创建 Elasticsearch 客户端
- client, err := elastic.NewClient(
- elastic.SetURL(url),
- elastic.SetBasicAuth(username, password),
- elastic.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- //85 抽取库
- //Mgo := &mongodb.MongodbSim{
- // //MongodbAddr: "127.0.0.1:27080",
- // MongodbAddr: "172.17.4.85:27080",
- // DbName: "top",
- // Size: 10,
- // //Direct: true,
- //}
- //Mgo.InitPool()
- MgoB := &mongodb.MongodbSim{
- MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080",
- //MongodbAddr: "127.0.0.1:27083",
- Size: 10,
- DbName: "qfw",
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- MgoB.InitPool()
- //2023年01-01 2023-10-01,,1-3季度
- //2024-1 - 2024-4;1704038400-1711900800
- //2023-10-1 2024-1-1;1696089600-1704038400
- //城市范围
- //areaTermsQuery := elastic.NewTermsQuery("company_city", "北京市")
- //rangeQuery := elastic.NewRangeQuery("establish_date").Gte(1704038400)
- //query := elastic.NewBoolQuery().
- // Must(areaTermsQuery).
- // Must(rangeQuery)
- //---------------------------//
- //query := elastic.NewBoolQuery()
- //query.Must(elastic.NewMatchQuery("company_area", "广东"))
- ////query.Must(elastic.NewTermQuery("company_type", "北京市"))
- // 构建查询条件
- query := elastic.NewBoolQuery().
- MustNot(elastic.NewTermQuery("company_type", "个体工商户")). // 排除 company_type 为 "个体工商户"
- Filter(elastic.NewTermQuery("company_area", "广东")) // 过滤 company_area 为 "广东"
- //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000")
- //query := elastic.NewBoolQuery().
- // //北京,天津,河北,上海,江苏,浙江,安徽
- // //Must(elastic.NewTermQuery("area", "北京市")).
- // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
- // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
- // Must(rangeQuery)
- ctx := context.Background()
- //开始滚动搜索
- scrollID := ""
- scroll := "10m"
- searchSource := elastic.NewSearchSource().
- Query(query).
- Size(10000).
- Sort("_doc", true) //升序排序
- //Sort("_doc", false) //降序排序
- searchService := client.Scroll(index).
- Size(10000).
- Scroll(scroll).
- SearchSource(searchSource)
- res, err := searchService.Do(ctx)
- if err != nil {
- if err == io.EOF {
- fmt.Println("没有数据")
- } else {
- panic(err)
- }
- }
- //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
- fmt.Println("总数是:", res.TotalHits())
- total := 0
- for len(res.Hits.Hits) > 0 {
- for _, hit := range res.Hits.Hits {
- var doc map[string]interface{}
- err := json.Unmarshal(hit.Source, &doc)
- if err != nil {
- log.Printf("解析文档失败:%s", err)
- continue
- }
- //存入新表
- insert := map[string]interface{}{
- "company_name": doc["company_name"],
- "id": doc["id"],
- "credit_no": doc["credit_no"],
- "company_code": doc["company_code"],
- }
- if strings.Contains(util.ObjToString(doc["company_name"]), "银行") || strings.Contains(util.ObjToString(doc["company_name"]), "保险") || strings.Contains(util.ObjToString(doc["company_name"]), "证券") {
- insert["wcc_type"] = 1
- }
- err = MgoB.InsertOrUpdate("qfw", "wcc_2025_guangdong_qyxy", insert)
- if err != nil {
- log.Println("error", doc["id"])
- }
- }
- total = total + len(res.Hits.Hits)
- scrollID = res.ScrollId
- res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
- log.Println("current count:", total)
- if err != nil {
- if err == io.EOF {
- // 滚动到最后一批数据,退出循环
- break
- }
- log.Println("滚动搜索失败:", err, res)
- break // 处理错误时退出循环
- }
- }
- // 在循环外调用 ClearScroll
- _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
- if err != nil {
- log.Printf("清理滚动搜索失败:%s", err)
- }
- fmt.Println("结束~~~~~~~~~~~~~~~")
- }
- // getNeqData 获取id _id 不相等数据
- func getNeqData() {
- // 本地
- //mgo := &mongodb.MongodbSim{
- // MongodbAddr: "127.0.0.1:27017",
- // DbName: "wcc",
- // Size: 10,
- //}
- //mgo.InitPool()
- mgo := &mongodb.MongodbSim{
- MongodbAddr: "172.17.189.140:27080",
- DbName: "qfw",
- Size: 10,
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- }
- mgo.InitPool()
- //url := "http://127.0.0.1:19908"
- url := "http://172.17.4.184:19908"
- username := "jybid"
- password := "Top2023_JEB01i@31"
- // 创建 Elasticsearch 客户端
- client, err := elastic.NewClient(
- elastic.SetURL(url),
- elastic.SetBasicAuth(username, password),
- elastic.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- //// 创建查询条件
- //q := elastic.NewBoolQuery().
- // //Must(elastic.NewMatchQuery("toptype", "预告")).
- // MustNot(elastic.NewTermQuery("_id", "id"))
- //Filter(elastic.NewRangeQuery("comeintime").Gte(1694707200).Lte(1695121200))
- // 构建查询
- query := elastic.NewBoolQuery().
- Must(
- elastic.NewScriptQuery(elastic.NewScript(`doc['_id'].value != doc['id'].value`)),
- //elastic.NewRangeQuery("comeintime").Gte(1694707200),
- )
- //query := elastic.NewBoolQuery().
- // Must(elastic.NewMatchQuery("title", "租公租房提取公积金")).
- // Must(elastic.NewTermQuery("toptype", "拟建")).Must(elastic.NewMatchPhraseQuery())
- count := 0
- // 执行Count请求来获取文档总数
- countResult, err := client.Count().Index("qyxy").Query(query).Do(context.Background())
- if err != nil {
- log.Fatalf("执行Count请求失败:%s", err)
- }
- // 获取符合条件的文档总数
- total := countResult
- fmt.Printf("符合条件的文档总数:%d\n", total)
- //
- //开始滚动搜索
- scrollService := client.Scroll("qyxy").Query(query).Size(10000).FetchSource(true)
- for {
- results, err := scrollService.Do(context.Background())
- if err != nil {
- log.Fatalf("滚动搜索失败:%s", err)
- }
- fmt.Println("current count:", count)
- if len(results.Hits.Hits) == 0 {
- // 没有更多的文档了,退出循环
- break
- }
- for _, hit := range results.Hits.Hits {
- // 处理每个文档
- // ...
- item := make(map[string]interface{})
- if err := json.Unmarshal(hit.Source, &item); err != nil {
- log.Printf("解码文档失败:%s\n", err)
- continue
- }
- save := map[string]interface{}{
- "id": item["id"],
- "company_name": item["company_name"],
- }
- mgo.Save("wcc_es_id_err_0428", save)
- }
- count += len(results.Hits.Hits)
- }
- fmt.Println("结束~~~~~~~~~~~~~~~")
- }
- // deleteEs 删除 es 数据
- func deleteEs() {
- //url := "http://127.0.0.1:19905"
- ////url := "http://172.17.4.184:19905"
- //username := "jybid"
- //password := "Top2023_JEB01i@31"
- url := "http://127.0.0.1:19805"
- //url := "http://172.17.4.184:19905"
- username := "es_all"
- password := "TopJkO2E_d1x"
- // 创建 Elasticsearch 客户端
- client, err := elastic.NewClient(
- elastic.SetURL(url),
- elastic.SetBasicAuth(username, password),
- elastic.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- // 构建查询
- query := elastic.NewBoolQuery().
- Must(
- elastic.NewScriptQuery(elastic.NewScript(`doc['_id'].value != doc['id'].value`)),
- elastic.NewRangeQuery("comeintime").Gte(1672416000),
- )
- // 执行Count请求来获取文档总数
- countResult, err := client.Count().Index("bidding").Query(query).Do(context.Background())
- if err != nil {
- log.Fatalf("执行Count请求失败:%s", err)
- }
- // 获取符合条件的文档总数
- total := countResult
- fmt.Printf("符合条件的文档总数:%d\n", total)
- // 创建删除请求
- deleteService := client.DeleteByQuery().Index("bidding").Query(query)
- // 执行删除操作
- response, err := deleteService.Do(context.Background())
- if err != nil {
- log.Fatalf("执行删除操作失败:%s", err)
- }
- // 检查删除操作的结果
- if response != nil {
- fmt.Printf("已删除文档数:%d\n", response.Deleted)
- } else {
- fmt.Println("删除操作没有返回结果。")
- }
- }
- type CreditLabel struct {
- ZhimaToptype string `json:"zhima_toptype"`
- ZhimaSubtype string `json:"zhima_subtype"`
- ZhimaName string `json:"zhima_name"`
- }
- // getZhiMa 芝麻标签存在
- func getZhiMa() {
- client, err := elastic.NewClient(elastic.SetURL("http://192.168.3.149:9201"))
- if err != nil {
- panic(err)
- }
- // 查询 zhima_labels 字段存在的数据
- query := elastic.NewExistsQuery("zhima_labels")
- searchResult, err := client.Search().
- Index("qyxy").
- Query(query).
- Do(context.Background())
- if err != nil {
- panic(err)
- }
- for _, hit := range searchResult.Hits.Hits {
- var label CreditLabel
- err := json.Unmarshal(hit.Source, &label)
- if err != nil {
- panic(err)
- }
- fmt.Println(label)
- }
- }
|