Bladeren bron

新增es遍历数据处理功能

mxs 1 jaar geleden
bovenliggende
commit
251f171f26
2 gewijzigde bestanden met toevoegingen van 151 en 1 verwijderingen
  1. 1 1
      data_project_wy/go.mod
  2. 150 0
      data_project_wy/main.go

+ 1 - 1
data_project_wy/go.mod

@@ -5,6 +5,7 @@ go 1.21.5
 require (
 	github.com/ClickHouse/clickhouse-go/v2 v2.23.0
 	github.com/gogf/gf/v2 v2.7.0
+	github.com/olivere/elastic/v7 v7.0.32
 	github.com/robfig/cron v1.2.0
 	go.mongodb.org/mongo-driver v1.11.4
 	jygit.jydev.jianyu360.cn/data_processing/common_utils v0.0.0-20240412074219-927f3f682cb3
@@ -24,7 +25,6 @@ require (
 	github.com/klauspost/compress v1.17.7 // indirect
 	github.com/mailru/easyjson v0.7.7 // indirect
 	github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
-	github.com/olivere/elastic/v7 v7.0.32 // indirect
 	github.com/paulmach/orb v0.11.1 // indirect
 	github.com/pierrec/lz4/v4 v4.1.21 // indirect
 	github.com/pkg/errors v0.9.1 // indirect

+ 150 - 0
data_project_wy/main.go

@@ -1,8 +1,14 @@
 package main
 
 import (
+	"context"
+	"encoding/json"
 	"fmt"
+	"github.com/gogf/gf/v2/util/gconv"
+	"github.com/olivere/elastic/v7"
 	"github.com/robfig/cron"
+	"io"
+	"log"
 	"sync"
 )
 
@@ -108,3 +114,147 @@ func tmp() {
 	//}
 	fmt.Println("迁移结束...", len(repeat))
 }
+
+// getBiddingData walks every document of the "transaction_info" Elasticsearch
+// index with the scroll API and, for each document:
+//   - looks up information data via FindInfomationData using the document's
+//     "info_ids" and copies its Id/Starttime/Endtime onto the document;
+//   - asks FindEntInfoData for buyer/agency/winner IDs when the document does
+//     not already carry them;
+//   - rewrites business_type "新增项目" to "新增物业项目" and picks the Mongo
+//     collection accordingly;
+//   - writes the changed fields back to Elasticsearch, ClickHouse and Mongo.
+//
+// The function terminates the process (log.Fatalf) if the client cannot be
+// created and panics if the initial scroll request fails with anything other
+// than io.EOF. Per-document failures are logged and do not stop the scan.
+func getBiddingData() {
+	// NOTE(review): endpoint and credentials are hard-coded in source; they
+	// should come from configuration or the environment and be rotated.
+	url := "http://172.17.4.184:19908"
+	//url := "http://192.168.3.149:9200"
+	username := "jybid"
+	password := "Top2023_JEB01i@31"
+	index := "transaction_info" // index name
+
+	// Number of hits requested per scroll page.
+	const batchSize = 500
+
+	// Create the Elasticsearch client.
+	client, err := elastic.NewClient(
+		elastic.SetURL(url),
+		elastic.SetBasicAuth(username, password),
+		elastic.SetSniff(false),
+	)
+	if err != nil {
+		log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
+	}
+
+	// No query filter is applied, so the whole index is scanned. Earlier
+	// experiments, kept for reference:
+	//rangeQuery := elastic.NewRangeQuery("project_bidstatus").Lt(3)
+	//query := elastic.NewBoolQuery().
+	//	Must(elastic.NewTermQuery("id", "652d2f432ccb08936fe2ed60"))
+	//query := elastic.NewBoolQuery().
+	//	Must(elastic.NewTermsQuery("subtype", "中标", "单一", "成交", "合同")).
+	//	Must(elastic.NewTermsQuery("area", "北京", "上海", "江苏", "浙江", "广东")).
+	//	Must(rangeQuery)
+
+	ctx := context.Background()
+	scroll := "10m" // keep-alive of the scroll context between pages
+	searchSource := elastic.NewSearchSource().
+		//Query(query).
+		Size(batchSize)
+	//Sort("_doc", true)  // ascending
+	//Sort("_doc", false) // descending
+
+	// clearScroll releases the server-side scroll context; it is a no-op when
+	// no scroll ID was ever obtained.
+	clearScroll := func(scrollID string) {
+		if scrollID == "" {
+			return
+		}
+		if _, err := client.ClearScroll().ScrollId(scrollID).Do(ctx); err != nil {
+			log.Printf("清理滚动搜索失败:%s", err)
+		}
+	}
+
+	// Start the scroll search.
+	res, err := client.Scroll(index).
+		Size(batchSize).
+		Scroll(scroll).
+		SearchSource(searchSource).
+		Do(ctx)
+	if err != nil {
+		if err != io.EOF {
+			panic(err)
+		}
+		// io.EOF means there are no hits; res may be nil or carry no hits,
+		// so it must not be dereferenced any further.
+		fmt.Println("没有数据")
+		if res != nil {
+			clearScroll(res.ScrollId)
+		}
+		return
+	}
+	// Remember the scroll ID immediately so it can be cleared even if the
+	// loop below never executes.
+	scrollID := res.ScrollId
+	fmt.Println("总数是:", res.TotalHits())
+
+	total := 0
+	for res != nil && res.Hits != nil && len(res.Hits.Hits) > 0 {
+		for _, hit := range res.Hits.Hits {
+			var doc map[string]interface{}
+			if err := json.Unmarshal(hit.Source, &doc); err != nil {
+				log.Printf("解析文档失败:%s", err)
+				continue
+			}
+			id := gconv.String(doc["id"])
+			if id == "" {
+				// Every downstream write is keyed on this ID; an empty key
+				// would target the wrong rows, so skip the document.
+				log.Printf("文档缺少id,跳过:%s", hit.Id)
+				continue
+			}
+
+			// Information data linked through info_ids.
+			infoIDs := gconv.Strings(doc["info_ids"])
+			info := FindInfomationData(infoIDs...)
+			doc["information_id"] = info.Id
+			doc["starttime"] = info.Starttime
+			doc["endtime"] = info.Endtime
+			// set collects only the changed fields for ClickHouse and Mongo.
+			set := map[string]interface{}{
+				"information_id": info.Id,
+				"starttime":      info.Starttime,
+				"endtime":        info.Endtime,
+			}
+
+			// Legal entities: only names whose ID is still missing are
+			// passed on for lookup.
+			buyer, agency := "", ""
+			winners := []string{}
+			if gconv.String(doc["buyer_id"]) == "" {
+				buyer = gconv.String(doc["buyer"])
+			}
+			if gconv.String(doc["agency_id"]) == "" {
+				agency = gconv.String(doc["agency"])
+			}
+			if len(gconv.Strings(doc["winner_id"])) == 0 {
+				if names := gconv.Strings(doc["winner"]); len(names) > 0 {
+					winners = names
+				}
+			}
+			buyerID, agencyID, winnerIDs := FindEntInfoData(id, buyer, agency, winners)
+			if buyerID != "" {
+				doc["buyer_id"] = buyerID
+				set["buyer_id"] = buyerID
+			}
+			if agencyID != "" {
+				doc["agency_id"] = agencyID
+				set["agency_id"] = agencyID
+			}
+			if len(winnerIDs) > 0 {
+				doc["winner_id"] = winnerIDs
+				set["winner_id"] = winnerIDs
+			}
+
+			// Choose the Mongo collection from the business type.
+			coll := "projectset_wy_back"
+			switch gconv.String(doc["business_type"]) {
+			case "新增项目":
+				coll = "projectset_wy"
+				doc["business_type"] = "新增物业项目"
+				set["business_type"] = "新增物业项目"
+			case "采购意向":
+				coll = "projectset_wy"
+			}
+
+			// Update Elasticsearch; a failure is logged but does not block
+			// the other stores, matching the original best-effort flow.
+			if _, err := client.Update().Index(index).Id(id).Doc(doc).Do(ctx); err != nil {
+				log.Printf("更新es失败 id=%s:%s", id, err)
+			}
+			// Update ClickHouse.
+			UpdateDataToClickHouse(set, map[string]interface{}{"project_id": id})
+			// Update Mongo.
+			MgoPro.Update(coll, map[string]interface{}{"project_id": id}, map[string]interface{}{"$set": set}, false, false)
+		}
+		total += len(res.Hits.Hits)
+		scrollID = res.ScrollId
+		res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
+		log.Println("current count:", total)
+		if err != nil {
+			if err != io.EOF {
+				log.Println("滚动搜索失败:", err, res)
+			}
+			// io.EOF marks the end of the scroll; any other error also
+			// ends the scan.
+			break
+		}
+	}
+
+	// Release the scroll context once, outside the loop.
+	clearScroll(scrollID)
+	fmt.Println("结束~~~~~~~~~~~~~~~")
+}