package main

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/elastic/go-elasticsearch/v7"
	"github.com/olivere/elastic/v7"
	"go.mongodb.org/mongo-driver/bson"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

// workerCount is the number of concurrent workers used by
// getCountProjectWinner3.
const workerCount = 5

// Task is one unit of work for the count workers: the MongoDB document ID to
// update and the company name to look up in Elasticsearch.
type Task struct {
	ID          string
	CompanyName string
}

var (
	// statRoles are the "bidding" index fields a company name is matched against.
	statRoles = []string{"buyer", "winner"}
	// statYears are the calendar years for which counts are collected.
	statYears = []int{2020, 2021, 2022, 2023, 2024}
)

// packagePatterns are the title patterns tried by GetMatches, in priority
// order. They are compiled once here instead of on every call.
var packagePatterns = []*regexp.Regexp{
	regexp.MustCompile(`包(\d+(?:、\d+)*)`), // several packages, e.g. "包10、12、14"
	regexp.MustCompile(`标包\d+`),           // e.g. "(标包1)"
	regexp.MustCompile(`\d+包`),            // e.g. "采购2包"
	regexp.MustCompile(`包\d+`),            // a single package, e.g. "包2"
}

// countBiddingByYear returns how many documents in the "bidding" index have
// the field role equal to companyName and a publishtime (Unix seconds) within
// the given calendar year, evaluated in UTC.
func countBiddingByYear(ctx context.Context, client *elastic.Client, role, companyName string, year int) (int64, error) {
	start := time.Date(year, 1, 1, 0, 0, 0, 0, time.UTC).Unix()
	end := time.Date(year+1, 1, 1, 0, 0, 0, 0, time.UTC).Unix() - 1

	query := elastic.NewBoolQuery().
		Must(elastic.NewTermQuery(role, companyName)).
		Filter(elastic.NewRangeQuery("publishtime").Gte(start).Lte(end))

	return client.Count().
		Index("bidding").
		Query(query).
		Do(ctx)
}

// getCountProjectWinner3 is the concurrent variant of getCountProjectWinner.
// The main goroutine streams every document of qfw.wcc_label_static_0625 into
// a channel; workerCount workers count, per role and year, the matching
// "bidding" documents and write the counts back to the same document under
// keys of the form "<role>-<year>".
func getCountProjectWinner3() {
	// NOTE(review): endpoint and credentials are hard-coded in source; they
	// should come from configuration or the environment.
	url := "http://172.17.4.184:19908"
	username := "jybid"
	password := "Top2023_JEB01i@31"

	client, err := elastic.NewClient(
		elastic.SetURL(url),
		elastic.SetBasicAuth(username, password),
		elastic.SetSniff(false),
	)
	if err != nil {
		log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
	}

	sess := MgoB.GetMgoConn()
	defer MgoB.DestoryMongoConn(sess)

	coll := sess.DB("qfw").C("wcc_label_static_0625")
	it := coll.Find(nil).Select(nil).Iter()

	log.Println("taskRun 开始")

	taskCh := make(chan Task, 100)
	var wg sync.WaitGroup

	// Start the workers.
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			ctx := context.Background()
			for task := range taskCh {
				func() {
					// Recover from a failure in a single task so it cannot
					// take the whole worker down.
					defer util.Catch()

					update := make(map[string]interface{})
					for _, role := range statRoles {
						for _, year := range statYears {
							n, err := countBiddingByYear(ctx, client, role, task.CompanyName, year)
							if err != nil {
								log.Printf("【Worker %d】 查询失败 [%s - %d]: %v", workerID, role, year, err)
								continue
							}
							update[fmt.Sprintf("%s-%d", role, year)] = n
						}
					}

					// Every query failed: there is nothing to write, and an
					// empty $set is rejected by older MongoDB servers.
					if len(update) == 0 {
						return
					}
					MgoB.UpdateById("wcc_label_static_0625", task.ID, bson.M{"$set": update})
				}()
			}
		}(i)
	}

	// The main goroutine reads MongoDB and feeds the task channel.
	count := 0
	tmp := make(map[string]interface{})
	for it.Next(&tmp) {
		count++
		if count%1000 == 0 {
			log.Println("current:", count, tmp["company_name"])
		}
		taskCh <- Task{
			ID:          mongodb.BsonIdToSId(tmp["_id"]),
			CompanyName: util.ObjToString(tmp["company_name"]),
		}
		// Use a fresh map per document so keys of the previous document
		// cannot linger if the decoder merges into an existing map.
		tmp = make(map[string]interface{})
	}
	close(taskCh) // all tasks have been sent

	wg.Wait()
	log.Println("所有任务处理完成")
}

