package main import ( "context" "encoding/json" "fmt" "github.com/olivere/elastic/v7" "io" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "log" "strings" ) // getGD 获取广东企业数据 func getGD() { url := "http://172.17.4.184:19908" //url := "http://127.0.0.1:19908" username := "jybid" password := "Top2023_JEB01i@31" index := "qyxy" //索引名称 // 创建 Elasticsearch 客户端 client, err := elastic.NewClient( elastic.SetURL(url), elastic.SetBasicAuth(username, password), elastic.SetSniff(false), ) if err != nil { log.Fatalf("创建 Elasticsearch 客户端失败:%s", err) } //85 抽取库 //Mgo := &mongodb.MongodbSim{ // //MongodbAddr: "127.0.0.1:27080", // MongodbAddr: "172.17.4.85:27080", // DbName: "top", // Size: 10, // //Direct: true, //} //Mgo.InitPool() MgoB := &mongodb.MongodbSim{ MongodbAddr: "172.31.31.202:27081,172.20.45.128:27080", //MongodbAddr: "127.0.0.1:27083", Size: 10, DbName: "qfw", UserName: "SJZY_RWbid_ES", Password: "SJZY@B4i4D5e6S", //Direct: true, } MgoB.InitPool() //2023年01-01 2023-10-01,,1-3季度 //2024-1 - 2024-4;1704038400-1711900800 //2023-10-1 2024-1-1;1696089600-1704038400 //城市范围 //areaTermsQuery := elastic.NewTermsQuery("company_city", "北京市") //rangeQuery := elastic.NewRangeQuery("establish_date").Gte(1704038400) //query := elastic.NewBoolQuery(). // Must(areaTermsQuery). // Must(rangeQuery) //---------------------------// //query := elastic.NewBoolQuery() //query.Must(elastic.NewMatchQuery("company_area", "广东")) ////query.Must(elastic.NewTermQuery("company_type", "北京市")) // 构建查询条件 query := elastic.NewBoolQuery(). MustNot(elastic.NewTermQuery("company_type", "个体工商户")). // 排除 company_type 为 "个体工商户" Filter(elastic.NewTermQuery("company_area", "广东")) // 过滤 company_area 为 "广东" //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000") //query := elastic.NewBoolQuery(). // //北京,天津,河北,上海,江苏,浙江,安徽 // //Must(elastic.NewTermQuery("area", "北京市")). // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")). // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")). // Must(rangeQuery) ctx := context.Background() //开始滚动搜索 scrollID := "" scroll := "10m" searchSource := elastic.NewSearchSource(). Query(query). Size(10000). Sort("_doc", true) //升序排序 //Sort("_doc", false) //降序排序 searchService := client.Scroll(index). Size(10000). Scroll(scroll). SearchSource(searchSource) res, err := searchService.Do(ctx) if err != nil { if err == io.EOF { fmt.Println("没有数据") } else { panic(err) } } //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源 fmt.Println("总数是:", res.TotalHits()) total := 0 for len(res.Hits.Hits) > 0 { for _, hit := range res.Hits.Hits { var doc map[string]interface{} err := json.Unmarshal(hit.Source, &doc) if err != nil { log.Printf("解析文档失败:%s", err) continue } //存入新表 insert := map[string]interface{}{ "company_name": doc["company_name"], "id": doc["id"], "credit_no": doc["credit_no"], "company_code": doc["company_code"], } if strings.Contains(util.ObjToString(doc["company_name"]), "银行") || strings.Contains(util.ObjToString(doc["company_name"]), "保险") || strings.Contains(util.ObjToString(doc["company_name"]), "证券") { insert["wcc_type"] = 1 } err = MgoB.InsertOrUpdate("qfw", "wcc_2025_guangdong_qyxy", insert) if err != nil { log.Println("error", doc["id"]) } } total = total + len(res.Hits.Hits) scrollID = res.ScrollId res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx) log.Println("current count:", total) if err != nil { if err == io.EOF { // 滚动到最后一批数据,退出循环 break } log.Println("滚动搜索失败:", err, res) break // 处理错误时退出循环 } } // 在循环外调用 ClearScroll _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx) if err != nil { log.Printf("清理滚动搜索失败:%s", err) } fmt.Println("结束~~~~~~~~~~~~~~~") } // getNeqData 获取id _id 不相等数据 func getNeqData() { // 本地 //mgo := &mongodb.MongodbSim{ // MongodbAddr: "127.0.0.1:27017", // DbName: "wcc", // Size: 10, //} //mgo.InitPool() mgo := &mongodb.MongodbSim{ MongodbAddr: "172.17.189.140:27080", DbName: "qfw", Size: 10, UserName: "SJZY_RWbid_ES", Password: "SJZY@B4i4D5e6S", } mgo.InitPool() //url := "http://127.0.0.1:19908" url := "http://172.17.4.184:19908" username := "jybid" password := "Top2023_JEB01i@31" // 创建 Elasticsearch 客户端 client, err := elastic.NewClient( elastic.SetURL(url), elastic.SetBasicAuth(username, password), elastic.SetSniff(false), ) if err != nil { log.Fatalf("创建 Elasticsearch 客户端失败:%s", err) } //// 创建查询条件 //q := elastic.NewBoolQuery(). // //Must(elastic.NewMatchQuery("toptype", "预告")). // MustNot(elastic.NewTermQuery("_id", "id")) //Filter(elastic.NewRangeQuery("comeintime").Gte(1694707200).Lte(1695121200)) // 构建查询 query := elastic.NewBoolQuery(). Must( elastic.NewScriptQuery(elastic.NewScript(`doc['_id'].value != doc['id'].value`)), //elastic.NewRangeQuery("comeintime").Gte(1694707200), ) //query := elastic.NewBoolQuery(). // Must(elastic.NewMatchQuery("title", "租公租房提取公积金")). // Must(elastic.NewTermQuery("toptype", "拟建")).Must(elastic.NewMatchPhraseQuery()) count := 0 // 执行Count请求来获取文档总数 countResult, err := client.Count().Index("qyxy").Query(query).Do(context.Background()) if err != nil { log.Fatalf("执行Count请求失败:%s", err) } // 获取符合条件的文档总数 total := countResult fmt.Printf("符合条件的文档总数:%d\n", total) // //开始滚动搜索 scrollService := client.Scroll("qyxy").Query(query).Size(10000).FetchSource(true) for { results, err := scrollService.Do(context.Background()) if err != nil { log.Fatalf("滚动搜索失败:%s", err) } fmt.Println("current count:", count) if len(results.Hits.Hits) == 0 { // 没有更多的文档了,退出循环 break } for _, hit := range results.Hits.Hits { // 处理每个文档 // ... item := make(map[string]interface{}) if err := json.Unmarshal(hit.Source, &item); err != nil { log.Printf("解码文档失败:%s\n", err) continue } save := map[string]interface{}{ "id": item["id"], "company_name": item["company_name"], } mgo.Save("wcc_es_id_err_0428", save) } count += len(results.Hits.Hits) } fmt.Println("结束~~~~~~~~~~~~~~~") } // deleteEs 删除 es 数据 func deleteEs() { //url := "http://127.0.0.1:19905" ////url := "http://172.17.4.184:19905" //username := "jybid" //password := "Top2023_JEB01i@31" url := "http://127.0.0.1:19805" //url := "http://172.17.4.184:19905" username := "es_all" password := "TopJkO2E_d1x" // 创建 Elasticsearch 客户端 client, err := elastic.NewClient( elastic.SetURL(url), elastic.SetBasicAuth(username, password), elastic.SetSniff(false), ) if err != nil { log.Fatalf("创建 Elasticsearch 客户端失败:%s", err) } // 构建查询 query := elastic.NewBoolQuery(). Must( elastic.NewScriptQuery(elastic.NewScript(`doc['_id'].value != doc['id'].value`)), elastic.NewRangeQuery("comeintime").Gte(1672416000), ) // 执行Count请求来获取文档总数 countResult, err := client.Count().Index("bidding").Query(query).Do(context.Background()) if err != nil { log.Fatalf("执行Count请求失败:%s", err) } // 获取符合条件的文档总数 total := countResult fmt.Printf("符合条件的文档总数:%d\n", total) // 创建删除请求 deleteService := client.DeleteByQuery().Index("bidding").Query(query) // 执行删除操作 response, err := deleteService.Do(context.Background()) if err != nil { log.Fatalf("执行删除操作失败:%s", err) } // 检查删除操作的结果 if response != nil { fmt.Printf("已删除文档数:%d\n", response.Deleted) } else { fmt.Println("删除操作没有返回结果。") } } type CreditLabel struct { ZhimaToptype string `json:"zhima_toptype"` ZhimaSubtype string `json:"zhima_subtype"` ZhimaName string `json:"zhima_name"` } // getZhiMa 芝麻标签存在 func getZhiMa() { client, err := elastic.NewClient(elastic.SetURL("http://192.168.3.149:9201")) if err != nil { panic(err) } // 查询 zhima_labels 字段存在的数据 query := elastic.NewExistsQuery("zhima_labels") searchResult, err := client.Search(). Index("qyxy"). Query(query). Do(context.Background()) if err != nil { panic(err) } for _, hit := range searchResult.Hits.Hits { var label CreditLabel err := json.Unmarshal(hit.Source, &label) if err != nil { panic(err) } fmt.Println(label) } }