|
- package main
- import (
- "bytes"
- "context"
- "encoding/json"
- "fmt"
- "github.com/elastic/go-elasticsearch/v7"
- "github.com/olivere/elastic/v7"
- "go.mongodb.org/mongo-driver/bson"
- "io"
- util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
- "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
- "log"
- "regexp"
- "strings"
- "sync"
- "time"
- )
// Concurrency control parameters.
const (
	// workerCount is the number of worker goroutines started to consume tasks.
	workerCount = 5
)
// Task is one unit of work handed to a worker goroutine: a single company
// document whose counts are to be computed.
type Task struct {
	// ID is the string form of the source document's _id.
	ID string
	// CompanyName is the document's company_name value.
	CompanyName string
}
- func getCountProjectWinner3() {
- url := "http://172.17.4.184:19908"
- username := "jybid"
- password := "Top2023_JEB01i@31"
- // 初始化 Elasticsearch 客户端
- client, err := elastic.NewClient(
- elastic.SetURL(url),
- elastic.SetBasicAuth(username, password),
- elastic.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- coll := sess.DB("qfw").C("wcc_label_static_0625")
- it := coll.Find(nil).Select(nil).Iter()
- log.Println("taskRun 开始")
- taskCh := make(chan Task, 100)
- var wg sync.WaitGroup
- // 启动 worker 协程
- for i := 0; i < workerCount; i++ {
- wg.Add(1)
- go func(workerID int) {
- defer wg.Done()
- ctx := context.Background()
- for task := range taskCh {
- func() {
- defer util.Catch() // 捕获单个任务错误,避免 crash
- fields := []string{"buyer", "winner"}
- years := []int{2020, 2021, 2022, 2023, 2024}
- update := make(map[string]interface{})
- for _, role := range fields {
- for _, year := range years {
- start := time.Date(year, 1, 1, 0, 0, 0, 0, time.UTC).Unix()
- end := time.Date(year+1, 1, 1, 0, 0, 0, 0, time.UTC).Unix() - 1
- query := elastic.NewBoolQuery().
- Must(elastic.NewTermQuery(role, task.CompanyName)).
- Filter(elastic.NewRangeQuery("publishtime").Gte(start).Lte(end))
- count, err := client.Count().
- Index("bidding").
- Query(query).
- Do(ctx)
- if err != nil {
- log.Printf("【Worker %d】 查询失败 [%s - %d]: %v", workerID, role, year, err)
- continue
- }
- key := fmt.Sprintf("%s-%d", role, year)
- update[key] = count
- }
- }
- MgoB.UpdateById("wcc_label_static_0625", task.ID, bson.M{"$set": update})
- }()
- }
- }(i)
- }
- // 主线程读取 MongoDB 数据发送到 task channel
- count := 0
- for tmp := make(map[string]interface{}); it.Next(&tmp); {
- count++
- if count%1000 == 0 {
- log.Println("current:", count, tmp["company_name"])
- }
- task := Task{
- ID: mongodb.BsonIdToSId(tmp["_id"]),
- CompanyName: util.ObjToString(tmp["company_name"]),
- }
- taskCh <- task
- }
- close(taskCh) // 所有任务发完
- wg.Wait()
- log.Println("所有任务处理完成")
- }
- func getCountProjectWinner() {
- url := "http://172.17.4.184:19908"
- //url := "http://127.0.0.1:19908"
- username := "jybid"
- password := "Top2023_JEB01i@31"
- //index := "bidding" //索引名称
- // 创建 Elasticsearch 客户端
- client, err := elastic.NewClient(
- elastic.SetURL(url),
- elastic.SetBasicAuth(username, password),
- elastic.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- defer util.Catch()
- sess := MgoB.GetMgoConn()
- defer MgoB.DestoryMongoConn(sess)
- it := sess.DB("qfw").C("wcc_label_static_0625").Find(nil).Select(nil).Iter()
- log.Println("taskRun 开始")
- count := 0
- ctx := context.Background()
- for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
- if count%1000 == 0 {
- log.Println("current:", count, tmp["company_name"])
- }
- companyName := util.ObjToString(tmp["company_name"])
- //companyName := "上海市特种设备监督检验技术研究院"
- id := mongodb.BsonIdToSId(tmp["_id"])
- fields := []string{"buyer", "winner"}
- years := []int{2020, 2021, 2022, 2023, 2024}
- update := make(map[string]interface{})
- for _, role := range fields {
- //fmt.Printf("=== [%s 作为 %s 的数量统计] ===\n", companyName, role)
- for _, year := range years {
- // 年份范围(秒)
- start := time.Date(year, 1, 1, 0, 0, 0, 0, time.UTC).Unix()
- end := time.Date(year+1, 1, 1, 0, 0, 0, 0, time.UTC).Unix() - 1
- // 构造查询
- query := elastic.NewBoolQuery().
- Must(elastic.NewTermQuery(role, companyName)).
- Filter(elastic.NewRangeQuery("publishtime").Gte(start).Lte(end))
- // 查询并只返回总数
- count11, err := client.Count().
- Index("bidding").
- Query(query).
- Do(ctx)
- if err != nil {
- log.Fatalf("查询 [%s-%d] 失败: %v", role, year, err)
- }
- ke := fmt.Sprintf("%v-%v", role, year)
- update[ke] = count11
- //fmt.Printf("年份: %d, 数量: %d\n", year, count11)
- }
- }
- MgoB.UpdateById("wcc_label_static_0625", id, map[string]interface{}{"$set": update})
- }
- }
- func CountProjectWinner2() {
- // 连接 Elasticsearch
- cfg := elasticsearch.Config{
- Addresses: []string{"http://127.0.0.1:19908"}, // 或者 "http://172.17.4.184:19908"
- //Addresses: []string{"http://172.17.4.184:19908"}, // 或者 "http://172.17.4.184:19908"
- Username: "jybid",
- Password: "Top2023_JEB01i@31",
- }
- es, err := elasticsearch.NewClient(cfg)
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败: %s", err)
- }
- // 构造查询 JSON
- query := map[string]interface{}{
- "track_total_hits": true, // 必须有,确保超过1万条也能拿到真实数量
- "size": 100, // 每页条数
- "query": map[string]interface{}{
- "nested": map[string]interface{}{
- "path": "zhima_labels",
- "query": map[string]interface{}{
- "term": map[string]interface{}{
- "zhima_labels.zhima_name": "高新技术企业",
- },
- },
- },
- },
- }
- // 序列化为 JSON
- var buf bytes.Buffer
- if err := json.NewEncoder(&buf).Encode(query); err != nil {
- log.Fatalf("Error encoding query: %s", err)
- }
- // 执行查询
- res, err := es.Search(
- es.Search.WithContext(context.Background()),
- es.Search.WithIndex("qyxy"), // 替换为你的索引名
- es.Search.WithBody(&buf),
- es.Search.WithTrackTotalHits(true),
- )
- if err != nil {
- log.Fatalf("Error getting response: %s", err)
- }
- defer res.Body.Close()
- // 🔍 检查返回状态码
- if res.IsError() {
- bodyBytes, _ := io.ReadAll(res.Body)
- log.Fatalf("ES 返回错误: %s\n%s", res.Status(), string(bodyBytes))
- }
- // ✅ 正常解析 body
- var r map[string]interface{}
- if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
- log.Fatalf("解析响应出错: %s", err)
- }
- // 打印总命中数
- hits := r["hits"].(map[string]interface{})
- total := hits["total"].(map[string]interface{})["value"]
- fmt.Printf("命中总数: %v 条\n", total)
- // 打印每条结果的 ID 和 _source
- for _, hit := range hits["hits"].([]interface{}) {
- doc := hit.(map[string]interface{})
- id := doc["_id"]
- source := doc["_source"]
- sourceJSON, _ := json.MarshalIndent(source, "", " ")
- fmt.Printf("ID: %s\nSource: %s\n", id, sourceJSON)
- }
- }
- // CountProjectWinner 统计企业中标项目数量
- func CountProjectWinner() {
- url := "http://172.17.4.184:19908"
- //url := "http://127.0.0.1:19908"
- username := "jybid"
- password := "Top2023_JEB01i@31"
- //index := "bidding" //索引名称
- index := "qyxy" //索引名称
- // 创建 Elasticsearch 客户端
- client, err := elastic.NewClient(
- elastic.SetURL(url),
- elastic.SetBasicAuth(username, password),
- elastic.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- labels := `高新技术企业,小巨人企业,国家级技术创新示范企业,众创空间,国家级科技企业孵化器,瞪羚企业,科技型中小企业,制造业单项冠军示范企业,制造业单项冠军产品生产企业,制造业单项冠军培育企业,国家企业技术中心,专精特新企业,省级技术创新示范企业,技术先进型服务企业,省级企业技术中心`
- label_names := strings.Split(labels, ",")
- for _, name := range label_names {
- // 构造 nested 查询:zhima_labels.zhima_name == 高新技术企业
- nestedQuery := elastic.NewNestedQuery(
- "zhima_labels", // path 必须是 nested 字段名本身
- elastic.NewBoolQuery().Must( // nested 里的子查询,用 Bool 包一下更稳
- elastic.NewTermQuery("zhima_labels.zhima_name", name),
- ),
- )
- ctx := context.Background()
- //开始滚动搜索
- scrollID := ""
- scroll := "10m"
- searchSource := elastic.NewSearchSource().
- Query(nestedQuery).
- Size(10000).
- Sort("_doc", true) //升序排序
- //Sort("_doc", false) //降序排序
- searchService := client.Scroll(index).
- Size(10000).
- Scroll(scroll).
- SearchSource(searchSource)
- res, err := searchService.Do(ctx)
- if err != nil {
- if err == io.EOF {
- fmt.Println("没有数据")
- } else {
- panic(err)
- }
- }
- //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
- fmt.Println(name, "总数是:", res.TotalHits())
- total := 0
- for len(res.Hits.Hits) > 0 {
- for _, hit := range res.Hits.Hits {
- var doc map[string]interface{}
- err := json.Unmarshal(hit.Source, &doc)
- if err != nil {
- log.Printf("解析文档失败:%s", err)
- continue
- }
- //存入新表
- insert := map[string]interface{}{
- "company_name": doc["company_name"],
- "id": doc["id"],
- "label": name,
- }
- err = MgoB.InsertOrUpdate("qfw", "wcc_label_static_0625", insert)
- if err != nil {
- log.Println("error", doc["id"])
- }
- }
- total = total + len(res.Hits.Hits)
- scrollID = res.ScrollId
- res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
- log.Println("current count:", total, name)
- if err != nil {
- if err == io.EOF {
- // 滚动到最后一批数据,退出循环
- break
- }
- log.Println("滚动搜索失败:", err, res)
- break // 处理错误时退出循环
- }
- }
- // 在循环外调用 ClearScroll
- _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
- if err != nil {
- log.Printf("清理滚动搜索失败:%s", err)
- }
- }
- }
- // getProject 获取项目数据
- func getProject() {
- MgoB := &mongodb.MongodbSim{
- MongodbAddr: "172.17.189.140:27080",
- //MongodbAddr: "127.0.0.1:27083",
- Size: 10,
- DbName: "qfw",
- UserName: "SJZY_RWbid_ES",
- Password: "SJZY@B4i4D5e6S",
- //Direct: true,
- }
- MgoB.InitPool()
- url := "http://172.17.4.184:19805"
- //url := "http://127.0.0.1:19805"
- username := "es_all"
- password := "TopJkO2E_d1x"
- index := "projectset" //索引名称
- // 创建 Elasticsearch 客户端
- client, err := elastic.NewClient(
- elastic.SetURL(url),
- elastic.SetBasicAuth(username, password),
- elastic.SetSniff(false),
- )
- if err != nil {
- log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
- }
- rangeQuery := elastic.NewRangeQuery("pici").Gt("1685548800").Lte("1704038400")
- //termQ := elastic.NewTermQuery("multipackage", 0)
- //rangeQuery := elastic.NewRangeQuery("id").Gt("657b08556977356f5578cb25").Lte("657b08556977356f5578cb26")
- query := elastic.NewBoolQuery().Must(rangeQuery)
- ctx := context.Background()
- //开始滚动搜索
- scrollID := ""
- scroll := "1m"
- searchSource := elastic.NewSearchSource().
- Query(query).
- Size(100).
- Sort("_doc", true) //升序排序
- //Sort("_doc", false) //降序排序
- searchService := client.Scroll(index).
- Size(100).
- Scroll(scroll).
- SearchSource(searchSource)
- res, err := searchService.Do(ctx)
- if err != nil {
- if err == io.EOF {
- fmt.Println("没有数据")
- } else {
- panic(err)
- }
- }
- defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
- fmt.Println("总数是:", res.TotalHits())
- total := 0
- for len(res.Hits.Hits) > 0 {
- for _, hit := range res.Hits.Hits {
- var doc map[string]interface{}
- err := json.Unmarshal(hit.Source, &doc)
- if err != nil {
- log.Printf("解析文档失败:%s", err)
- continue
- }
- //id := util.ObjToString(doc["id"])
- //log.Println(id)
- var matchWords = make([]string, 0)
- var matchList = make([]interface{}, 0)
- if list, ok := doc["list"].([]interface{}); ok {
- for _, v := range list {
- if da, ok := v.(map[string]interface{}); ok {
- if util.ObjToString(da["toptype"]) != "招标" {
- continue
- }
- title := util.ObjToString(da["title"])
- // 使用正则表达式进行匹配
- matches := GetMatches(title)
- if len(matches) > 0 {
- matchList = append(matchList, da)
- }
- matchWords = append(matchWords, matches...)
- }
- }
- }
- insert := make(map[string]interface{})
- insert["project_id"] = doc["id"]
- insert["_id"] = doc["id"]
- insert["multipackage"] = doc["multipackage"]
- insert["list"] = doc["list"]
- insert["projectname"] = doc["projectname"]
- insert["sourceinfourl"] = doc["sourceinfourl"]
- if len(matchWords) > 0 {
- insert["matchList"] = matchList
- insert["package_name"] = util.ObjToString(doc["projectname"]) + "-" + strings.Join(matchWords, "、")
- insert["type"] = 1
- }
- MgoB.SaveByOriID("wcc_project_20240304", insert)
- }
- total = total + len(res.Hits.Hits)
- scrollID = res.ScrollId
- res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
- log.Println("current count:", total)
- if err != nil {
- if err == io.EOF {
- // 滚动到最后一批数据,退出循环
- break
- }
- log.Printf("滚动搜索失败:%s", err)
- break // 处理错误时退出循环
- }
- }
- fmt.Println("结束~~~~~~~~~~~~~~~")
- }
// Package-marker patterns used by GetMatches, compiled once at package scope
// instead of on every call.
var (
	// packListRe matches "包" followed by a number and optionally more numbers
	// separated by "、", e.g. "包2" or "包10、12、14、17、18、19".
	packListRe = regexp.MustCompile(`包(\d+(?:、\d+)*)`)
	// numPackRe matches a number followed by "包", e.g. "2包".
	numPackRe = regexp.MustCompile(`\d+包`)
)

// GetMatches extracts package markers from a title.
//
// It returns every non-overlapping match of 包N[、N...] if the title contains at
// least one; otherwise it returns every match of N包. The result is empty when
// neither form occurs.
//
// NOTE(review): earlier revisions also tried `标包\d+` and `包\d+` after the
// first pattern. Both were unreachable, because any text they match also
// contains a match of the first pattern, which is tried first. Consequently a
// title such as "(标包1)" yields "包1", not "标包1". The results are unchanged
// here; confirm whether "标包N" was meant to take precedence.
func GetMatches(title string) (res []string) {
	if matches := packListRe.FindAllString(title, -1); len(matches) > 0 {
		return matches
	}
	return numPackRe.FindAllString(title, -1)
}
|