// getCountProjectWinner walks every document of qfw.wcc_label_static_0625
// sequentially and, for each company, stores the number of "bidding"
// documents per role and year under keys of the form "<role>-<year>".
// Any failed count query aborts the whole process via log.Fatalf.
func getCountProjectWinner() {
	// NOTE(review): endpoint and credentials are hard-coded in source; they
	// should come from configuration or the environment.
	url := "http://172.17.4.184:19908"
	//url := "http://127.0.0.1:19908"
	username := "jybid"
	password := "Top2023_JEB01i@31"

	client, err := elastic.NewClient(
		elastic.SetURL(url),
		elastic.SetBasicAuth(username, password),
		elastic.SetSniff(false),
	)
	if err != nil {
		log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
	}

	defer util.Catch()
	sess := MgoB.GetMgoConn()
	defer MgoB.DestoryMongoConn(sess)

	it := sess.DB("qfw").C("wcc_label_static_0625").Find(nil).Select(nil).Iter()

	log.Println("taskRun 开始")
	ctx := context.Background()

	count := 0
	tmp := make(map[string]interface{})
	for it.Next(&tmp) {
		if count%1000 == 0 {
			log.Println("current:", count, tmp["company_name"])
		}

		companyName := util.ObjToString(tmp["company_name"])
		id := mongodb.BsonIdToSId(tmp["_id"])

		update := make(map[string]interface{})
		for _, role := range statRoles {
			for _, year := range statYears {
				n, err := countBiddingByYear(ctx, client, role, companyName, year)
				if err != nil {
					log.Fatalf("查询 [%s-%d] 失败: %v", role, year, err)
				}
				update[fmt.Sprintf("%s-%d", role, year)] = n
			}
		}

		MgoB.UpdateById("wcc_label_static_0625", id, map[string]interface{}{"$set": update})

		count++
		// Use a fresh map per document so keys of the previous document
		// cannot linger if the decoder merges into an existing map.
		tmp = make(map[string]interface{})
	}
}

// CountProjectWinner2 runs a nested term query against the "qyxy" index for
// companies labelled "高新技术企业" using the official go-elasticsearch client,
// then prints the total hit count and the ID and _source of each hit on the
// first page (up to 100 hits).
func CountProjectWinner2() {
	// NOTE(review): endpoint and credentials are hard-coded in source; they
	// should come from configuration or the environment.
	cfg := elasticsearch.Config{
		Addresses: []string{"http://127.0.0.1:19908"}, // or "http://172.17.4.184:19908"
		Username:  "jybid",
		Password:  "Top2023_JEB01i@31",
	}

	es, err := elasticsearch.NewClient(cfg)
	if err != nil {
		log.Fatalf("创建 Elasticsearch 客户端失败: %s", err)
	}

	// Build the query body.
	query := map[string]interface{}{
		"track_total_hits": true, // required to get the real total beyond 10,000 hits
		"size":             100,  // page size
		"query": map[string]interface{}{
			"nested": map[string]interface{}{
				"path": "zhima_labels",
				"query": map[string]interface{}{
					"term": map[string]interface{}{
						"zhima_labels.zhima_name": "高新技术企业",
					},
				},
			},
		},
	}

	var buf bytes.Buffer
	if err := json.NewEncoder(&buf).Encode(query); err != nil {
		log.Fatalf("Error encoding query: %s", err)
	}

	res, err := es.Search(
		es.Search.WithContext(context.Background()),
		es.Search.WithIndex("qyxy"),
		es.Search.WithBody(&buf),
		es.Search.WithTrackTotalHits(true),
	)
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer res.Body.Close()

	// Check the HTTP status before trying to decode the body.
	if res.IsError() {
		bodyBytes, readErr := io.ReadAll(res.Body)
		if readErr != nil {
			log.Fatalf("ES 返回错误: %s\n(reading body failed: %v)", res.Status(), readErr)
		}
		log.Fatalf("ES 返回错误: %s\n%s", res.Status(), string(bodyBytes))
	}

	var r map[string]interface{}
	if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
		log.Fatalf("解析响应出错: %s", err)
	}

	// Use checked assertions: an unexpected response shape must produce a
	// clear message instead of a type-assertion panic.
	hits, ok := r["hits"].(map[string]interface{})
	if !ok {
		log.Fatalf("解析响应出错: %s", "missing or malformed \"hits\" object")
	}

	// hits.total is normally an object with a "value" field; fall back to
	// the raw value if it has any other shape.
	total := hits["total"]
	if totalObj, ok := total.(map[string]interface{}); ok {
		total = totalObj["value"]
	}
	fmt.Printf("命中总数: %v 条\n", total)

	// Print the ID and _source of each hit.
	hitList, _ := hits["hits"].([]interface{})
	for _, hit := range hitList {
		doc, ok := hit.(map[string]interface{})
		if !ok {
			continue
		}
		sourceJSON, err := json.MarshalIndent(doc["_source"], "", " ")
		if err != nil {
			log.Printf("解析文档失败:%s", err)
			continue
		}
		fmt.Printf("ID: %s\nSource: %s\n", doc["_id"], sourceJSON)
	}
}

