package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"sort"
	"strings"

	"github.com/olivere/elastic/v7"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

// main runs exactly one export job. The commented-out calls are other jobs
// that can be switched on instead; several of them (click, click2, dealData,
// getProject) are not defined in this file.
func main() {
	// getProjectData() // pull data from ES
	// click()          // write to ClickHouse
	// click2()
	// dealData()
	// getProject()
	// getQyLimitData()
	getBiddingData()
	log.Println("over ------------------ over")
}

// mustNewESClient builds an Elasticsearch client for url with basic auth and
// sniffing disabled. It terminates the process via log.Fatalf when the client
// cannot be created.
//
// NOTE(review): the credentials are hard-coded in source; consider moving
// them to configuration or the environment.
func mustNewESClient(url string) *elastic.Client {
	const (
		username = "jybid"
		password = "Top2023_JEB01i@31"
	)
	client, err := elastic.NewClient(
		elastic.SetURL(url),
		elastic.SetBasicAuth(username, password),
		elastic.SetSniff(false),
	)
	if err != nil {
		log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
	}
	return client
}

// scrollAllDocs runs query against index with the scroll API (pages of 10000
// hits, sorted ascending by _doc, 10m keep-alive), decodes every hit's source
// into a map and passes it to handle.
//
// Hits whose source cannot be decoded are logged and skipped. A failure of
// the first request other than io.EOF panics; a failure while fetching a
// later page is logged and ends the scroll. The scroll context is cleared
// before returning whenever a scroll ID was obtained.
func scrollAllDocs(client *elastic.Client, index string, query elastic.Query, handle func(doc map[string]interface{})) {
	const (
		keepAlive = "10m"
		pageSize  = 10000
	)
	ctx := context.Background()

	source := elastic.NewSearchSource().
		Query(query).
		Size(pageSize).
		Sort("_doc", true) // ascending; use Sort("_doc", false) for descending
	res, err := client.Scroll(index).
		Size(pageSize).
		Scroll(keepAlive).
		SearchSource(source).
		Do(ctx)
	if err != nil && err != io.EOF {
		panic(err)
	}
	if err == io.EOF {
		// No matching documents: the result may be nil or carry no hits, so
		// it must not be dereferenced below.
		fmt.Println("没有数据")
	} else {
		fmt.Println("总数是:", res.TotalHits())
	}

	scrollID := ""
	total := 0
	for {
		// Remember the most recent scroll ID so it can be cleared afterwards.
		if res != nil && res.ScrollId != "" {
			scrollID = res.ScrollId
		}
		if err != nil || res == nil || res.Hits == nil || len(res.Hits.Hits) == 0 {
			break
		}

		for _, hit := range res.Hits.Hits {
			var doc map[string]interface{}
			if err := json.Unmarshal(hit.Source, &doc); err != nil {
				log.Printf("解析文档失败:%s", err)
				continue
			}
			handle(doc)
		}
		total += len(res.Hits.Hits)

		res, err = client.Scroll().ScrollId(scrollID).Scroll(keepAlive).Do(ctx)
		log.Println("current count:", total)
		if err != nil && err != io.EOF {
			// io.EOF just means the last page was consumed; anything else is
			// a real failure. Either way the loop ends at the check above.
			log.Println("滚动搜索失败:", err, res)
		}
	}

	// Clearing with an empty ID would only produce a validation error.
	if scrollID == "" {
		return
	}
	if _, err := client.ClearScroll().ScrollId(scrollID).Do(ctx); err != nil {
		log.Printf("清理滚动搜索失败:%s", err)
	}
}

// getBiddingData copies every document of the "bidding" index whose toptype
// is "结果" and subtype is "招标" into the MongoDB collection
// qfw.wcc_subtype_err_0429, after dropping the "filetext" and "detail"
// fields.
func getBiddingData() {
	// Alternate ES endpoint: http://172.17.4.184:19908
	client := mustNewESClient("http://127.0.0.1:19908")

	// Alternate MongoDB address: 172.17.189.140:27080
	store := &mongodb.MongodbSim{
		MongodbAddr: "127.0.0.1:27083",
		Size:        10,
		DbName:      "qfw",
		UserName:    "SJZY_RWbid_ES",
		Password:    "SJZY@B4i4D5e6S",
		Direct:      true,
	}
	store.InitPool()

	// Earlier variants of this job filtered by area terms, by a
	// firsttime/comeintime range, or by subtype in
	// ("中标", "单一", "成交", "合同").
	query := elastic.NewBoolQuery().
		Must(elastic.NewTermQuery("toptype", "结果")).
		Must(elastic.NewTermQuery("subtype", "招标"))

	scrollAllDocs(client, "bidding", query, func(doc map[string]interface{}) {
		delete(doc, "filetext")
		delete(doc, "detail")
		// Store into the new collection.
		if err := store.InsertOrUpdate("qfw", "wcc_subtype_err_0429", doc); err != nil {
			log.Println("error", doc["id"], err)
		}
	})
	fmt.Println("结束~~~~~~~~~~~~~~~")
}

