|
@@ -1,18 +1,354 @@
|
|
|
package main
|
|
|
|
|
|
import (
|
|
|
+ "bytes"
|
|
|
"context"
|
|
|
"encoding/json"
|
|
|
"fmt"
|
|
|
+ "github.com/elastic/go-elasticsearch/v7"
|
|
|
"github.com/olivere/elastic/v7"
|
|
|
+ "go.mongodb.org/mongo-driver/bson"
|
|
|
"io"
|
|
|
util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
|
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
|
|
|
"log"
|
|
|
"regexp"
|
|
|
"strings"
|
|
|
+ "sync"
|
|
|
+ "time"
|
|
|
)
|
|
|
|
|
|
+// 并发控制参数
|
|
|
+const workerCount = 5
|
|
|
+
|
|
|
+type Task struct {
|
|
|
+ ID string
|
|
|
+ CompanyName string
|
|
|
+}
|
|
|
+
|
|
|
+func getCountProjectWinner3() {
|
|
|
+ url := "http://172.17.4.184:19908"
|
|
|
+ username := "jybid"
|
|
|
+ password := "Top2023_JEB01i@31"
|
|
|
+
|
|
|
+ // 初始化 Elasticsearch 客户端
|
|
|
+ client, err := elastic.NewClient(
|
|
|
+ elastic.SetURL(url),
|
|
|
+ elastic.SetBasicAuth(username, password),
|
|
|
+ elastic.SetSniff(false),
|
|
|
+ )
|
|
|
+ if err != nil {
|
|
|
+ log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ sess := MgoB.GetMgoConn()
|
|
|
+ defer MgoB.DestoryMongoConn(sess)
|
|
|
+
|
|
|
+ coll := sess.DB("qfw").C("wcc_label_static_0625")
|
|
|
+ it := coll.Find(nil).Select(nil).Iter()
|
|
|
+
|
|
|
+ log.Println("taskRun 开始")
|
|
|
+ taskCh := make(chan Task, 100)
|
|
|
+ var wg sync.WaitGroup
|
|
|
+
|
|
|
+ // 启动 worker 协程
|
|
|
+ for i := 0; i < workerCount; i++ {
|
|
|
+ wg.Add(1)
|
|
|
+ go func(workerID int) {
|
|
|
+ defer wg.Done()
|
|
|
+ ctx := context.Background()
|
|
|
+ for task := range taskCh {
|
|
|
+ func() {
|
|
|
+ defer util.Catch() // 捕获单个任务错误,避免 crash
|
|
|
+
|
|
|
+ fields := []string{"buyer", "winner"}
|
|
|
+ years := []int{2020, 2021, 2022, 2023, 2024}
|
|
|
+ update := make(map[string]interface{})
|
|
|
+
|
|
|
+ for _, role := range fields {
|
|
|
+ for _, year := range years {
|
|
|
+ start := time.Date(year, 1, 1, 0, 0, 0, 0, time.UTC).Unix()
|
|
|
+ end := time.Date(year+1, 1, 1, 0, 0, 0, 0, time.UTC).Unix() - 1
|
|
|
+
|
|
|
+ query := elastic.NewBoolQuery().
|
|
|
+ Must(elastic.NewTermQuery(role, task.CompanyName)).
|
|
|
+ Filter(elastic.NewRangeQuery("publishtime").Gte(start).Lte(end))
|
|
|
+
|
|
|
+ count, err := client.Count().
|
|
|
+ Index("bidding").
|
|
|
+ Query(query).
|
|
|
+ Do(ctx)
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("【Worker %d】 查询失败 [%s - %d]: %v", workerID, role, year, err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ key := fmt.Sprintf("%s-%d", role, year)
|
|
|
+ update[key] = count
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ MgoB.UpdateById("wcc_label_static_0625", task.ID, bson.M{"$set": update})
|
|
|
+
|
|
|
+ }()
|
|
|
+ }
|
|
|
+ }(i)
|
|
|
+ }
|
|
|
+
|
|
|
+ // 主线程读取 MongoDB 数据发送到 task channel
|
|
|
+ count := 0
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); {
|
|
|
+ count++
|
|
|
+ if count%1000 == 0 {
|
|
|
+ log.Println("current:", count, tmp["company_name"])
|
|
|
+ }
|
|
|
+ task := Task{
|
|
|
+ ID: mongodb.BsonIdToSId(tmp["_id"]),
|
|
|
+ CompanyName: util.ObjToString(tmp["company_name"]),
|
|
|
+ }
|
|
|
+ taskCh <- task
|
|
|
+ }
|
|
|
+ close(taskCh) // 所有任务发完
|
|
|
+
|
|
|
+ wg.Wait()
|
|
|
+ log.Println("所有任务处理完成")
|
|
|
+}
|
|
|
+
|
|
|
+func getCountProjectWinner() {
|
|
|
+ url := "http://172.17.4.184:19908"
|
|
|
+ //url := "http://127.0.0.1:19908"
|
|
|
+ username := "jybid"
|
|
|
+ password := "Top2023_JEB01i@31"
|
|
|
+ //index := "bidding" //索引名称
|
|
|
+ // 创建 Elasticsearch 客户端
|
|
|
+ client, err := elastic.NewClient(
|
|
|
+ elastic.SetURL(url),
|
|
|
+ elastic.SetBasicAuth(username, password),
|
|
|
+ elastic.SetSniff(false),
|
|
|
+ )
|
|
|
+ if err != nil {
|
|
|
+ log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
|
|
|
+ }
|
|
|
+ defer util.Catch()
|
|
|
+ sess := MgoB.GetMgoConn()
|
|
|
+ defer MgoB.DestoryMongoConn(sess)
|
|
|
+ it := sess.DB("qfw").C("wcc_label_static_0625").Find(nil).Select(nil).Iter()
|
|
|
+ log.Println("taskRun 开始")
|
|
|
+ count := 0
|
|
|
+ ctx := context.Background()
|
|
|
+ for tmp := make(map[string]interface{}); it.Next(&tmp); count++ {
|
|
|
+ if count%1000 == 0 {
|
|
|
+ log.Println("current:", count, tmp["company_name"])
|
|
|
+ }
|
|
|
+ companyName := util.ObjToString(tmp["company_name"])
|
|
|
+ //companyName := "上海市特种设备监督检验技术研究院"
|
|
|
+
|
|
|
+ id := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ fields := []string{"buyer", "winner"}
|
|
|
+ years := []int{2020, 2021, 2022, 2023, 2024}
|
|
|
+
|
|
|
+ update := make(map[string]interface{})
|
|
|
+ for _, role := range fields {
|
|
|
+ //fmt.Printf("=== [%s 作为 %s 的数量统计] ===\n", companyName, role)
|
|
|
+ for _, year := range years {
|
|
|
+ // 年份范围(秒)
|
|
|
+ start := time.Date(year, 1, 1, 0, 0, 0, 0, time.UTC).Unix()
|
|
|
+ end := time.Date(year+1, 1, 1, 0, 0, 0, 0, time.UTC).Unix() - 1
|
|
|
+
|
|
|
+ // 构造查询
|
|
|
+ query := elastic.NewBoolQuery().
|
|
|
+ Must(elastic.NewTermQuery(role, companyName)).
|
|
|
+ Filter(elastic.NewRangeQuery("publishtime").Gte(start).Lte(end))
|
|
|
+
|
|
|
+ // 查询并只返回总数
|
|
|
+ count11, err := client.Count().
|
|
|
+ Index("bidding").
|
|
|
+ Query(query).
|
|
|
+ Do(ctx)
|
|
|
+ if err != nil {
|
|
|
+ log.Fatalf("查询 [%s-%d] 失败: %v", role, year, err)
|
|
|
+ }
|
|
|
+
|
|
|
+ ke := fmt.Sprintf("%v-%v", role, year)
|
|
|
+ update[ke] = count11
|
|
|
+ //fmt.Printf("年份: %d, 数量: %d\n", year, count11)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ MgoB.UpdateById("wcc_label_static_0625", id, map[string]interface{}{"$set": update})
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func CountProjectWinner2() {
|
|
|
+ // 连接 Elasticsearch
|
|
|
+ cfg := elasticsearch.Config{
|
|
|
+ Addresses: []string{"http://127.0.0.1:19908"}, // 或者 "http://172.17.4.184:19908"
|
|
|
+ //Addresses: []string{"http://172.17.4.184:19908"}, // 或者 "http://172.17.4.184:19908"
|
|
|
+ Username: "jybid",
|
|
|
+ Password: "Top2023_JEB01i@31",
|
|
|
+ }
|
|
|
+ es, err := elasticsearch.NewClient(cfg)
|
|
|
+ if err != nil {
|
|
|
+ log.Fatalf("创建 Elasticsearch 客户端失败: %s", err)
|
|
|
+ }
|
|
|
+ // 构造查询 JSON
|
|
|
+ query := map[string]interface{}{
|
|
|
+ "track_total_hits": true, // 必须有,确保超过1万条也能拿到真实数量
|
|
|
+ "size": 100, // 每页条数
|
|
|
+ "query": map[string]interface{}{
|
|
|
+ "nested": map[string]interface{}{
|
|
|
+ "path": "zhima_labels",
|
|
|
+ "query": map[string]interface{}{
|
|
|
+ "term": map[string]interface{}{
|
|
|
+ "zhima_labels.zhima_name": "高新技术企业",
|
|
|
+ },
|
|
|
+ },
|
|
|
+ },
|
|
|
+ },
|
|
|
+ }
|
|
|
+
|
|
|
+ // 序列化为 JSON
|
|
|
+ var buf bytes.Buffer
|
|
|
+ if err := json.NewEncoder(&buf).Encode(query); err != nil {
|
|
|
+ log.Fatalf("Error encoding query: %s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // 执行查询
|
|
|
+ res, err := es.Search(
|
|
|
+ es.Search.WithContext(context.Background()),
|
|
|
+ es.Search.WithIndex("qyxy"), // 替换为你的索引名
|
|
|
+ es.Search.WithBody(&buf),
|
|
|
+ es.Search.WithTrackTotalHits(true),
|
|
|
+ )
|
|
|
+ if err != nil {
|
|
|
+ log.Fatalf("Error getting response: %s", err)
|
|
|
+ }
|
|
|
+ defer res.Body.Close()
|
|
|
+ // 🔍 检查返回状态码
|
|
|
+ if res.IsError() {
|
|
|
+ bodyBytes, _ := io.ReadAll(res.Body)
|
|
|
+ log.Fatalf("ES 返回错误: %s\n%s", res.Status(), string(bodyBytes))
|
|
|
+ }
|
|
|
+
|
|
|
+ // ✅ 正常解析 body
|
|
|
+ var r map[string]interface{}
|
|
|
+ if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
|
|
|
+ log.Fatalf("解析响应出错: %s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // 打印总命中数
|
|
|
+ hits := r["hits"].(map[string]interface{})
|
|
|
+ total := hits["total"].(map[string]interface{})["value"]
|
|
|
+ fmt.Printf("命中总数: %v 条\n", total)
|
|
|
+
|
|
|
+ // 打印每条结果的 ID 和 _source
|
|
|
+ for _, hit := range hits["hits"].([]interface{}) {
|
|
|
+ doc := hit.(map[string]interface{})
|
|
|
+ id := doc["_id"]
|
|
|
+ source := doc["_source"]
|
|
|
+ sourceJSON, _ := json.MarshalIndent(source, "", " ")
|
|
|
+ fmt.Printf("ID: %s\nSource: %s\n", id, sourceJSON)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// CountProjectWinner 统计企业中标项目数量
|
|
|
+func CountProjectWinner() {
|
|
|
+ url := "http://172.17.4.184:19908"
|
|
|
+ //url := "http://127.0.0.1:19908"
|
|
|
+ username := "jybid"
|
|
|
+ password := "Top2023_JEB01i@31"
|
|
|
+ //index := "bidding" //索引名称
|
|
|
+ index := "qyxy" //索引名称
|
|
|
+ // 创建 Elasticsearch 客户端
|
|
|
+ client, err := elastic.NewClient(
|
|
|
+ elastic.SetURL(url),
|
|
|
+ elastic.SetBasicAuth(username, password),
|
|
|
+ elastic.SetSniff(false),
|
|
|
+ )
|
|
|
+ if err != nil {
|
|
|
+ log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ labels := `高新技术企业,小巨人企业,国家级技术创新示范企业,众创空间,国家级科技企业孵化器,瞪羚企业,科技型中小企业,制造业单项冠军示范企业,制造业单项冠军产品生产企业,制造业单项冠军培育企业,国家企业技术中心,专精特新企业,省级技术创新示范企业,技术先进型服务企业,省级企业技术中心`
|
|
|
+ label_names := strings.Split(labels, ",")
|
|
|
+ for _, name := range label_names {
|
|
|
+ // 构造 nested 查询:zhima_labels.zhima_name == 高新技术企业
|
|
|
+ nestedQuery := elastic.NewNestedQuery(
|
|
|
+ "zhima_labels", // path 必须是 nested 字段名本身
|
|
|
+ elastic.NewBoolQuery().Must( // nested 里的子查询,用 Bool 包一下更稳
|
|
|
+ elastic.NewTermQuery("zhima_labels.zhima_name", name),
|
|
|
+ ),
|
|
|
+ )
|
|
|
+
|
|
|
+ ctx := context.Background()
|
|
|
+ //开始滚动搜索
|
|
|
+ scrollID := ""
|
|
|
+ scroll := "10m"
|
|
|
+ searchSource := elastic.NewSearchSource().
|
|
|
+ Query(nestedQuery).
|
|
|
+ Size(10000).
|
|
|
+ Sort("_doc", true) //升序排序
|
|
|
+ //Sort("_doc", false) //降序排序
|
|
|
+
|
|
|
+ searchService := client.Scroll(index).
|
|
|
+ Size(10000).
|
|
|
+ Scroll(scroll).
|
|
|
+ SearchSource(searchSource)
|
|
|
+
|
|
|
+ res, err := searchService.Do(ctx)
|
|
|
+ if err != nil {
|
|
|
+ if err == io.EOF {
|
|
|
+ fmt.Println("没有数据")
|
|
|
+ } else {
|
|
|
+ panic(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ //defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
|
|
|
+ fmt.Println(name, "总数是:", res.TotalHits())
|
|
|
+ total := 0
|
|
|
+ for len(res.Hits.Hits) > 0 {
|
|
|
+ for _, hit := range res.Hits.Hits {
|
|
|
+ var doc map[string]interface{}
|
|
|
+ err := json.Unmarshal(hit.Source, &doc)
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("解析文档失败:%s", err)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ //存入新表
|
|
|
+ insert := map[string]interface{}{
|
|
|
+ "company_name": doc["company_name"],
|
|
|
+ "id": doc["id"],
|
|
|
+ "label": name,
|
|
|
+ }
|
|
|
+
|
|
|
+ err = MgoB.InsertOrUpdate("qfw", "wcc_label_static_0625", insert)
|
|
|
+ if err != nil {
|
|
|
+ log.Println("error", doc["id"])
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ total = total + len(res.Hits.Hits)
|
|
|
+ scrollID = res.ScrollId
|
|
|
+ res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
|
|
|
+ log.Println("current count:", total, name)
|
|
|
+ if err != nil {
|
|
|
+ if err == io.EOF {
|
|
|
+ // 滚动到最后一批数据,退出循环
|
|
|
+ break
|
|
|
+ }
|
|
|
+ log.Println("滚动搜索失败:", err, res)
|
|
|
+ break // 处理错误时退出循环
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 在循环外调用 ClearScroll
|
|
|
+ _, err = client.ClearScroll().ScrollId(scrollID).Do(ctx)
|
|
|
+ if err != nil {
|
|
|
+ log.Printf("清理滚动搜索失败:%s", err)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
// getProject 获取项目数据
|
|
|
func getProject() {
|
|
|
MgoB := &mongodb.MongodbSim{
|