|
@@ -9,6 +9,7 @@ import (
|
|
util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
|
|
util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
|
|
"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
|
|
|
|
+ "sort"
|
|
"strings"
|
|
"strings"
|
|
"sync"
|
|
"sync"
|
|
)
|
|
)
|
|
@@ -31,6 +32,7 @@ func dealProposed22Concurrent() {
|
|
|
|
|
|
coll := sess.DB("qfw").C("projectset_proposed")
|
|
coll := sess.DB("qfw").C("projectset_proposed")
|
|
query := map[string]interface{}{
|
|
query := map[string]interface{}{
|
|
|
|
+ //"area": "甘肃",
|
|
"firsttime": map[string]interface{}{
|
|
"firsttime": map[string]interface{}{
|
|
"$gte": 1735660800,
|
|
"$gte": 1735660800,
|
|
},
|
|
},
|
|
@@ -38,7 +40,7 @@ func dealProposed22Concurrent() {
|
|
iter := coll.Find(query).Select(nil).Iter()
|
|
iter := coll.Find(query).Select(nil).Iter()
|
|
|
|
|
|
// 3. 并发控制
|
|
// 3. 并发控制
|
|
- const maxWorkers = 20
|
|
|
|
|
|
+ const maxWorkers = 1
|
|
taskCh := make(chan map[string]interface{}, 2000)
|
|
taskCh := make(chan map[string]interface{}, 2000)
|
|
var wg sync.WaitGroup
|
|
var wg sync.WaitGroup
|
|
|
|
|
|
@@ -61,6 +63,11 @@ func dealProposed22Concurrent() {
|
|
if count%1000 == 0 {
|
|
if count%1000 == 0 {
|
|
log.Info("dealProposed", zap.Int("current", count), zap.Any("projectname", doc["projectname"]))
|
|
log.Info("dealProposed", zap.Int("current", count), zap.Any("projectname", doc["projectname"]))
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ if util.ObjToString(doc["area"]) != "甘肃" {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+
|
|
taskCh <- cloneMap(doc) // 防止 map 重用
|
|
taskCh <- cloneMap(doc) // 防止 map 重用
|
|
}
|
|
}
|
|
close(taskCh)
|
|
close(taskCh)
|
|
@@ -77,47 +84,117 @@ func processOneProposed(tmp map[string]interface{}, client *elastic.Client) {
|
|
proposedID := mongodb.BsonIdToSId(tmp["_id"])
|
|
proposedID := mongodb.BsonIdToSId(tmp["_id"])
|
|
projectName := util.ObjToString(tmp["projectname"])
|
|
projectName := util.ObjToString(tmp["projectname"])
|
|
buyer := util.ObjToString(tmp["owner"])
|
|
buyer := util.ObjToString(tmp["owner"])
|
|
|
|
+ proposed_number := util.ObjToString(tmp["proposed_number"])
|
|
|
|
|
|
- results, err := searchES22(client, projectName, buyer, 75, 10)
|
|
|
|
|
|
+ log.Info("processOneProposed", zap.String("开始查询es", projectName))
|
|
|
|
+ results, err := searchES23(client, projectName, buyer, 18, 50)
|
|
if err != nil {
|
|
if err != nil {
|
|
log.Warn("searchES22 error", zap.Error(err))
|
|
log.Warn("searchES22 error", zap.Error(err))
|
|
return
|
|
return
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+ log.Info("processOneProposed", zap.String("结束查询es", projectName))
|
|
projectIds := []string{}
|
|
projectIds := []string{}
|
|
biddingIds := []string{}
|
|
biddingIds := []string{}
|
|
biddings := []map[string]interface{}{}
|
|
biddings := []map[string]interface{}{}
|
|
|
|
+ //bidding_id-> bidding map 数据源
|
|
|
|
+ biddingIdMap := make(map[string]map[string]interface{}, 0)
|
|
|
|
+ //project_id -> project map
|
|
|
|
+ projectIdMap := make(map[string]map[string]interface{}, 0)
|
|
|
|
+ project_bidding_ids := make(map[string][]string, 0) //存储project_id =>[bidding_id]
|
|
|
|
|
|
for _, re := range results {
|
|
for _, re := range results {
|
|
biddingID := util.ObjToString(re["id"])
|
|
biddingID := util.ObjToString(re["id"])
|
|
biddingIds = append(biddingIds, biddingID)
|
|
biddingIds = append(biddingIds, biddingID)
|
|
- biddings = append(biddings, map[string]interface{}{
|
|
|
|
|
|
+ da := map[string]interface{}{
|
|
"id": re["id"],
|
|
"id": re["id"],
|
|
"title": re["title"],
|
|
"title": re["title"],
|
|
"projectname": re["projectname"],
|
|
"projectname": re["projectname"],
|
|
"score": re["score"],
|
|
"score": re["score"],
|
|
"toptype": re["toptype"],
|
|
"toptype": re["toptype"],
|
|
"subtype": re["subtype"],
|
|
"subtype": re["subtype"],
|
|
- })
|
|
|
|
|
|
+ "buyer": re["buyer"],
|
|
|
|
+ "budget": re["budget"],
|
|
|
|
+ "buyerperson": re["buyerperson"],
|
|
|
|
+ "buyertel": re["buyertel"],
|
|
|
|
+ "s_winner": re["s_winner"],
|
|
|
|
+ "bidamount": re["bidamount"],
|
|
|
|
+ "winnertel": re["winnertel"],
|
|
|
|
+ "agency": re["agency"],
|
|
|
|
+ "publishtime": re["publishtime"],
|
|
|
|
+ }
|
|
|
|
+ biddings = append(biddings, da)
|
|
|
|
+ biddingIdMap[biddingID] = da
|
|
}
|
|
}
|
|
|
|
|
|
for _, bid := range biddingIds {
|
|
for _, bid := range biddingIds {
|
|
where2 := map[string]interface{}{"ids": bid}
|
|
where2 := map[string]interface{}{"ids": bid}
|
|
projectset, _ := MgoP.FindOne("projectset_20230904", where2)
|
|
projectset, _ := MgoP.FindOne("projectset_20230904", where2)
|
|
if projectset != nil && len(*projectset) > 0 {
|
|
if projectset != nil && len(*projectset) > 0 {
|
|
- projectIds = append(projectIds, mongodb.BsonIdToSId((*projectset)["_id"]))
|
|
|
|
|
|
+ pid := mongodb.BsonIdToSId((*projectset)["_id"])
|
|
|
|
+ projectIds = append(projectIds, pid)
|
|
|
|
+ p_bidding_ids := project_bidding_ids[pid]
|
|
|
|
+ p_bidding_ids = append(p_bidding_ids, bid)
|
|
|
|
+ projectIdMap[pid] = *projectset
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
insert := map[string]interface{}{
|
|
insert := map[string]interface{}{
|
|
- "proposed_id": proposedID,
|
|
|
|
- "bidding_ids": removeDuplicates(biddingIds),
|
|
|
|
- "project_ids": removeDuplicates(projectIds),
|
|
|
|
- "biddings": biddings,
|
|
|
|
- "project_name": tmp["projectname"],
|
|
|
|
|
|
+ "proposed_id": proposedID,
|
|
|
|
+ "stype": 1, //代表从拟建数据-> 匹配在建数据
|
|
|
|
+ "proposed_number": proposed_number,
|
|
|
|
+ "buyer": buyer,
|
|
|
|
+ "project_name": tmp["projectname"],
|
|
|
|
+ "area": tmp["area"],
|
|
|
|
+ "city": tmp["city"],
|
|
|
|
+ "district": tmp["district"],
|
|
|
|
+ //"bidding_ids": removeDuplicates(biddingIds),
|
|
|
|
+ //"project_ids": removeDuplicates(projectIds),
|
|
|
|
+ //"biddings": biddings,
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if buyer != "" {
|
|
|
|
+ where11 := map[string]interface{}{
|
|
|
|
+ "company_name": buyer,
|
|
|
|
+ }
|
|
|
|
+ std, _ := MgoQY.FindOne("qyxy_std", where11)
|
|
|
|
+ insert["credit_no"] = (*std)["credit_no"]
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ projects := make([]map[string]interface{}, 0)
|
|
|
|
+ if len(project_bidding_ids) > 0 {
|
|
|
|
+ for pid, bidding_ids := range project_bidding_ids {
|
|
|
|
+ p_project := projectIdMap[pid]
|
|
|
|
+ p_bs := make([]map[string]interface{}, 0)
|
|
|
|
+ for _, vv := range bidding_ids {
|
|
|
|
+ bi := biddingIdMap[vv]
|
|
|
|
+ p_bs = append(p_bs, bi)
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ project := map[string]interface{}{
|
|
|
|
+ "project_id": pid,
|
|
|
|
+ "projectname": p_project["projectname"],
|
|
|
|
+ "bidamount": p_project["bidamount"],
|
|
|
|
+ "area": p_project["area"],
|
|
|
|
+ "city": p_project["city"],
|
|
|
|
+ "bidstatus": p_project["bidstatus"],
|
|
|
|
+ "buyer": p_project["buyer"],
|
|
|
|
+ "firsttime": p_project["firsttime"],
|
|
|
|
+ "biddings": p_bs,
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ projects = append(projects, project)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if len(projects) > 0 {
|
|
|
|
+ insert["projects"] = projects
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if len(biddings) > 0 {
|
|
|
|
+ insert["biddings"] = biddings
|
|
}
|
|
}
|
|
|
|
|
|
- MgoP.InsertOrUpdate("qfw", "wcc_dealProposed22", insert)
|
|
|
|
|
|
+ MgoP.InsertOrUpdate("qfw", "wcc_dealProposed22_0523", insert)
|
|
}
|
|
}
|
|
|
|
|
|
func cloneMap(src map[string]interface{}) map[string]interface{} {
|
|
func cloneMap(src map[string]interface{}) map[string]interface{} {
|
|
@@ -288,6 +365,176 @@ func removeDuplicates(arr []string) []string {
|
|
return result
|
|
return result
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+func searchES23(client *elastic.Client, projectName, buyer2 string, scoreThreshold float64, maxResults int) ([]map[string]interface{}, error) {
|
|
|
|
+ fieldsToTry := []string{"projectname.pname", "title", "detail"}
|
|
|
|
+
|
|
|
|
+ filtersToTry := [][]elastic.Query{
|
|
|
|
+ {elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一")},
|
|
|
|
+ {elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向")},
|
|
|
|
+ {elastic.NewTermsQuery("toptype", "拟建")},
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ var allResults []*elastic.SearchHit
|
|
|
|
+ seenIDs := make(map[string]bool)
|
|
|
|
+
|
|
|
|
+ for _, field := range fieldsToTry {
|
|
|
|
+ for _, filter := range filtersToTry {
|
|
|
|
+ // 构建查询:使用 MultiMatchQuery + phrase
|
|
|
|
+ query := elastic.NewBoolQuery().
|
|
|
|
+ Must(elastic.NewMultiMatchQuery(projectName, field).Type("phrase")).
|
|
|
|
+ Filter(filter...)
|
|
|
|
+
|
|
|
|
+ // 执行查询
|
|
|
|
+ searchResult, err := client.Search().
|
|
|
|
+ Index("bidding").
|
|
|
|
+ Query(query).
|
|
|
|
+ Size(70).
|
|
|
|
+ Do(context.Background())
|
|
|
|
+ if err != nil {
|
|
|
|
+ return nil, err
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 去重处理
|
|
|
|
+ for _, hit := range searchResult.Hits.Hits {
|
|
|
|
+ if !seenIDs[hit.Id] {
|
|
|
|
+ seenIDs[hit.Id] = true
|
|
|
|
+ allResults = append(allResults, hit)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if len(allResults) >= maxResults {
|
|
|
|
+ break
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if len(allResults) >= maxResults {
|
|
|
|
+ break
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //
|
|
|
|
+ //fieldsToTry := []string{"projectname.pname", "title", "detail"}
|
|
|
|
+ //filtersToTry := [][]elastic.Query{
|
|
|
|
+ // {elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一")},
|
|
|
|
+ // {elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向")},
|
|
|
|
+ // {elastic.NewTermsQuery("toptype", "拟建")},
|
|
|
|
+ //}
|
|
|
|
+ //
|
|
|
|
+ //var allResults []*elastic.SearchHit
|
|
|
|
+ //seenIDs := make(map[string]bool)
|
|
|
|
+ //
|
|
|
|
+ //for _, field := range fieldsToTry {
|
|
|
|
+ // for _, filter := range filtersToTry {
|
|
|
|
+ // // 构建查询
|
|
|
|
+ // query := elastic.NewBoolQuery().
|
|
|
|
+ // Must(elastic.NewMatchQuery(field, projectName)).
|
|
|
|
+ // Filter(filter...)
|
|
|
|
+ //
|
|
|
|
+ // // 执行查询
|
|
|
|
+ // searchResult, err := client.Search().
|
|
|
|
+ // Index("bidding").
|
|
|
|
+ // Query(query).
|
|
|
|
+ // Size(70). // 多取一些,后面做筛选和去重
|
|
|
|
+ // Do(context.Background())
|
|
|
|
+ // if err != nil {
|
|
|
|
+ // return nil, err
|
|
|
|
+ // }
|
|
|
|
+ //
|
|
|
|
+ // for _, hit := range searchResult.Hits.Hits {
|
|
|
|
+ // if !seenIDs[hit.Id] {
|
|
|
|
+ // allResults = append(allResults, hit)
|
|
|
|
+ // seenIDs[hit.Id] = true
|
|
|
|
+ // }
|
|
|
|
+ // }
|
|
|
|
+ //
|
|
|
|
+ // if len(allResults) >= maxResults {
|
|
|
|
+ // break
|
|
|
|
+ // }
|
|
|
|
+ // }
|
|
|
|
+ // if len(allResults) >= maxResults {
|
|
|
|
+ // break
|
|
|
|
+ // }
|
|
|
|
+ //}
|
|
|
|
+
|
|
|
|
+ var results []map[string]interface{}
|
|
|
|
+ seenProjectNames := make(map[string]bool)
|
|
|
|
+ seenProjectCodes := make(map[string]bool)
|
|
|
|
+ bidamountMap := make(map[float64]bool)
|
|
|
|
+
|
|
|
|
+ for _, hit := range allResults {
|
|
|
|
+ var doc map[string]interface{}
|
|
|
|
+ if err := json.Unmarshal(hit.Source, &doc); err != nil {
|
|
|
|
+ log.Info("解析文档失败", zap.Error(err))
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ projectNameValue := util.ObjToString(doc["projectname"])
|
|
|
|
+ if projectNameValue == "" {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ projectCode := util.ObjToString(doc["projectcode"])
|
|
|
|
+ if seenProjectCodes[projectCode] {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ seenProjectCodes[projectCode] = true
|
|
|
|
+
|
|
|
|
+ bidamount := util.Float64All(doc["bidamount"])
|
|
|
|
+ if bidamountMap[bidamount] {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ bidamountMap[bidamount] = true
|
|
|
|
+
|
|
|
|
+ // 相似度筛选
|
|
|
|
+ score := *hit.Score
|
|
|
|
+ doc["score"] = score //相似度
|
|
|
|
+ if score < scoreThreshold {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //id := util.ObjToString(doc["id"])
|
|
|
|
+ //doc["jyhref"] = GetJyURLByID(id)
|
|
|
|
+
|
|
|
|
+ //if site := util.ObjToString(doc["site"]); site == "中华人民共和国自然资源部" {
|
|
|
|
+ // doc["title"] = "土地出让" + "-" + util.ObjToString(doc["title"])
|
|
|
|
+ //}
|
|
|
|
+
|
|
|
|
+ // enrich: total_investment
|
|
|
|
+ //if bidData, _ := MgoB.FindById("bidding", id, nil); bidData != nil {
|
|
|
|
+ // if util.Float64All((*bidData)["total_investment"]) > 0 {
|
|
|
|
+ // doc["total_investment"] = (*bidData)["total_investment"]
|
|
|
|
+ // }
|
|
|
|
+ //}
|
|
|
|
+
|
|
|
|
+ doc["score"] = score
|
|
|
|
+ detail := util.ObjToString(doc["detail"])
|
|
|
|
+
|
|
|
|
+ // 字段中必须包含 projectName
|
|
|
|
+ if buyer2 != "" {
|
|
|
|
+ if !strings.Contains(detail, projectName) && !strings.Contains(detail, buyer2) {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if seenProjectNames[projectNameValue] {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
+ seenProjectNames[projectNameValue] = true
|
|
|
|
+
|
|
|
|
+ results = append(results, doc)
|
|
|
|
+
|
|
|
|
+ if len(results) >= maxResults {
|
|
|
|
+ break
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ // 排序:按 score 降序
|
|
|
|
+ sort.Slice(results, func(i, j int) bool {
|
|
|
|
+ si := util.Float64All(results[i]["score"])
|
|
|
|
+ sj := util.Float64All(results[j]["score"])
|
|
|
|
+ return si > sj
|
|
|
|
+ })
|
|
|
|
+
|
|
|
|
+ return results, nil
|
|
|
|
+}
|
|
|
|
+
|
|
func searchES22(client *elastic.Client, projectName, buyer2 string, scoreThreshold float64, maxResults int) ([]map[string]interface{}, error) {
|
|
func searchES22(client *elastic.Client, projectName, buyer2 string, scoreThreshold float64, maxResults int) ([]map[string]interface{}, error) {
|
|
fieldsToTry := []string{"projectname.pname", "title", "detail"}
|
|
fieldsToTry := []string{"projectname.pname", "title", "detail"}
|
|
filtersToTry := [][]elastic.Query{
|
|
filtersToTry := [][]elastic.Query{
|
|
@@ -564,3 +811,25 @@ func GetJyURLByID(id string) string {
|
|
|
|
|
|
return url
|
|
return url
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+// GetIdByURL 解密url,获取bidding ID
|
|
|
|
+func GetIdByURL(url string) string {
|
|
|
|
+ if strings.Contains(url, "work-bench") {
|
|
|
|
+ return ""
|
|
|
|
+ }
|
|
|
|
+ if strings.Contains(url, "/article/content") {
|
|
|
|
+ urls := strings.Split(url, "content/")
|
|
|
|
+ res := strings.Split(urls[1], ".html")
|
|
|
|
+ ids := util.CommonDecodeArticle("content", res[0])
|
|
|
|
+ return ids[0]
|
|
|
|
+ }
|
|
|
|
+ if strings.HasSuffix(url, "appid") {
|
|
|
|
+ urls := strings.Split(url, "entservice/")
|
|
|
|
+ res := strings.Split(urls[1], ".html")
|
|
|
|
+ se := util.SimpleEncrypt{Key: "entservice"}
|
|
|
|
+ id := se.DecodeString(res[0])
|
|
|
|
+ return id
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return ""
|
|
|
|
+}
|