// getProjectData exports documents of the "qyxy" index whose area is one of
// the listed provinces/municipalities and whose firsttime lies in
// [1696089600, 1704038400) into the MongoDB collection
// top.wcc_allcity_2024Q1. The "filetext" and "detail" fields are dropped and
// one row is written per comma-separated entry of "s_winner".
func getProjectData() {
	// Alternate ES endpoint: http://127.0.0.1:19908
	client := mustNewESClient("http://172.17.4.184:19908")

	// Extraction database ("85"). Alternate address: 127.0.0.1:27080
	store := &mongodb.MongodbSim{
		MongodbAddr: "172.17.4.85:27080",
		DbName:      "top",
		Size:        10,
	}
	store.InitPool()

	// Epoch ranges noted by the original author:
	//   2024-01 .. 2024-04       : 1704038400 - 1711900800
	//   2023-10-01 .. 2024-01-01 : 1696089600 - 1704038400
	areaTerms := elastic.NewTermsQuery("area", "江苏", "安徽", "上海", "天津", "河北", "浙江", "天津市", "上海市", "河北省", "安徽省", "江苏省", "浙江省", "北京", "北京市")
	firstTime := elastic.NewRangeQuery("firsttime").Gte(1696089600).Lt(1704038400)
	query := elastic.NewBoolQuery().
		Must(areaTerms).
		Must(firstTime)

	scrollAllDocs(client, "qyxy", query, func(doc map[string]interface{}) {
		delete(doc, "filetext")
		delete(doc, "detail")

		winners := strings.Split(util.ObjToString(doc["s_winner"]), ",")
		for _, winner := range winners {
			// Give each winner its own shallow copy: assigning the map to a
			// new variable would alias doc, so all rows would share (and
			// overwrite) one s_winner value.
			row := make(map[string]interface{}, len(doc))
			for k, v := range doc {
				row[k] = v
			}
			row["s_winner"] = winner
			// Store into the new collection.
			if err := store.InsertOrUpdate("top", "wcc_allcity_2024Q1", row); err != nil {
				log.Println("error", doc["id"], err)
			}
		}
	})
	fmt.Println("结束~~~~~~~~~~~~~~~")
}

// getQyxytData exports documents of the "qyxy" index whose company_city is
// "平顶山市" and whose establish_date is >= 1704038400 into the MongoDB
// collection qfw.wcc_2024_pingdingshan, unmodified.
func getQyxytData() {
	// Alternate ES endpoint: http://127.0.0.1:19908
	client := mustNewESClient("http://172.17.4.184:19908")

	// Alternate MongoDB address: 127.0.0.1:27083
	store := &mongodb.MongodbSim{
		MongodbAddr: "172.17.189.140:27080",
		Size:        10,
		DbName:      "qfw",
		UserName:    "SJZY_RWbid_ES",
		Password:    "SJZY@B4i4D5e6S",
	}
	store.InitPool()

	// City filter plus lower bound on the establishment date.
	cityTerms := elastic.NewTermsQuery("company_city", "平顶山市")
	established := elastic.NewRangeQuery("establish_date").Gte(1704038400)
	query := elastic.NewBoolQuery().
		Must(cityTerms).
		Must(established)

	scrollAllDocs(client, "qyxy", query, func(doc map[string]interface{}) {
		// Store into the new collection.
		if err := store.InsertOrUpdate("qfw", "wcc_2024_pingdingshan", doc); err != nil {
			log.Println("error", doc["id"], err)
		}
	})
	fmt.Println("结束~~~~~~~~~~~~~~~")
}

// getQyLimitData fetches up to 50 documents of the "qyxy" index that match
// company_area "河南" and company_status "存续" but not company_type
// "个体工商户", and saves each one to the collection wcc_henan_0428 of the
// local MongoDB database "wcc". A search failure terminates the process.
func getQyLimitData() {
	// Alternate ES endpoint: http://172.17.4.184:19908
	client := mustNewESClient("http://127.0.0.1:19908")

	// Build the query.
	query := elastic.NewBoolQuery().
		Must(elastic.NewMatchQuery("company_area", "河南")).
		Must(elastic.NewMatchQuery("company_status", "存续")).
		MustNot(elastic.NewMatchQuery("company_type", "个体工商户"))

	// Run the search.
	searchResult, err := client.Search().
		Size(50).
		Index("qyxy").
		Query(query).
		Do(context.Background())
	if err != nil {
		log.Fatalf("Error executing search: %s", err)
	}

	// Local database.
	store := &mongodb.MongodbSim{
		MongodbAddr: "127.0.0.1:27017",
		Size:        10,
		DbName:      "wcc",
	}
	store.InitPool()

	if searchResult == nil || searchResult.Hits == nil {
		return
	}
	for _, hit := range searchResult.Hits.Hits {
		var doc map[string]interface{}
		if err := json.Unmarshal(hit.Source, &doc); err != nil {
			log.Printf("解析文档失败:%s", err)
			continue
		}
		store.SaveByOriID("wcc_henan_0428", doc)
	}
}

// IsInStringArray reports whether str is an element of arr.
//
// The lookup sorts a private copy of arr and binary-searches it, so the
// caller's slice is left in its original order.
func IsInStringArray(str string, arr []string) bool {
	sorted := make([]string, len(arr))
	copy(sorted, arr)
	sort.Strings(sorted)

	pos := sort.SearchStrings(sorted, str)
	return pos < len(sorted) && sorted[pos] == str
}