package main import ( "context" "encoding/json" "fmt" "github.com/olivere/elastic/v7" "gorm.io/driver/mysql" "gorm.io/gorm" "io" util "jygit.jydev.jianyu360.cn/data_processing/common_utils" "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb" "log" "sort" "strings" ) func main() { /** getProjectData click 是一起使用的,统计获取中标企业信息 */ //getProjectDataFromEs() //1.拉取项目中标成交数据 //click() //2.处理项目数据,写入clickhouse //click2() //dealData() //getProject() //getQyLimitData() //getBiddingData() //getQyxytData() //getTidb() log.Println("over ------------------ over") } // getBiddingData 获取标讯数据 func getBiddingData() { //url := "http://172.17.4.184:19908" url := "http://127.0.0.1:19908" username := "jybid" password := "Top2023_JEB01i@31" index := "bidding" //索引名称 //index := "projectset" //索引名称 // 创建 Elasticsearch 客户端 client, err := elastic.NewClient( elastic.SetURL(url), elastic.SetBasicAuth(username, password), elastic.SetSniff(false), ) if err != nil { log.Fatalf("创建 Elasticsearch 客户端失败:%s", err) } //85 抽取库 //Mgo := &mongodb.MongodbSim{ // //MongodbAddr: "127.0.0.1:27080", // MongodbAddr: "172.17.4.85:27080", // DbName: "top", // Size: 10, // //Direct: true, //} //Mgo.InitPool() MgoB := &mongodb.MongodbSim{ //MongodbAddr: "172.17.189.140:27080", MongodbAddr: "127.0.0.1:27083", Size: 10, DbName: "qfw", UserName: "SJZY_RWbid_ES", Password: "SJZY@B4i4D5e6S", Direct: true, } MgoB.InitPool() //2023年01-01 2023-10-01,,1-3季度 //2024-1 - 2024-4;1704038400-1711900800 //2023-10-1 2024-1-1;1696089600-1704038400 //areaTermsQuery := elastic.NewTermsQuery("area", "江苏", "安徽", "上海", "天津", "河北", "浙江", "天津市", "上海市", "河北省", "安徽省", "江苏省", "浙江省", "北京", "北京市") //rangeQuery := elastic.NewRangeQuery("firsttime").Gte(1696089600).Lt(1704038400) query := elastic.NewBoolQuery(). Must(elastic.NewTermQuery("toptype", "结果")). Must(elastic.NewTermQuery("subtype", "招标")) //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000") //query := elastic.NewBoolQuery(). // //北京,天津,河北,上海,江苏,浙江,安徽 // //Must(elastic.NewTermQuery("area", "北京市")). // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")). // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")). // Must(rangeQuery) ctx := context.Background() //开始滚动搜索 scrollID := "" scroll := "10m" searchSource := elastic.NewSearchSource(). Query(query). Size(10000). Sort("_doc", true) //升序排序 //Sort("_doc", false) //降序排序 searchService := client.Scroll(index). Size(10000). Scroll(scroll). SearchSource(searchSource) res, err := searchService.Do(ctx) if err != nil { if err == io.EOF { fmt.Println("没有数据") } else { panic(err) } } //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源 fmt.Println("总数是:", res.TotalHits()) total := 0 for len(res.Hits.Hits) > 0 { for _, hit := range res.Hits.Hits { var doc map[string]interface{} err := json.Unmarshal(hit.Source, &doc) if err != nil { log.Printf("解析文档失败:%s", err) continue } delete(doc, "filetext") delete(doc, "detail") //存入新表 err = MgoB.InsertOrUpdate("qfw", "wcc_subtype_err_0429", doc) if err != nil { log.Println("error", doc["id"]) } // 处理查询结果 //area := util.ObjToString(doc["area"]) //areas := []string{"北京", "上海", "广东", "江苏", "浙江"} //if !IsInStringArray(area, areas) { // continue //} //projectName := util.ObjToString(doc["projectname"]) //if strings.Contains(projectName, "非政府") { // continue //} //buyerclass := util.ObjToString(doc["buyerclass"]) //if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" { // continue //} ////存入新表 //err = Mgo.InsertOrUpdate("qfw", "wcc_bank_poc", doc) //if err != nil { // log.Println("error", doc["id"]) //} } total = total + len(res.Hits.Hits) scrollID = res.ScrollId res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx) log.Println("current count:", total) if err != nil { if err == io.EOF { // 滚动到最后一批数据,退出循环 break } log.Println("滚动搜索失败:", err, res) break // 处理错误时退出循环 } } // 在循环外调用 ClearScroll _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx) if err != nil { log.Printf("清理滚动搜索失败:%s", err) } fmt.Println("结束~~~~~~~~~~~~~~~") } // getProjectDataFromEs 获取项目 中标成交数据 func getProjectDataFromEs() { url := "http://172.17.4.184:19908" //url := "http://127.0.0.1:19908" username := "jybid" password := "Top2023_JEB01i@31" index := "projectset" //索引名称 // 创建 Elasticsearch 客户端 client, err := elastic.NewClient( elastic.SetURL(url), elastic.SetBasicAuth(username, password), elastic.SetSniff(false), ) if err != nil { log.Fatalf("创建 Elasticsearch 客户端失败:%s", err) } //85 抽取库 Mgo := &mongodb.MongodbSim{ //MongodbAddr: "127.0.0.1:27080", MongodbAddr: "172.17.4.85:27080", DbName: "top", Size: 10, //Direct: true, } Mgo.InitPool() //MgoB := &mongodb.MongodbSim{ // MongodbAddr: "172.17.189.140:27080", // //MongodbAddr: "127.0.0.1:27083", // Size: 10, // DbName: "qfw", // UserName: "SJZY_RWbid_ES", // Password: "SJZY@B4i4D5e6S", // //Direct: true, //} //MgoB.InitPool() //2023年01-01 2023-10-01,,1-3季度 //2024-1 - 2024-4;1704038400-1711900800 //2023-10-1 2024-1-1;1696089600-1704038400 //areaTermsQuery := elastic.NewTermsQuery("area", "江苏", "安徽", "上海", "天津", "河北", "浙江", "天津市", "上海市", "河北省", "安徽省", "江苏省", "浙江省", "北京", "北京市") rangeQuery := elastic.NewRangeQuery("firsttime").Gte(1711900800).Lt(1719763200) //2024年4-7月 query := elastic.NewBoolQuery(). //Must(areaTermsQuery). Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")). Must(rangeQuery) //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000") //query := elastic.NewBoolQuery(). // //北京,天津,河北,上海,江苏,浙江,安徽 // //Must(elastic.NewTermQuery("area", "北京市")). // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")). // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")). // Must(rangeQuery) ctx := context.Background() //开始滚动搜索 scrollID := "" scroll := "10m" searchSource := elastic.NewSearchSource(). Query(query). Size(10000). Sort("_doc", true) //升序排序 //Sort("_doc", false) //降序排序 searchService := client.Scroll(index). Size(10000). Scroll(scroll). SearchSource(searchSource) res, err := searchService.Do(ctx) if err != nil { if err == io.EOF { fmt.Println("没有数据") } else { panic(err) } } //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源 fmt.Println("总数是:", res.TotalHits()) total := 0 for len(res.Hits.Hits) > 0 { for _, hit := range res.Hits.Hits { var doc map[string]interface{} err := json.Unmarshal(hit.Source, &doc) if err != nil { log.Printf("解析文档失败:%s", err) continue } delete(doc, "filetext") delete(doc, "detail") sWinner := util.ObjToString(doc["s_winner"]) winners := strings.Split(sWinner, ",") for _, v := range winners { insert := doc insert["s_winner"] = v //存入新表 err = Mgo.InsertOrUpdate("top", "wcc_allcity_2024Q2", insert) if err != nil { log.Println("error", doc["id"]) } } // 处理查询结果 //area := util.ObjToString(doc["area"]) //areas := []string{"北京", "上海", "广东", "江苏", "浙江"} //if !IsInStringArray(area, areas) { // continue //} //projectName := util.ObjToString(doc["projectname"]) //if strings.Contains(projectName, "非政府") { // continue //} //buyerclass := util.ObjToString(doc["buyerclass"]) //if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" { // continue //} ////存入新表 //err = Mgo.InsertOrUpdate("qfw", "wcc_bank_poc", doc) //if err != nil { // log.Println("error", doc["id"]) //} } total = total + len(res.Hits.Hits) scrollID = res.ScrollId res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx) log.Println("current count:", total) if err != nil { if err == io.EOF { // 滚动到最后一批数据,退出循环 break } log.Println("滚动搜索失败:", err, res) break // 处理错误时退出循环 } } // 在循环外调用 ClearScroll _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx) if err != nil { log.Printf("清理滚动搜索失败:%s", err) } fmt.Println("结束~~~~~~~~~~~~~~~") } // getQyxytData 获取企业数据 func getQyxytData() { url := "http://172.17.4.184:19908" //url := "http://127.0.0.1:19908" username := "jybid" password := "Top2023_JEB01i@31" index := "qyxy" //索引名称 // 创建 Elasticsearch 客户端 client, err := elastic.NewClient( elastic.SetURL(url), elastic.SetBasicAuth(username, password), elastic.SetSniff(false), ) if err != nil { log.Fatalf("创建 Elasticsearch 客户端失败:%s", err) } //85 抽取库 //Mgo := &mongodb.MongodbSim{ // //MongodbAddr: "127.0.0.1:27080", // MongodbAddr: "172.17.4.85:27080", // DbName: "top", // Size: 10, // //Direct: true, //} //Mgo.InitPool() MgoB := &mongodb.MongodbSim{ MongodbAddr: "172.17.189.140:27080", //MongodbAddr: "127.0.0.1:27083", Size: 10, DbName: "qfw", UserName: "SJZY_RWbid_ES", Password: "SJZY@B4i4D5e6S", //Direct: true, } MgoB.InitPool() //2023年01-01 2023-10-01,,1-3季度 //2024-1 - 2024-4;1704038400-1711900800 //2023-10-1 2024-1-1;1696089600-1704038400 //城市范围 //areaTermsQuery := elastic.NewTermsQuery("company_city", "北京市") //rangeQuery := elastic.NewRangeQuery("establish_date").Gte(1704038400) //query := elastic.NewBoolQuery(). // Must(areaTermsQuery). // Must(rangeQuery) //---------------------------// query := elastic.NewBoolQuery() query.Must(elastic.NewMatchQuery("business_scope", "招投标代理")) query.Must(elastic.NewTermQuery("company_city", "北京市")) //rangeQuery := elastic.NewRangeQuery("comeintime").Gte("1640966400").Lt("1703952000") //query := elastic.NewBoolQuery(). // //北京,天津,河北,上海,江苏,浙江,安徽 // //Must(elastic.NewTermQuery("area", "北京市")). // Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")). // Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")). // Must(rangeQuery) ctx := context.Background() //开始滚动搜索 scrollID := "" scroll := "10m" searchSource := elastic.NewSearchSource(). Query(query). Size(10000). Sort("_doc", true) //升序排序 //Sort("_doc", false) //降序排序 searchService := client.Scroll(index). Size(10000). Scroll(scroll). SearchSource(searchSource) res, err := searchService.Do(ctx) if err != nil { if err == io.EOF { fmt.Println("没有数据") } else { panic(err) } } //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源 fmt.Println("总数是:", res.TotalHits()) total := 0 for len(res.Hits.Hits) > 0 { for _, hit := range res.Hits.Hits { var doc map[string]interface{} err := json.Unmarshal(hit.Source, &doc) if err != nil { log.Printf("解析文档失败:%s", err) continue } if strings.Contains(util.ObjToString(doc["business_scope"]), "招投标代理") { //存入新表 insert := map[string]interface{}{ "company_name": doc["company_name"], "business_scope": doc["business_scope"], "employee_name": doc["employee_name"], "company_phone": doc["company_phone"], } err = MgoB.InsertOrUpdate("qfw", "wcc_2024_beijing_dailijigou", insert) if err != nil { log.Println("error", doc["id"]) } } //sWinner := util.ObjToString(doc["s_winner"]) //winners := strings.Split(sWinner, ",") //for _, v := range winners { // insert := doc // insert["s_winner"] = v // //存入新表 // err = MgoB.InsertOrUpdate("qfw", "wcc_2024_pingdingshan", insert) // if err != nil { // log.Println("error", doc["id"]) // } //} // 处理查询结果 //area := util.ObjToString(doc["area"]) //areas := []string{"北京", "上海", "广东", "江苏", "浙江"} //if !IsInStringArray(area, areas) { // continue //} //projectName := util.ObjToString(doc["projectname"]) //if strings.Contains(projectName, "非政府") { // continue //} //buyerclass := util.ObjToString(doc["buyerclass"]) //if buyerclass == "批发零售" || buyerclass == "住宿餐饮" || buyerclass == "信息技术" { // continue //} ////存入新表 //err = Mgo.InsertOrUpdate("qfw", "wcc_bank_poc", doc) //if err != nil { // log.Println("error", doc["id"]) //} } total = total + len(res.Hits.Hits) scrollID = res.ScrollId res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx) log.Println("current count:", total) if err != nil { if err == io.EOF { // 滚动到最后一批数据,退出循环 break } log.Println("滚动搜索失败:", err, res) break // 处理错误时退出循环 } } // 在循环外调用 ClearScroll _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx) if err != nil { log.Printf("清理滚动搜索失败:%s", err) } fmt.Println("结束~~~~~~~~~~~~~~~") } // getQyLimitData 获取qyxy 条件数据 func getQyLimitData() { //url := "http://172.17.4.184:19908" url := "http://127.0.0.1:19908" username := "jybid" password := "Top2023_JEB01i@31" index := "qyxy" //索引名称 // 创建 Elasticsearch 客户端 client, err := elastic.NewClient( elastic.SetURL(url), elastic.SetBasicAuth(username, password), elastic.SetSniff(false), ) if err != nil { log.Fatalf("创建 Elasticsearch 客户端失败:%s", err) } // 构建查询 query := elastic.NewBoolQuery(). Must(elastic.NewMatchQuery("company_area", "河南")). Must(elastic.NewMatchQuery("company_status", "存续")). MustNot(elastic.NewMatchQuery("company_type", "个体工商户")) // 执行查询 searchResult, err := client.Search().Size(50). Index(index). Query(query). Do(context.Background()) if err != nil { log.Fatalf("Error executing search: %s", err) } // 本地数据库 MgoB := &mongodb.MongodbSim{ MongodbAddr: "127.0.0.1:27017", Size: 10, DbName: "wcc", } MgoB.InitPool() for _, hit := range searchResult.Hits.Hits { var doc map[string]interface{} err := json.Unmarshal(hit.Source, &doc) if err != nil { log.Printf("解析文档失败:%s", err) continue } MgoB.SaveByOriID("wcc_henan_0428", doc) } } // getTidb 获取tidb 数据 func getTidb() { MgoB := &mongodb.MongodbSim{ MongodbAddr: "172.17.189.140:27080", //MongodbAddr: "127.0.0.1:27083", Size: 10, DbName: "qfw", UserName: "SJZY_RWbid_ES", Password: "SJZY@B4i4D5e6S", //Direct: true, } MgoB.InitPool() //tidb username := "datascbi" password := "Da#Bi20221111SC" //host := "127.0.0.1:4001" host := "172.17.162.25:4000" database := "global_common_data" dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", username, password, host, database) // 连接到数据库 db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{}) if err != nil { log.Println("Failed to connect to database:", err) return } fmt.Println("Connected to the database!") defer util.Catch() sess := MgoB.GetMgoConn() defer MgoB.DestoryMongoConn(sess) it := sess.DB("qfw").C("wcc_2024_beijing_dailijigou").Find(nil).Select(nil).Iter() fmt.Println("taskRun 开始") count := 0 for tmp := make(map[string]interface{}); it.Next(&tmp); count++ { if count%10000 == 0 { log.Println("current:", count) } companyName := util.ObjToString(tmp["company_name"]) var baseInfo EnterpriseBaseInfo db.Where(&EnterpriseBaseInfo{Name: companyName}).First(&baseInfo) if baseInfo.ID > 0 { insert := map[string]interface{}{ "company_name": companyName, "name_id": baseInfo.NameID, "business_scope": tmp["business_scope"], } MgoB.InsertOrUpdate("qfw", "wcc_beijing_daili_bidding", insert) } } log.Println("over") } // IsInStringArray 判断数组中是否存在字符串 func IsInStringArray(str string, arr []string) bool { // 先对字符串数组进行排序 sort.Strings(arr) // 使用二分查找算法查找字符串 pos := sort.SearchStrings(arr, str) // 如果找到了则返回 true,否则返回 false return pos < len(arr) && arr[pos] == str }