// CountProjectWinner scrolls the "qyxy" index once per label in a fixed list
// of enterprise labels and upserts a {company_name, id, label} record into
// qfw.wcc_label_static_0625 for every company carrying that label.
func CountProjectWinner() {
	// NOTE(review): endpoint and credentials are hard-coded in source; they
	// should come from configuration or the environment.
	url := "http://172.17.4.184:19908"
	//url := "http://127.0.0.1:19908"
	username := "jybid"
	password := "Top2023_JEB01i@31"
	index := "qyxy" // index name

	client, err := elastic.NewClient(
		elastic.SetURL(url),
		elastic.SetBasicAuth(username, password),
		elastic.SetSniff(false),
	)
	if err != nil {
		log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
	}

	labels := `高新技术企业,小巨人企业,国家级技术创新示范企业,众创空间,国家级科技企业孵化器,瞪羚企业,科技型中小企业,制造业单项冠军示范企业,制造业单项冠军产品生产企业,制造业单项冠军培育企业,国家企业技术中心,专精特新企业,省级技术创新示范企业,技术先进型服务企业,省级企业技术中心`
	labelNames := strings.Split(labels, ",")

	ctx := context.Background()
	for _, name := range labelNames {
		exportLabelCompanies(ctx, client, index, name)
	}
}

// exportLabelCompanies scrolls through every document of index whose nested
// zhima_labels.zhima_name equals name and upserts one record per hit into
// qfw.wcc_label_static_0625. The scroll context is released on return.
func exportLabelCompanies(ctx context.Context, client *elastic.Client, index, name string) {
	const scroll = "10m"

	// Nested query: the path must be the nested field itself.
	nestedQuery := elastic.NewNestedQuery(
		"zhima_labels",
		elastic.NewBoolQuery().Must(
			elastic.NewTermQuery("zhima_labels.zhima_name", name),
		),
	)
	searchSource := elastic.NewSearchSource().
		Query(nestedQuery).
		Size(10000).
		Sort("_doc", true) // ascending

	// scrollID is read by the deferred cleanup when the function returns, so
	// it always holds the most recent ID and is never sent empty.
	scrollID := ""
	defer func() {
		if scrollID == "" {
			return
		}
		if _, err := client.ClearScroll().ScrollId(scrollID).Do(ctx); err != nil {
			log.Printf("清理滚动搜索失败:%s", err)
		}
	}()

	res, err := client.Scroll(index).
		Size(10000).
		Scroll(scroll).
		SearchSource(searchSource).
		Do(ctx)
	if res != nil {
		scrollID = res.ScrollId
	}
	if err != nil {
		if err == io.EOF {
			// No hits for this label; res may be nil, so stop here instead
			// of dereferencing it below.
			fmt.Println("没有数据")
			return
		}
		panic(err)
	}

	fmt.Println(name, "总数是:", res.TotalHits())

	total := 0
	for res != nil && res.Hits != nil && len(res.Hits.Hits) > 0 {
		for _, hit := range res.Hits.Hits {
			var doc map[string]interface{}
			if err := json.Unmarshal(hit.Source, &doc); err != nil {
				log.Printf("解析文档失败:%s", err)
				continue
			}

			// Record to store in the target collection.
			insert := map[string]interface{}{
				"company_name": doc["company_name"],
				"id":           doc["id"],
				"label":        name,
			}
			if err := MgoB.InsertOrUpdate("qfw", "wcc_label_static_0625", insert); err != nil {
				log.Println("error", doc["id"])
			}
		}

		total += len(res.Hits.Hits)
		scrollID = res.ScrollId

		res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
		if res != nil && res.ScrollId != "" {
			scrollID = res.ScrollId
		}
		log.Println("current count:", total, name)
		if err != nil {
			// io.EOF marks the end of the scroll; anything else is logged.
			if err != io.EOF {
				log.Println("滚动搜索失败:", err, res)
			}
			break
		}
	}
}

