wcc 8 小时之前
父节点
当前提交
5f5787d13c
共有 10 个文件被更改,包括 461 次插入和 110 次删除
  1. 121 0
      getEs/bidding.go
  2. 二进制
      getEs/getBiddingDiff
  3. 1 1
      getEs/go.mod
  4. 3 0
      getEs/main.go
  5. 144 86
      go-es/main.go
  6. 1 1
      modify_bidamount/main.go
  7. 二进制
      modify_bidamount/modify_bidamount_20250809
  8. 17 17
      project_chuan/init.go
  9. 170 3
      project_chuan/project_other.go
  10. 4 2
      project_chuan/project_test.go

+ 121 - 0
getEs/bidding.go

@@ -13,6 +13,127 @@ import (
 	"strings"
 )
 
+// getBiddingDiff compares the document IDs of the "bidding" index on two
+// Elasticsearch clusters and prints the IDs that exist on only one of them.
+//
+// Both clusters are queried for documents whose "id" field lies in a fixed
+// range (exclusive lower bound, inclusive upper bound). The process exits via
+// log.Fatal when a client cannot be created or when either fetch fails.
+func getBiddingDiff() {
+	// NOTE(review): endpoints and credentials are hard-coded in source;
+	// consider loading them from configuration or the environment.
+	url1 := "http://172.17.4.184:19908"
+	url2 := "http://172.17.4.184:19905"
+	username := "jybid"
+	password := "Top2023_JEB01i@31"
+	index := "bidding"
+
+	// A failed NewClient returns a nil client; stop here instead of letting
+	// fetchIDs dereference it.
+	client1, err := elastic.NewClient(
+		elastic.SetURL(url1),
+		elastic.SetBasicAuth(username, password),
+		elastic.SetSniff(false),
+	)
+	if err != nil {
+		log.Fatal(fmt.Errorf("creating client for %s: %w", url1, err))
+	}
+
+	client2, err := elastic.NewClient(
+		elastic.SetURL(url2),
+		elastic.SetBasicAuth(username, password),
+		elastic.SetSniff(false),
+	)
+	if err != nil {
+		log.Fatal(fmt.Errorf("creating client for %s: %w", url2, err))
+	}
+
+	query := elastic.NewBoolQuery().
+		Must(elastic.NewRangeQuery("id").
+			Gt("6894cd800000000000000000").
+			Lte("68961f000000000000000000"))
+
+	fmt.Println("获取集群1数据...")
+	ids1, err := fetchIDs(client1, index, query)
+	if err != nil {
+		log.Fatal(err)
+	}
+	fmt.Printf("集群1总数: %d\n", len(ids1))
+
+	fmt.Println("获取集群2数据...")
+	ids2, err := fetchIDs(client2, index, query)
+	if err != nil {
+		log.Fatal(err)
+	}
+	fmt.Printf("集群2总数: %d\n", len(ids2))
+
+	// printMissing prints every ID of from that is absent from other and
+	// returns how many IDs were printed.
+	printMissing := func(from, other map[string]struct{}) int {
+		n := 0
+		for id := range from {
+			if _, ok := other[id]; !ok {
+				fmt.Println(id)
+				n++
+			}
+		}
+		return n
+	}
+
+	fmt.Println("差集 (集群1 - 集群2):")
+	fmt.Printf("差集1总数: %d\n", printMissing(ids1, ids2))
+
+	fmt.Println("差集 (集群2 - 集群1):")
+	fmt.Printf("差集2总数: %d\n", printMissing(ids2, ids1))
+}
+
+// fetchIDs collects the _id of every document in index that matches query,
+// paging through the results with the scroll API (10000 hits per page, 10m
+// keep-alive, sorted by _doc).
+//
+// It returns the IDs as a set. On failure it returns a nil map and a wrapped
+// error. The server-side scroll context is cleared on every return path once
+// the initial search has succeeded.
+func fetchIDs(client *elastic.Client, index string, query elastic.Query) (map[string]struct{}, error) {
+	ctx := context.Background()
+	ids := make(map[string]struct{})
+
+	scroll := "10m"
+
+	searchSource := elastic.NewSearchSource().
+		Query(query).
+		Size(10000).
+		Sort("_doc", true)
+
+	searchService := client.Scroll(index).
+		Size(10000).
+		Scroll(scroll).
+		SearchSource(searchSource)
+
+	res, err := searchService.Do(ctx)
+	if err != nil {
+		// The scroll service reports io.EOF when there is nothing to return.
+		if err == io.EOF {
+			fmt.Println("没有数据")
+			return ids, nil
+		}
+		return nil, fmt.Errorf("初始化滚动搜索失败: %w", err)
+	}
+
+	// Remember the scroll ID right away so the context is released even when
+	// the first page is empty or a later page fails.
+	scrollID := res.ScrollId
+	defer func() {
+		if scrollID == "" {
+			return
+		}
+		if _, cerr := client.ClearScroll().ScrollId(scrollID).Do(ctx); cerr != nil {
+			log.Printf("清理滚动搜索失败:%s", cerr)
+		}
+	}()
+
+	fmt.Println("命中总数:", res.TotalHits())
+
+	total := 0
+	for res.Hits != nil && len(res.Hits.Hits) > 0 {
+		for _, hit := range res.Hits.Hits {
+			// Only the document _id is needed.
+			ids[hit.Id] = struct{}{}
+		}
+
+		total += len(res.Hits.Hits)
+		log.Println("已获取数量:", total)
+
+		res, err = client.Scroll().
+			ScrollId(scrollID).
+			Scroll(scroll).
+			Do(ctx)
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			return nil, fmt.Errorf("滚动搜索失败: %w", err)
+		}
+		if res.ScrollId != "" {
+			scrollID = res.ScrollId
+		}
+	}
+
+	return ids, nil
+}
+
+// getBiddingLimitData 获取限制bidding
 func getBiddingLimitData() {
 	//url := "http://172.17.4.184:19908"
 	url := "http://127.0.0.1:19908"

二进制
getEs/getBiddingDiff


+ 1 - 1
getEs/go.mod

@@ -12,6 +12,7 @@ require (
 	github.com/itcwc/go-zhipu v0.0.0-20240626065325-ffc8bf1cfaaa
 	github.com/olivere/elastic/v7 v7.0.32
 	github.com/xuri/excelize/v2 v2.9.0
+	go.mongodb.org/mongo-driver v1.11.4
 	gorm.io/driver/clickhouse v0.6.0
 	gorm.io/driver/mysql v1.5.2
 	gorm.io/gorm v1.25.5
@@ -65,7 +66,6 @@ require (
 	github.com/xuri/efp v0.0.0-20240408161823-9ad904a10d6d // indirect
 	github.com/xuri/nfp v0.0.0-20240318013403-ab9948c2c4a7 // indirect
 	github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
-	go.mongodb.org/mongo-driver v1.11.4 // indirect
 	go.opentelemetry.io/otel v1.32.0 // indirect
 	go.opentelemetry.io/otel/metric v1.32.0 // indirect
 	go.opentelemetry.io/otel/sdk v1.32.0 // indirect

+ 3 - 0
getEs/main.go

@@ -34,6 +34,9 @@ func InitMgo() {
 }
 
 func main() {
+	getBiddingDiff()
+
+	return
 	InitMgo()
 	getBidding2()
 

+ 144 - 86
go-es/main.go

@@ -1,88 +1,146 @@
 package main
 
-//func main() {
-//	// 创建Elasticsearch客户端
-//	cfg := elasticsearch.Config{
-//		Addresses: []string{"http://localhost:9200"}, // 修改为你的Elasticsearch地址
-//	}
-//	es, err := elasticsearch.NewClient(cfg)
-//	if err != nil {
-//		log.Fatalf("Error creating the Elasticsearch client: %s", err)
-//	}
-//
-//	// 创建查询请求
-//	query := map[string]interface{}{
-//		"query": map[string]interface{}{
-//			"bool": map[string]interface{}{
-//				"must_not": map[string]interface{}{
-//					"term": map[string]interface{}{
-//						"_id": map[string]interface{}{
-//							"field": "id",
-//						},
-//					},
-//				},
-//			},
-//		},
-//	}
-//
-//	// 执行查询请求
-//	res, err := es.Search(
-//		es.Search.WithContext(context.Background()),
-//		es.Search.WithIndex("qyxy_v1"),
-//		es.Search.WithBody(esapi.NewJSONReader(query)),
-//		es.Search.WithTrackTotalHits(true),
-//	)
-//	if err != nil {
-//		log.Fatalf("Error executing search: %s", err)
-//	}
-//	defer res.Body.Close()
-//
-//	if res.IsError() {
-//		log.Fatalf("Error response: %s", res.Status())
-//	}
-//
-//	// 解析查询结果
-//	var docResult map[string]interface{}
-//	if err := json.NewDecoder(res.Body).Decode(&docResult); err != nil {
-//		log.Fatalf("Error parsing the response body: %s", err)
-//	}
-//
-//	hits := docResult["hits"].(map[string]interface{})["hits"].([]interface{})
-//
-//	// 迭代更新数据
-//	for _, hit := range hits {
-//		doc := hit.(map[string]interface{})
-//		source := doc["_source"].(map[string]interface{})
-//
-//		// 更新_id为id的值
-//		source["_id"] = source["id"]
-//
-//		// 准备更新请求
-//		updateReq := map[string]interface{}{
-//			"doc": source,
-//		}
-//		body, err := json.Marshal(updateReq)
-//		if err != nil {
-//			log.Fatalf("Error marshaling update request: %s", err)
-//		}
-//
-//		req := esapi.UpdateRequest{
-//			Index:      doc["_index"].(string),
-//			DocumentID: doc["_id"].(string),
-//			Body:       esapi.NewJSONReader(body),
-//		}
-//
-//		// 执行更新请求
-//		res, err := req.Do(context.Background(), es)
-//		if err != nil {
-//			log.Fatalf("Error updating document: %s", err)
-//		}
-//		defer res.Body.Close()
-//
-//		if res.IsError() {
-//			log.Fatalf("Error response: %s", res.Status())
-//		}
-//
-//		log.Printf("Document updated: %s", res.Status())
-//	}
-//}
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"github.com/elastic/go-elasticsearch/v7"
+	"log"
+	"strings"
+	"time"
+)
+
+// EsHit is the part of a single search hit that is decoded from the
+// response: only the document's "_id".
+type EsHit struct {
+	ID string `json:"_id"`
+}
+
+// EsResponse is the subset of a search/scroll response body that is decoded:
+// the "_scroll_id" used to request the next page and the hits of the current
+// page ("hits.hits").
+type EsResponse struct {
+	ScrollID string `json:"_scroll_id"`
+	Hits     struct {
+		Hits []EsHit `json:"hits"`
+	} `json:"hits"`
+}
+
+// fetchIDs returns the set of document _ids in index that match the JSON
+// query body, paging through the results with the scroll API (1000 hits per
+// page, one-minute keep-alive).
+//
+// On any request, HTTP or decode failure it returns a nil map and the error.
+// Each response body is closed as soon as its page has been decoded, and the
+// scroll context is cleared on every return path once a scroll ID is known.
+func fetchIDs(es *elasticsearch.Client, index string, query string) (map[string]struct{}, error) {
+	ids := make(map[string]struct{})
+	ctx := context.Background()
+
+	// firstPage opens the scroll and decodes its first page.
+	firstPage := func() (EsResponse, error) {
+		var page EsResponse
+		res, err := es.Search(
+			es.Search.WithContext(ctx),
+			es.Search.WithIndex(index),
+			es.Search.WithScroll(time.Minute),
+			es.Search.WithSize(1000),
+			es.Search.WithBody(strings.NewReader(query)),
+			es.Search.WithTrackTotalHits(true),
+		)
+		if err != nil {
+			return page, err
+		}
+		defer res.Body.Close()
+
+		if res.IsError() {
+			return page, fmt.Errorf("init scroll error: %s", res.String())
+		}
+		err = json.NewDecoder(res.Body).Decode(&page)
+		return page, err
+	}
+
+	// nextPage fetches and decodes the page that follows scrollID. Running it
+	// as a closure makes the deferred Close fire once per page instead of
+	// piling up until fetchIDs returns.
+	nextPage := func(scrollID string) (EsResponse, error) {
+		// A fresh value per page: decoding into a reused struct could keep
+		// the previous page's hits if the field were missing.
+		var page EsResponse
+		res, err := es.Scroll(
+			es.Scroll.WithContext(ctx),
+			es.Scroll.WithScrollID(scrollID),
+			es.Scroll.WithScroll(time.Minute),
+		)
+		if err != nil {
+			return page, err
+		}
+		defer res.Body.Close()
+
+		if res.IsError() {
+			return page, fmt.Errorf("scroll error: %s", res.String())
+		}
+		err = json.NewDecoder(res.Body).Decode(&page)
+		return page, err
+	}
+
+	page, err := firstPage()
+	if err != nil {
+		return nil, err
+	}
+
+	scrollID := page.ScrollID
+	// Release the server-side scroll context no matter how we leave.
+	defer func() {
+		if scrollID == "" {
+			return
+		}
+		res, cerr := es.ClearScroll(
+			es.ClearScroll.WithContext(ctx),
+			es.ClearScroll.WithScrollID(scrollID),
+		)
+		if cerr != nil {
+			log.Printf("clear scroll: %v", cerr)
+			return
+		}
+		res.Body.Close()
+	}()
+
+	// An empty page marks the end of the scroll.
+	for len(page.Hits.Hits) > 0 {
+		for _, hit := range page.Hits.Hits {
+			ids[hit.ID] = struct{}{}
+		}
+
+		page, err = nextPage(scrollID)
+		if err != nil {
+			return nil, err
+		}
+		if page.ScrollID != "" {
+			scrollID = page.ScrollID
+		}
+	}
+
+	return ids, nil
+}
+
+// missingFrom returns the IDs that are present in a but absent from b, in
+// unspecified order.
+func missingFrom(a, b map[string]struct{}) []string {
+	var out []string
+	for id := range a {
+		if _, ok := b[id]; !ok {
+			out = append(out, id)
+		}
+	}
+	return out
+}
+
+// main fetches the _ids of the "bidding" index within a fixed "id" range from
+// two Elasticsearch clusters and prints, for each direction, how many IDs
+// exist on only one cluster followed by those IDs.
+func main() {
+	// Two ES clients (this also works against ES 7).
+	// NOTE(review): endpoints and credentials are hard-coded in source;
+	// consider loading them from configuration or the environment.
+	es1, err := elasticsearch.NewClient(elasticsearch.Config{
+		Addresses: []string{"http://172.17.4.184:19908"},
+		Username:  "jybid",
+		Password:  "Top2023_JEB01i@31",
+	})
+	if err != nil {
+		log.Fatalf("Cluster1 client error: %v", err)
+	}
+	es2, err := elasticsearch.NewClient(elasticsearch.Config{
+		Addresses: []string{"http://172.17.4.184:19905"},
+		Username:  "jybid",
+		Password:  "Top2023_JEB01i@31",
+	})
+	if err != nil {
+		log.Fatalf("Cluster2 client error: %v", err)
+	}
+
+	indexName := "bidding"
+	queryDSL := `{
+		"query": {
+			"range": {
+				"id": {
+					"gte": "689681700000000000000000",
+					"lt":  "6896aba00000000000000000"
+				}
+			}
+		},
+		"_source": false,
+		"stored_fields": []
+	}`
+
+	fmt.Println("Fetching from cluster 1...")
+	ids1, err := fetchIDs(es1, indexName, queryDSL)
+	if err != nil {
+		log.Fatalf("Cluster1 error: %v", err)
+	}
+
+	fmt.Println("Fetching from cluster 2...")
+	ids2, err := fetchIDs(es2, indexName, queryDSL)
+	if err != nil {
+		log.Fatalf("Cluster2 error: %v", err)
+	}
+
+	// Difference: only on cluster 1. The count is the size of the actual set
+	// difference, not len(ids1)-len(ids2), which can be zero or negative even
+	// when the clusters disagree.
+	only1 := missingFrom(ids1, ids2)
+	fmt.Printf("Diff (cluster1 - cluster2): %d docs\n", len(only1))
+	for _, id := range only1 {
+		fmt.Println(id)
+	}
+
+	// Difference: only on cluster 2.
+	only2 := missingFrom(ids2, ids1)
+	fmt.Printf("Diff (cluster2 - cluster1): %d docs\n", len(only2))
+	for _, id := range only2 {
+		fmt.Println(id)
+	}
+}

+ 1 - 1
modify_bidamount/main.go

@@ -35,7 +35,7 @@ func Init() {
 	//85
 	MgoP = &mongodb.MongodbSim{
 		//MongodbAddr: "127.0.0.1:27080",
-		MongodbAddr: "172.17.4.85:27080",
+		MongodbAddr: "172.20.47.168:27080",
 		DbName:      "qfw",
 		Size:        10,
 		//Direct: true,

二进制
modify_bidamount/modify_bidamount_20250809


+ 17 - 17
project_chuan/init.go

@@ -68,22 +68,22 @@ func InitMgo() {
 	//Mgo181.InitPool()
 
 	//85 项目查询
-	MgoP = &mongodb.MongodbSim{
-		MongodbAddr: GF.MongoP.Host,
-		Size:        10,
-		DbName:      GF.MongoP.DB,
-		UserName:    GF.MongoP.Username,
-		Password:    GF.MongoP.Password,
-		Direct:      GF.MongoP.Direct,
-	}
-
-	MgoP.InitPool()
-
-	err := MgoP.C.Ping(context.Background(), nil)
-
-	if err != nil {
-		log.Info("InitMgo", zap.Any(GF.MongoP.Host, "链接失败"))
-	}
+	//MgoP = &mongodb.MongodbSim{
+	//	MongodbAddr: GF.MongoP.Host,
+	//	Size:        10,
+	//	DbName:      GF.MongoP.DB,
+	//	UserName:    GF.MongoP.Username,
+	//	Password:    GF.MongoP.Password,
+	//	Direct:      GF.MongoP.Direct,
+	//}
+	//
+	//MgoP.InitPool()
+	//
+	//err := MgoP.C.Ping(context.Background(), nil)
+	//
+	//if err != nil {
+	//	log.Info("InitMgo", zap.Any(GF.MongoP.Host, "链接失败"))
+	//}
 
 	//bidding 查询
 	MgoB = &mongodb.MongodbSim{
@@ -97,7 +97,7 @@ func InitMgo() {
 
 	MgoB.InitPool()
 
-	err = MgoB.C.Ping(context.Background(), nil)
+	err := MgoB.C.Ping(context.Background(), nil)
 
 	if err != nil {
 		log.Info("InitMgo", zap.Any(GF.MongoB.Host, "链接失败"))

+ 170 - 3
project_chuan/project_other.go

@@ -82,12 +82,85 @@ func getSearch(client *elastic.Client, projectName, areacode string, isDetail in
 
 }
 
+// getSearchNew searches the bidding index for projectName and returns the
+// merged, de-duplicated hits as decoded source documents, newest
+// publishtime first.
+//
+// When areacode is non-empty, its first six bytes (or the whole value when
+// shorter) are looked up in the "address_new_2020" collection to obtain the
+// province and city passed on to the searches. The name is cleaned with
+// RemoveInvisibleChars and FilterGeneric, then searched with
+// searchPreciseOther followed by searchByTokenOtherNew; an error from either
+// search aborts the call. For every hit the "detail" field is read from the
+// Mongo "bidding" collection and, when non-empty, stored in the document.
+func getSearchNew(client *elastic.Client, projectName, areacode string, isDetail int) ([]map[string]interface{}, error) {
+	var results []map[string]interface{}
+	seenIDs := make(map[string]*elastic.SearchHit)
+
+	province, city := "", ""
+	if areacode != "" {
+		// Guard the prefix slice: a code shorter than six bytes used to panic.
+		code := areacode
+		if len(code) > 6 {
+			code = code[:6]
+		}
+		where := map[string]interface{}{
+			"code": code,
+		}
+
+		// The second return value is ignored as before; a nil result simply
+		// leaves province and city empty instead of being dereferenced.
+		if res, _ := MgoQY.FindOne("address_new_2020", where); res != nil {
+			province = util.ObjToString((*res)["province"])
+			city = util.ObjToString((*res)["city"])
+		}
+	}
+
+	projectName = RemoveInvisibleChars(projectName)
+	projectName = FilterGeneric(projectName)
+
+	// collect records each hit once, keeping the first occurrence of an ID.
+	collect := func(hits []*elastic.SearchHit) {
+		for _, hit := range hits {
+			if hit == nil {
+				continue
+			}
+			if _, exists := seenIDs[hit.Id]; !exists {
+				seenIDs[hit.Id] = hit
+			}
+		}
+	}
+
+	// 1. Precise search.
+	preciseHits, err := searchPreciseOther(client, projectName, province, city, isDetail)
+	if err != nil {
+		return nil, err
+	}
+	collect(preciseHits)
+
+	// 2. Token-based search.
+	tokenHits, err := searchByTokenOtherNew(client, projectName, province, city, isDetail)
+	if err != nil {
+		return nil, err
+	}
+	collect(tokenHits)
+
+	for id, hit := range seenIDs {
+		var doc map[string]interface{}
+		// Hits whose source cannot be decoded (or decodes to null) are skipped.
+		if err := json.Unmarshal(hit.Source, &doc); err != nil || doc == nil {
+			continue
+		}
+
+		// Read the detail field from Mongo; a missing record leaves the
+		// document without one rather than dereferencing a nil result.
+		// A per-token containment filter on detail was commented out in the
+		// original and is not applied here.
+		if bidd, _ := MgoB.FindById("bidding", id, nil); bidd != nil {
+			if detail := util.ObjToString((*bidd)["detail"]); detail != "" {
+				doc["detail"] = detail
+			}
+		}
+		results = append(results, doc)
+	}
+
+	sort.SliceStable(results, func(i, j int) bool {
+		return util.Int64All(results[i]["publishtime"]) > util.Int64All(results[j]["publishtime"])
+	})
+
+	return results, nil
+}
+
 // searchPreciseOther 精准搜索;m默认项目名称+标题;详情可选参数
 func searchPreciseOther(client *elastic.Client, projectName, area, city string, isDetail int) ([]*elastic.SearchHit, error) {
 	fieldsToTry := []string{"projectname.pname", "title"}
-	if isDetail > 0 {
-		fieldsToTry = append(fieldsToTry, "detail")
-	}
+	//if isDetail > 0 {
+	//	fieldsToTry = append(fieldsToTry, "detail")
+	//}
 
 	filtersToTry := [][]elastic.Query{
 		{elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一")},
@@ -343,6 +416,100 @@ func searchByTokenOther(client *elastic.Client, projectName, province, city stri
 	return results, nil
 }
 
+// searchByTokenOtherNew tokenizes projectName with the "ik_smart" analyzer of
+// the "bidding" index and searches for documents in which a single field
+// contains every token (one term query per token, all required).
+//
+// The fields tried are "projectname.pname" and "title", plus "detail" when
+// isDetail > 0. Each field is queried once per type filter (subtype set, then
+// toptype set) and the hits are de-duplicated by ID. The first pass also
+// requires area == province and city == city when those are non-empty; if it
+// finds nothing and province is set, a second pass runs without the location
+// terms.
+//
+// It returns an error when analysis fails, when no tokens are produced, or
+// when a pass found no hits and at least one of its searches failed.
+func searchByTokenOtherNew(client *elastic.Client, projectName, province, city string, isDetail int) ([]*elastic.SearchHit, error) {
+	fieldsToTry := []string{"projectname.pname", "title"}
+	if isDetail > 0 {
+		fieldsToTry = append(fieldsToTry, "detail")
+	}
+	filtersToTry := [][]elastic.Query{
+		{elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一")},
+		{elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向", "拟建")},
+	}
+
+	// Tokenize the project name.
+	analyzeResp, err := client.IndexAnalyze().
+		Index("bidding").
+		Analyzer("ik_smart").
+		Text(projectName).
+		Do(context.Background())
+	if err != nil {
+		return nil, err
+	}
+
+	tokens := make([]string, 0, len(analyzeResp.Tokens))
+	for _, token := range analyzeResp.Tokens {
+		tokens = append(tokens, token.Token)
+	}
+	if len(tokens) == 0 {
+		return nil, fmt.Errorf("no tokens found from ik_smart")
+	}
+
+	// Restrict the returned _source to the fields that are used downstream.
+	fetchFields := elastic.NewFetchSourceContext(true).Include(
+		"id", "title", "projectname", "projectcode", "bidamount", "area", "city",
+		"toptype", "subtype", "buyer", "budget", "buyerperson", "buyertel",
+		"s_winner", "winnertel", "agency", "publishtime")
+
+	// runQuery performs one pass over all field/filter combinations;
+	// withProvince controls whether the area/city terms are added.
+	runQuery := func(withProvince bool) ([]*elastic.SearchHit, error) {
+		var allHits []*elastic.SearchHit
+		var firstErr error
+		seen := make(map[string]bool)
+
+		for _, field := range fieldsToTry {
+			boolQ := elastic.NewBoolQuery()
+			for _, token := range tokens {
+				// Exact match on each analyzed token.
+				boolQ = boolQ.Must(elastic.NewTermQuery(field, token))
+			}
+			for _, filters := range filtersToTry {
+				query := elastic.NewBoolQuery().
+					Must(boolQ).
+					Filter(filters...)
+
+				// Add the area/city terms on demand.
+				if withProvince && province != "" {
+					query = query.Must(elastic.NewTermQuery("area", province))
+				}
+				if withProvince && city != "" {
+					query = query.Must(elastic.NewTermQuery("city", city))
+				}
+
+				searchResult, err := client.Search().
+					Index("bidding").
+					Query(query).
+					FetchSourceContext(fetchFields).
+					Do(context.Background())
+				if err != nil {
+					// Keep going so one failing combination does not hide
+					// hits from the others, but remember the failure.
+					if firstErr == nil {
+						firstErr = fmt.Errorf("searching field %q: %w", field, err)
+					}
+					continue
+				}
+				if searchResult.Hits == nil {
+					continue
+				}
+
+				for _, hit := range searchResult.Hits.Hits {
+					if !seen[hit.Id] {
+						seen[hit.Id] = true
+						allHits = append(allHits, hit)
+					}
+				}
+			}
+		}
+
+		// Nothing found and something failed: report it rather than letting
+		// an outage look like "no match".
+		if len(allHits) == 0 && firstErr != nil {
+			return nil, firstErr
+		}
+		return allHits, nil
+	}
+
+	// First pass: with the province/city terms.
+	results, err := runQuery(true)
+	if err != nil {
+		return nil, err
+	}
+	// Nothing found while a province was set: retry without the location terms.
+	if len(results) == 0 && province != "" {
+		return runQuery(false)
+	}
+	return results, nil
+}
+
 // RemoveInvisibleChars 移除控制字符和不可见字符
 func RemoveInvisibleChars(s string) string {
 	return strings.Map(func(r rune) rune {

+ 4 - 2
project_chuan/project_test.go

@@ -91,10 +91,12 @@ func TestSearchES23(t *testing.T) {
 	}
 	MgoQY.InitPool()
 
-	projectName := "东台市国家盐碱地综合利用试点项目条子泥片区"
+	//projectName := "年产100万瓶医疗食品工业气体充装项目"
+	projectName := "年产100万瓶工业医疗食品气体充装项目"
 	//projectName := "太和县乡镇政府驻地污水处理建设PPP工程监理项目"
 
-	results, err := getSearch(client, projectName, "", 0)
+	//results, err := getSearch(client, projectName, "", 0)
+	results, err := getSearchNew(client, projectName, "", 1)
 	//results, err := searchByTokenOther(client, projectName, "", "", 0)
 	if err != nil {
 		log.Info("TestSearchES23", zap.Error(err))