|
@@ -16,6 +16,166 @@ import (
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
+func dealXlsxTest() {
|
|
|
+ // 1. 初始化 ES 客户端
|
|
|
+ client, err := elastic.NewClient(
|
|
|
+ elastic.SetURL(GF.Es.URL),
|
|
|
+ elastic.SetBasicAuth(GF.Es.Username, GF.Es.Password),
|
|
|
+ elastic.SetSniff(false),
|
|
|
+ )
|
|
|
+ if err != nil {
|
|
|
+ log.Fatal("创建 Elasticsearch 客户端失败", zap.Error(err))
|
|
|
+ }
|
|
|
+
|
|
|
+ // 2. 初始化 MongoDB 连接
|
|
|
+ sess := MgoP.GetMgoConn()
|
|
|
+ defer MgoP.DestoryMongoConn(sess)
|
|
|
+
|
|
|
+ coll := sess.DB("qfw").C("wcc_dealXlsxData_0524")
|
|
|
+ iter := coll.Find(nil).Select(nil).Iter()
|
|
|
+
|
|
|
+ // 3. 并发控制
|
|
|
+ const maxWorkers = 2
|
|
|
+ taskCh := make(chan map[string]interface{}, 2000)
|
|
|
+ var wg sync.WaitGroup
|
|
|
+
|
|
|
+ // 4. 启动 worker 处理任务
|
|
|
+ for i := 0; i < maxWorkers; i++ {
|
|
|
+ wg.Add(1)
|
|
|
+ go func() {
|
|
|
+ defer wg.Done()
|
|
|
+ for doc := range taskCh {
|
|
|
+ if len(doc) == 0 {
|
|
|
+ log.Info("aaa", zap.Any("client", client))
|
|
|
+ }
|
|
|
+ processOneProposedTest(doc, client)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ }
|
|
|
+
|
|
|
+ // 5. 逐条读取数据并派发任务
|
|
|
+ log.Info("111111", zap.String("222222", "开始处理数据"))
|
|
|
+ count := 0
|
|
|
+ for doc := make(map[string]interface{}); iter.Next(doc); {
|
|
|
+ count++
|
|
|
+ if count%1000 == 0 {
|
|
|
+ log.Info("dealProposed", zap.Int("current", count), zap.Any("projectname", doc["p1_project_name"]))
|
|
|
+ }
|
|
|
+
|
|
|
+ taskCh <- cloneMap(doc) // 防止 map 重用
|
|
|
+ }
|
|
|
+ close(taskCh)
|
|
|
+ wg.Wait()
|
|
|
+}
|
|
|
+
|
|
|
+func processOneProposedTest(tmp map[string]interface{}, client *elastic.Client) {
|
|
|
+ defer func() {
|
|
|
+ if r := recover(); r != nil {
|
|
|
+ log.Warn("panic in processOneProposed", zap.Any("recover", r))
|
|
|
+ }
|
|
|
+ }()
|
|
|
+
|
|
|
+ id := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ //proposedID := mongodb.BsonIdToSId(tmp["_id"])
|
|
|
+ projectName := util.ObjToString(tmp["p1_project_name"])
|
|
|
+ buyer := util.ObjToString(tmp["p1_project_owner"])
|
|
|
+ //proposed_number := util.ObjToString(tmp["proposed_number"])
|
|
|
+
|
|
|
+ //log.Info("processOneProposed", zap.String("开始查询es", projectName))
|
|
|
+ results, err := searchES23(client, projectName, buyer, 20, 50)
|
|
|
+ if err != nil {
|
|
|
+ log.Warn("searchES22 error", zap.Error(err))
|
|
|
+ return
|
|
|
+ }
|
|
|
+ //log.Info("processOneProposed", zap.String("结束查询es", projectName))
|
|
|
+ biddings := []map[string]interface{}{}
|
|
|
+
|
|
|
+ update := map[string]interface{}{}
|
|
|
+ // 标讯信息
|
|
|
+ for _, re := range results {
|
|
|
+ biddingID := util.ObjToString(re["id"])
|
|
|
+ da := map[string]interface{}{
|
|
|
+ "id": re["id"],
|
|
|
+ "title": re["title"],
|
|
|
+ "area": re["area"],
|
|
|
+ "city": re["city"],
|
|
|
+ "projectname": re["projectname"],
|
|
|
+ "score": re["score"],
|
|
|
+ "toptype": re["toptype"],
|
|
|
+ "subtype": re["subtype"],
|
|
|
+ "buyer": re["buyer"],
|
|
|
+ "budget": re["budget"],
|
|
|
+ "buyerperson": re["buyerperson"],
|
|
|
+ "buyertel": re["buyertel"],
|
|
|
+ "s_winner": re["s_winner"],
|
|
|
+ "bidamount": re["bidamount"],
|
|
|
+ "winnertel": re["winnertel"],
|
|
|
+ "agency": re["agency"],
|
|
|
+ "publishtime": re["publishtime"],
|
|
|
+ }
|
|
|
+ //项目信息
|
|
|
+ where2 := map[string]interface{}{"ids": biddingID}
|
|
|
+ if util.ObjToString(re["toptype"]) == "拟建" {
|
|
|
+ projectset, _ := MgoP.FindOne("projectset_proposed", where2)
|
|
|
+ if projectset != nil && len((*projectset)) > 0 {
|
|
|
+ v3 := map[string]interface{}{
|
|
|
+ "project_id": mongodb.BsonIdToSId((*projectset)["_id"]),
|
|
|
+ "projectname": (*projectset)["projectname"],
|
|
|
+ "bidamount": (*projectset)["bidamount"],
|
|
|
+ "area": (*projectset)["area"],
|
|
|
+ "city": (*projectset)["city"],
|
|
|
+ "district": (*projectset)["district"],
|
|
|
+ "owner": (*projectset)["owner"],
|
|
|
+ "approvecode": (*projectset)["approvecode"],
|
|
|
+ }
|
|
|
+ if (*projectset)["owner"] != "" {
|
|
|
+ where11 := map[string]interface{}{
|
|
|
+ "company_name": (*projectset)["owner"],
|
|
|
+ }
|
|
|
+ std, _ := MgoQY.FindOne("qyxy_std", where11)
|
|
|
+ v3["credit_no"] = (*std)["credit_no"]
|
|
|
+ }
|
|
|
+ da["project"] = v3
|
|
|
+ }
|
|
|
+ biddings = append(biddings, da)
|
|
|
+ } else {
|
|
|
+ projectset, _ := MgoP.FindOne("projectset_20230904", where2)
|
|
|
+ if projectset != nil && len((*projectset)) > 0 {
|
|
|
+ v3 := map[string]interface{}{
|
|
|
+ "project_id": mongodb.BsonIdToSId((*projectset)["_id"]),
|
|
|
+ "projectname": (*projectset)["projectname"],
|
|
|
+ "bidamount": (*projectset)["bidamount"],
|
|
|
+ "area": (*projectset)["area"],
|
|
|
+ "city": (*projectset)["city"],
|
|
|
+ "district": (*projectset)["district"],
|
|
|
+ "firsttime": (*projectset)["firsttime"],
|
|
|
+ "bidtype": (*projectset)["bidtype"],
|
|
|
+ "bidstatus": (*projectset)["bidstatus"],
|
|
|
+ "sortprice": (*projectset)["sortprice"],
|
|
|
+ "buyer": (*projectset)["buyer"],
|
|
|
+ }
|
|
|
+ if (*projectset)["buyer"] != "" {
|
|
|
+ where11 := map[string]interface{}{
|
|
|
+ "company_name": (*projectset)["buyer"],
|
|
|
+ }
|
|
|
+ std, _ := MgoQY.FindOne("qyxy_std", where11)
|
|
|
+ v3["credit_no"] = (*std)["credit_no"]
|
|
|
+ }
|
|
|
+ da["project"] = v3
|
|
|
+ }
|
|
|
+
|
|
|
+ biddings = append(biddings, da)
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(biddings) > 0 {
|
|
|
+ update["bidding"] = biddings
|
|
|
+ MgoP.UpdateById("wcc_dealXlsxData_0524", id, map[string]interface{}{"$set": update})
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
// dealProposed22Concurrent 多协程处理,拟建存量数据
|
|
|
func dealProposed22Concurrent() {
|
|
|
// 1. 初始化 ES 客户端
|
|
@@ -38,11 +198,12 @@ func dealProposed22Concurrent() {
|
|
|
"$gte": 1735660800,
|
|
|
"$lte": 1748102400,
|
|
|
},
|
|
|
+ "area": "甘肃",
|
|
|
}
|
|
|
iter := coll.Find(query).Select(nil).Iter()
|
|
|
|
|
|
// 3. 并发控制
|
|
|
- const maxWorkers = 2
|
|
|
+ const maxWorkers = 1
|
|
|
taskCh := make(chan map[string]interface{}, 2000)
|
|
|
var wg sync.WaitGroup
|
|
|
|
|
@@ -69,9 +230,9 @@ func dealProposed22Concurrent() {
|
|
|
log.Info("dealProposed", zap.Int("current", count), zap.Any("projectname", doc["projectname"]))
|
|
|
}
|
|
|
|
|
|
- if util.ObjToString(doc["area"]) == "甘肃" {
|
|
|
- continue
|
|
|
- }
|
|
|
+ //if util.ObjToString(doc["area"]) == "甘肃" {
|
|
|
+ // continue
|
|
|
+ //}
|
|
|
|
|
|
taskCh <- cloneMap(doc) // 防止 map 重用
|
|
|
}
|
|
@@ -125,23 +286,51 @@ func processOneProposed(tmp map[string]interface{}, client *elastic.Client) {
|
|
|
}
|
|
|
//项目信息
|
|
|
where2 := map[string]interface{}{"ids": biddingID}
|
|
|
- projectset, _ := MgoP.FindOne("projectset_20230904", where2)
|
|
|
- if projectset != nil && len((*projectset)) > 0 {
|
|
|
- v3 := map[string]interface{}{
|
|
|
- "project_id": mongodb.BsonIdToSId((*projectset)["_id"]),
|
|
|
- "projectname": (*projectset)["projectname"],
|
|
|
- "bidamount": (*projectset)["bidamount"],
|
|
|
- "area": (*projectset)["area"],
|
|
|
- "city": (*projectset)["city"],
|
|
|
- "firsttime": (*projectset)["firsttime"],
|
|
|
- "bidtype": (*projectset)["bidtype"],
|
|
|
- "bidstatus": (*projectset)["bidstatus"],
|
|
|
- "sortprice": (*projectset)["sortprice"],
|
|
|
- "buyer": (*projectset)["buyer"],
|
|
|
+ if util.ObjToString(re["toptype"]) == "拟建" {
|
|
|
+ projectset, _ := MgoP.FindOne("projectset_proposed", where2)
|
|
|
+ if projectset != nil && len((*projectset)) > 0 {
|
|
|
+ v3 := map[string]interface{}{
|
|
|
+ "project_id": mongodb.BsonIdToSId((*projectset)["_id"]),
|
|
|
+ "projectname": (*projectset)["projectname"],
|
|
|
+ "bidamount": (*projectset)["bidamount"],
|
|
|
+ "area": (*projectset)["area"],
|
|
|
+ "city": (*projectset)["city"],
|
|
|
+ "district": (*projectset)["district"],
|
|
|
+ "owner": (*projectset)["owner"],
|
|
|
+ "approvecode": (*projectset)["approvecode"],
|
|
|
+ "approvestatus": (*projectset)["approvestatus"],
|
|
|
+ "sourceinfourl": (*projectset)["sourceinfourl"],
|
|
|
+ }
|
|
|
+ if (*projectset)["owner"] != "" {
|
|
|
+ where11 := map[string]interface{}{
|
|
|
+ "company_name": (*projectset)["owner"],
|
|
|
+ }
|
|
|
+ std, _ := MgoQY.FindOne("qyxy_std", where11)
|
|
|
+ v3["credit_no"] = (*std)["credit_no"]
|
|
|
+ }
|
|
|
+ da["project"] = v3
|
|
|
}
|
|
|
- da["project"] = v3
|
|
|
+ biddings = append(biddings, da)
|
|
|
+ } else {
|
|
|
+ projectset, _ := MgoP.FindOne("projectset_20230904", where2)
|
|
|
+ if projectset != nil && len((*projectset)) > 0 {
|
|
|
+ v3 := map[string]interface{}{
|
|
|
+ "project_id": mongodb.BsonIdToSId((*projectset)["_id"]),
|
|
|
+ "projectname": (*projectset)["projectname"],
|
|
|
+ "bidamount": (*projectset)["bidamount"],
|
|
|
+ "area": (*projectset)["area"],
|
|
|
+ "city": (*projectset)["city"],
|
|
|
+ "firsttime": (*projectset)["firsttime"],
|
|
|
+ "bidtype": (*projectset)["bidtype"],
|
|
|
+ "bidstatus": (*projectset)["bidstatus"],
|
|
|
+ "sortprice": (*projectset)["sortprice"],
|
|
|
+ "buyer": (*projectset)["buyer"],
|
|
|
+ }
|
|
|
+ da["project"] = v3
|
|
|
+ }
|
|
|
+ biddings = append(biddings, da)
|
|
|
}
|
|
|
- biddings = append(biddings, da)
|
|
|
+
|
|
|
}
|
|
|
|
|
|
insert := map[string]interface{}{
|
|
@@ -157,7 +346,7 @@ func processOneProposed(tmp map[string]interface{}, client *elastic.Client) {
|
|
|
"updatetime": time.Now().Unix(),
|
|
|
}
|
|
|
|
|
|
- if isValidCodeFormat(util.ObjToString("approvecode")) {
|
|
|
+ if isValidCodeFormat(util.ObjToString(tmp["approvecode"])) {
|
|
|
insert["approvecode"] = tmp["approvecode"]
|
|
|
}
|
|
|
|
|
@@ -172,14 +361,17 @@ func processOneProposed(tmp map[string]interface{}, client *elastic.Client) {
|
|
|
whereExist := map[string]interface{}{
|
|
|
"proposed_id": proposedID,
|
|
|
}
|
|
|
- exist, _ := MgoP.FindOne("wcc_dealProposed22_0524", whereExist)
|
|
|
+ if GF.Env.Savecoll == "" {
|
|
|
+ GF.Env.Savecoll = "wcc_nj_zj_bidding"
|
|
|
+ }
|
|
|
+ exist, _ := MgoP.FindOne(GF.Env.Savecoll, whereExist)
|
|
|
// 存在就更新
|
|
|
if exist != nil && len(*exist) > 0 {
|
|
|
exitsid := mongodb.BsonIdToSId((*exist)["_id"])
|
|
|
- MgoP.UpdateById("wcc_dealProposed22_0524", exitsid, insert)
|
|
|
+ MgoP.UpdateById(GF.Env.Savecoll, exitsid, map[string]interface{}{"$set": insert})
|
|
|
} else {
|
|
|
insert["comeintime"] = time.Now().Unix()
|
|
|
- MgoP.InsertOrUpdate("qfw", "wcc_dealProposed22_0524", insert)
|
|
|
+ MgoP.InsertOrUpdate("qfw", GF.Env.Savecoll, insert)
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -396,54 +588,11 @@ func searchES23(client *elastic.Client, projectName, buyer2 string, scoreThresho
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- //
|
|
|
- //fieldsToTry := []string{"projectname.pname", "title", "detail"}
|
|
|
- //filtersToTry := [][]elastic.Query{
|
|
|
- // {elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一")},
|
|
|
- // {elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向")},
|
|
|
- // {elastic.NewTermsQuery("toptype", "拟建")},
|
|
|
- //}
|
|
|
- //
|
|
|
- //var allResults []*elastic.SearchHit
|
|
|
- //seenIDs := make(map[string]bool)
|
|
|
- //
|
|
|
- //for _, field := range fieldsToTry {
|
|
|
- // for _, filter := range filtersToTry {
|
|
|
- // // 构建查询
|
|
|
- // query := elastic.NewBoolQuery().
|
|
|
- // Must(elastic.NewMatchQuery(field, projectName)).
|
|
|
- // Filter(filter...)
|
|
|
- //
|
|
|
- // // 执行查询
|
|
|
- // searchResult, err := client.Search().
|
|
|
- // Index("bidding").
|
|
|
- // Query(query).
|
|
|
- // Size(70). // 多取一些,后面做筛选和去重
|
|
|
- // Do(context.Background())
|
|
|
- // if err != nil {
|
|
|
- // return nil, err
|
|
|
- // }
|
|
|
- //
|
|
|
- // for _, hit := range searchResult.Hits.Hits {
|
|
|
- // if !seenIDs[hit.Id] {
|
|
|
- // allResults = append(allResults, hit)
|
|
|
- // seenIDs[hit.Id] = true
|
|
|
- // }
|
|
|
- // }
|
|
|
- //
|
|
|
- // if len(allResults) >= maxResults {
|
|
|
- // break
|
|
|
- // }
|
|
|
- // }
|
|
|
- // if len(allResults) >= maxResults {
|
|
|
- // break
|
|
|
- // }
|
|
|
- //}
|
|
|
-
|
|
|
var results []map[string]interface{}
|
|
|
seenProjectNames := make(map[string]bool)
|
|
|
seenProjectCodes := make(map[string]bool)
|
|
|
bidamountMap := make(map[float64]bool)
|
|
|
+ //subtypeMap := make(map[string]bool)
|
|
|
|
|
|
for _, hit := range allResults {
|
|
|
var doc map[string]interface{}
|
|
@@ -458,16 +607,20 @@ func searchES23(client *elastic.Client, projectName, buyer2 string, scoreThresho
|
|
|
}
|
|
|
|
|
|
projectCode := util.ObjToString(doc["projectcode"])
|
|
|
- if seenProjectCodes[projectCode] {
|
|
|
- continue
|
|
|
+ if projectCode != "" {
|
|
|
+ if seenProjectCodes[projectCode] {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ seenProjectCodes[projectCode] = true
|
|
|
}
|
|
|
- seenProjectCodes[projectCode] = true
|
|
|
|
|
|
bidamount := util.Float64All(doc["bidamount"])
|
|
|
- if bidamountMap[bidamount] {
|
|
|
- continue
|
|
|
+ if bidamount != 0 {
|
|
|
+ if bidamountMap[bidamount] {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ bidamountMap[bidamount] = true
|
|
|
}
|
|
|
- bidamountMap[bidamount] = true
|
|
|
|
|
|
// 相似度筛选
|
|
|
score := *hit.Score
|
|
@@ -476,20 +629,6 @@ func searchES23(client *elastic.Client, projectName, buyer2 string, scoreThresho
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- //id := util.ObjToString(doc["id"])
|
|
|
- //doc["jyhref"] = GetJyURLByID(id)
|
|
|
-
|
|
|
- //if site := util.ObjToString(doc["site"]); site == "中华人民共和国自然资源部" {
|
|
|
- // doc["title"] = "土地出让" + "-" + util.ObjToString(doc["title"])
|
|
|
- //}
|
|
|
-
|
|
|
- // enrich: total_investment
|
|
|
- //if bidData, _ := MgoB.FindById("bidding", id, nil); bidData != nil {
|
|
|
- // if util.Float64All((*bidData)["total_investment"]) > 0 {
|
|
|
- // doc["total_investment"] = (*bidData)["total_investment"]
|
|
|
- // }
|
|
|
- //}
|
|
|
-
|
|
|
doc["score"] = score
|
|
|
detail := util.ObjToString(doc["detail"])
|
|
|
|