// getProject scrolls the "projectset" index for documents whose "pici" lies
// in (1685548800, 1704038400] and saves one record per project into the
// wcc_project_20240304 collection, annotated with any package markers found
// in the titles of the project's "招标" list entries.
func getProject() {
	// NOTE(review): addresses and credentials are hard-coded in source; they
	// should come from configuration or the environment.
	// This local MgoB shadows the package-level MgoB inside this function.
	MgoB := &mongodb.MongodbSim{
		MongodbAddr: "172.17.189.140:27080",
		//MongodbAddr: "127.0.0.1:27083",
		Size:     10,
		DbName:   "qfw",
		UserName: "SJZY_RWbid_ES",
		Password: "SJZY@B4i4D5e6S",
		//Direct: true,
	}
	MgoB.InitPool()

	url := "http://172.17.4.184:19805"
	//url := "http://127.0.0.1:19805"
	username := "es_all"
	password := "TopJkO2E_d1x"
	index := "projectset" // index name

	client, err := elastic.NewClient(
		elastic.SetURL(url),
		elastic.SetBasicAuth(username, password),
		elastic.SetSniff(false),
	)
	if err != nil {
		log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
	}

	rangeQuery := elastic.NewRangeQuery("pici").Gt("1685548800").Lte("1704038400")
	query := elastic.NewBoolQuery().Must(rangeQuery)

	ctx := context.Background()
	const scroll = "1m"

	searchSource := elastic.NewSearchSource().
		Query(query).
		Size(100).
		Sort("_doc", true) // ascending

	// The cleanup runs in a closure so it sees the scroll ID held at exit.
	// Deferring the call chain directly would evaluate ScrollId(scrollID)
	// immediately, while scrollID is still empty.
	scrollID := ""
	defer func() {
		if scrollID == "" {
			return
		}
		if _, err := client.ClearScroll().ScrollId(scrollID).Do(ctx); err != nil {
			log.Printf("清理滚动搜索失败:%s", err)
		}
	}()

	res, err := client.Scroll(index).
		Size(100).
		Scroll(scroll).
		SearchSource(searchSource).
		Do(ctx)
	if res != nil {
		scrollID = res.ScrollId
	}
	if err != nil {
		if err == io.EOF {
			// No hits; res may be nil, so stop here instead of
			// dereferencing it below.
			fmt.Println("没有数据")
			return
		}
		panic(err)
	}

	fmt.Println("总数是:", res.TotalHits())

	total := 0
	for res != nil && res.Hits != nil && len(res.Hits.Hits) > 0 {
		for _, hit := range res.Hits.Hits {
			var doc map[string]interface{}
			if err := json.Unmarshal(hit.Source, &doc); err != nil {
				log.Printf("解析文档失败:%s", err)
				continue
			}
			MgoB.SaveByOriID("wcc_project_20240304", buildProjectRecord(doc))
		}

		total += len(res.Hits.Hits)
		scrollID = res.ScrollId

		res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
		if res != nil && res.ScrollId != "" {
			scrollID = res.ScrollId
		}
		log.Println("current count:", total)
		if err != nil {
			// io.EOF marks the end of the scroll; anything else is logged.
			if err != io.EOF {
				log.Printf("滚动搜索失败:%s", err)
			}
			break
		}
	}

	fmt.Println("结束~~~~~~~~~~~~~~~")
}

// buildProjectRecord converts one "projectset" document into the record
// stored by getProject. Titles of list entries whose toptype is "招标" are
// scanned with GetMatches; when anything matches, the record also gets the
// matching entries ("matchList"), a "package_name" made of the project name
// and the joined matches, and "type" set to 1.
func buildProjectRecord(doc map[string]interface{}) map[string]interface{} {
	var matchWords []string
	var matchList []interface{}

	if list, ok := doc["list"].([]interface{}); ok {
		for _, v := range list {
			entry, ok := v.(map[string]interface{})
			if !ok || util.ObjToString(entry["toptype"]) != "招标" {
				continue
			}
			matches := GetMatches(util.ObjToString(entry["title"]))
			if len(matches) > 0 {
				matchList = append(matchList, entry)
				matchWords = append(matchWords, matches...)
			}
		}
	}

	insert := map[string]interface{}{
		"project_id":    doc["id"],
		"_id":           doc["id"],
		"multipackage":  doc["multipackage"],
		"list":          doc["list"],
		"projectname":   doc["projectname"],
		"sourceinfourl": doc["sourceinfourl"],
	}
	if len(matchWords) > 0 {
		insert["matchList"] = matchList
		insert["package_name"] = util.ObjToString(doc["projectname"]) + "-" + strings.Join(matchWords, "、")
		insert["type"] = 1
	}
	return insert
}

// GetMatches returns the package markers found in title. The patterns in
// packagePatterns are tried in order and all matches of the first pattern
// that matches are returned; the result is nil when nothing matches.
//
// Because the list pattern `包(\d+(?:、\d+)*)` is tried first, it also
// captures any title the later `标包\d+` and `包\d+` patterns would match, so
// in practice results come from the list pattern or from `\d+包`.
func GetMatches(title string) (res []string) {
	text := util.ObjToString(title)
	for _, re := range packagePatterns {
		if matches := re.FindAllString(text, -1); len(matches) > 0 {
			return matches
		}
	}
	return nil
}