package main

import (
	"context"
	"encoding/json"
	"fmt"
	"sort"
	"strings"
	"sync"

	"github.com/olivere/elastic/v7"
	"go.uber.org/zap"
	util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
	"jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
)

// dealProposed22Concurrent reads documents from qfw.projectset_proposed whose
// firsttime is >= 1735660800, keeps only those whose "area" is "甘肃", and
// hands a copy of each one to a pool of workers running processOneProposed.
// It returns once the cursor is exhausted and all workers have finished.
func dealProposed22Concurrent() {
	// 1. Create the Elasticsearch client.
	client, err := elastic.NewClient(
		elastic.SetURL(GF.Es.URL),
		elastic.SetBasicAuth(GF.Es.Username, GF.Es.Password),
		elastic.SetSniff(false),
	)
	if err != nil {
		log.Fatal("创建 Elasticsearch 客户端失败", zap.Error(err))
		// Defensive: never continue with a nil client, whatever log.Fatal does.
		return
	}

	// 2. Open the MongoDB cursor.
	sess := MgoP.GetMgoConn()
	defer MgoP.DestoryMongoConn(sess)

	query := map[string]interface{}{
		"firsttime": map[string]interface{}{
			"$gte": 1735660800,
		},
	}
	iter := sess.DB("qfw").C("projectset_proposed").Find(query).Select(nil).Iter()

	// 3. Start the workers. The channel buffer lets the reader run ahead of
	// the (much slower) Elasticsearch lookups.
	const maxWorkers = 1
	taskCh := make(chan map[string]interface{}, 2000)
	var wg sync.WaitGroup
	for i := 0; i < maxWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for doc := range taskCh {
				processOneProposed(doc, client)
			}
		}()
	}

	// 4. Read the cursor and dispatch matching documents.
	// NOTE(review): a cursor error ends the loop silently; the iterator's
	// error state is not inspected here — confirm whether it should be.
	log.Info("111111", zap.String("222222", "开始处理数据"))
	count := 0
	for doc := make(map[string]interface{}); iter.Next(doc); {
		count++
		if count%1000 == 0 {
			log.Info("dealProposed", zap.Int("current", count), zap.Any("projectname", doc["projectname"]))
		}
		if util.ObjToString(doc["area"]) != "甘肃" {
			continue
		}
		// The same map is passed to iter.Next on every iteration, so the
		// worker must receive its own copy.
		taskCh <- cloneMap(doc)
	}

	close(taskCh)
	wg.Wait()
}

// processOneProposed looks up bidding announcements matching one proposed
// project via searchES23, groups them under the projects found in
// projectset_20230904, and writes the combined record to
// qfw.wcc_dealProposed22_0523.
//
// tmp is one projectset_proposed document; client is the Elasticsearch client
// used for the search. A panic inside the function is recovered and logged so
// that a single bad document does not stop the calling worker.
func processOneProposed(tmp map[string]interface{}, client *elastic.Client) {
	defer func() {
		if r := recover(); r != nil {
			log.Warn("panic in processOneProposed", zap.Any("recover", r))
		}
	}()

	proposedID := mongodb.BsonIdToSId(tmp["_id"])
	projectName := util.ObjToString(tmp["projectname"])
	buyer := util.ObjToString(tmp["owner"])
	proposedNumber := util.ObjToString(tmp["proposed_number"])

	log.Info("processOneProposed", zap.String("开始查询es", projectName))
	results, err := searchES23(client, projectName, buyer, 18, 50)
	if err != nil {
		log.Warn("searchES23 error", zap.Error(err))
		return
	}
	log.Info("processOneProposed", zap.String("结束查询es", projectName))

	// Reduce every hit to the fields that are stored, indexed by bidding id.
	biddingIDs := make([]string, 0, len(results))
	biddings := make([]map[string]interface{}, 0, len(results))
	biddingByID := make(map[string]map[string]interface{}, len(results))
	for _, re := range results {
		biddingID := util.ObjToString(re["id"])
		summary := map[string]interface{}{
			"id":          re["id"],
			"title":       re["title"],
			"projectname": re["projectname"],
			"score":       re["score"],
			"toptype":     re["toptype"],
			"subtype":     re["subtype"],
			"buyer":       re["buyer"],
			"budget":      re["budget"],
			"buyerperson": re["buyerperson"],
			"buyertel":    re["buyertel"],
			"s_winner":    re["s_winner"],
			"bidamount":   re["bidamount"],
			"winnertel":   re["winnertel"],
			"agency":      re["agency"],
			"publishtime": re["publishtime"],
		}
		biddingIDs = append(biddingIDs, biddingID)
		biddings = append(biddings, summary)
		biddingByID[biddingID] = summary
	}

	// Resolve each bidding id to the project that lists it in "ids".
	projectByID := make(map[string]map[string]interface{})
	projectBiddingIDs := make(map[string][]string) // project id -> bidding ids
	var projectOrder []string                      // first-seen order, for stable output
	for _, bid := range biddingIDs {
		// The second return value is ignored, as elsewhere in this file; a
		// nil or empty document is treated as "no project found".
		projectset, _ := MgoP.FindOne("projectset_20230904", map[string]interface{}{"ids": bid})
		if projectset == nil || len(*projectset) == 0 {
			continue
		}
		pid := mongodb.BsonIdToSId((*projectset)["_id"])
		if _, seen := projectByID[pid]; !seen {
			projectOrder = append(projectOrder, pid)
		}
		projectByID[pid] = *projectset
		// Store the grown slice back into the map; appending to a local copy
		// only would leave the map empty and no project would ever be saved.
		projectBiddingIDs[pid] = append(projectBiddingIDs[pid], bid)
	}

	insert := map[string]interface{}{
		"proposed_id":     proposedID,
		"stype":           1, // record produced by matching a proposed project against bidding data
		"proposed_number": proposedNumber,
		"buyer":           buyer,
		"project_name":    tmp["projectname"],
		"area":            tmp["area"],
		"city":            tmp["city"],
		"district":        tmp["district"],
	}

	if buyer != "" {
		std, _ := MgoQY.FindOne("qyxy_std", map[string]interface{}{"company_name": buyer})
		// std may be nil when the company is unknown; dereferencing it would
		// panic and the deferred recover would drop the whole record.
		if std != nil {
			insert["credit_no"] = (*std)["credit_no"]
		}
	}

	projects := make([]map[string]interface{}, 0, len(projectOrder))
	for _, pid := range projectOrder {
		project := projectByID[pid]
		linked := make([]map[string]interface{}, 0, len(projectBiddingIDs[pid]))
		for _, bid := range projectBiddingIDs[pid] {
			linked = append(linked, biddingByID[bid])
		}
		projects = append(projects, map[string]interface{}{
			"project_id":  pid,
			"projectname": project["projectname"],
			"bidamount":   project["bidamount"],
			"area":        project["area"],
			"city":        project["city"],
			"bidstatus":   project["bidstatus"],
			"buyer":       project["buyer"],
			"firsttime":   project["firsttime"],
			"biddings":    linked,
		})
	}

	if len(projects) > 0 {
		insert["projects"] = projects
	}
	if len(biddings) > 0 {
		insert["biddings"] = biddings
	}

	MgoP.InsertOrUpdate("qfw", "wcc_dealProposed22_0523", insert)
}

// cloneMap returns a shallow copy of src: the keys and top-level values are
// copied into a new map, nested values are shared with src.
func cloneMap(src map[string]interface{}) map[string]interface{} {
	out := make(map[string]interface{}, len(src))
	for key, value := range src {
		out[key] = value
	}
	return out
}

// dealProposed22 is the sequential variant of dealProposed22Concurrent. For
// every projectset_proposed document with firsttime >= 1735660800 it runs
// searchES22, resolves the matching bidding ids to project ids through
// projectset_20230904, and writes the result to qfw.wcc_dealProposed22.
func dealProposed22() {
	client, err := elastic.NewClient(
		elastic.SetURL(GF.Es.URL),
		elastic.SetBasicAuth(GF.Es.Username, GF.Es.Password),
		elastic.SetSniff(false),
	)
	if err != nil {
		// Without a client every search below would fail, so stop here
		// instead of carrying on with a nil client.
		log.Warn("创建 Elasticsearch 客户端失败", zap.Error(err))
		return
	}

	sess := MgoP.GetMgoConn()
	defer MgoP.DestoryMongoConn(sess)

	log.Info("dealProposed", zap.Any("开始处理:拟建数据表", "projectset_proposed"))

	where := map[string]interface{}{
		"firsttime": map[string]interface{}{
			"$gte": 1735660800,
		},
	}
	queryMgo := sess.DB("qfw").C("projectset_proposed").Find(&where).Select(nil).Iter()

	count := 0
	for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
		if count%1000 == 0 {
			log.Info("dealProposed", zap.Any("current", count), zap.Any("projectname", tmp["projectname"]))
		}

		proposedID := mongodb.BsonIdToSId(tmp["_id"])
		projectName := util.ObjToString(tmp["projectname"])
		buyer := util.ObjToString(tmp["owner"])

		results, err := searchES22(client, projectName, buyer, 60, 10)
		if err != nil {
			// Skip the document: writing an empty match list after a failed
			// search would be indistinguishable from "nothing matched".
			log.Info("searchES22", zap.Error(err))
			continue
		}

		biddingIDs := make([]string, 0, len(results)) // bidding ids matched for this proposed project
		biddings := make([]map[string]interface{}, 0, len(results))
		for _, re := range results {
			biddingIDs = append(biddingIDs, util.ObjToString(re["id"]))
			biddings = append(biddings, map[string]interface{}{
				"id":          re["id"],
				"title":       re["title"],
				"projectname": re["projectname"],
				"score":       re["score"],
				"toptype":     re["toptype"],
				"subtype":     re["subtype"],
			})
		}

		projectIDs := make([]string, 0) // ids of the projects that contain those biddings
		for _, bid := range biddingIDs {
			projectset, _ := MgoP.FindOne("projectset_20230904", map[string]interface{}{"ids": bid})
			if projectset != nil && len(*projectset) > 0 {
				projectIDs = append(projectIDs, mongodb.BsonIdToSId((*projectset)["_id"]))
			}
		}

		MgoP.InsertOrUpdate("qfw", "wcc_dealProposed22", map[string]interface{}{
			"proposed_id":  proposedID,
			"bidding_ids":  removeDuplicates(biddingIDs),
			"project_ids":  removeDuplicates(projectIDs),
			"biddings":     biddings,
			"project_name": tmp["projectname"],
		})
	}
}

// dealProposed walks projectset_proposed (firsttime >= 1735660800) and, for
// each document, loads the follow records with the same proposed_id from
// JianyuSubjectDB, resolves their InfoID values to project ids through
// projectset_20230904, and writes the outcome to qfw.wcc_ok_project_proposed.
//
// Documents without follow records are stored with has_bidding=false;
// documents whose biddings belong to no project get has_project=false.
func dealProposed() {
	sess := MgoP.GetMgoConn()
	defer MgoP.DestoryMongoConn(sess)

	log.Info("dealProposed", zap.Any("开始处理:拟建数据表", "projectset_proposed"))

	where := map[string]interface{}{
		"firsttime": map[string]interface{}{
			"$gte": 1735660800,
		},
	}
	queryMgo := sess.DB("qfw").C("projectset_proposed").Find(&where).Select(nil).Iter()

	count := 0
	for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
		if count%1000 == 0 {
			log.Info("dealProposed", zap.Any("current", count), zap.Any("projectname", tmp["projectname"]))
		}

		proposedID := mongodb.BsonIdToSId(tmp["_id"])
		insert := map[string]interface{}{"proposed_id": proposedID}

		followRecords := make([]DwdFnzjFollowRecord, 0)
		if err := JianyuSubjectDB.Where("proposed_id = ? ", proposedID).Find(&followRecords).Error; err != nil {
			// A failed query says nothing about whether biddings exist, so do
			// not record has_bidding=false for this document.
			log.Info("dealProposed", zap.Error(err))
			continue
		}

		// No follow record: the proposed project has no known bidding.
		if len(followRecords) == 0 {
			insert["has_bidding"] = false
			MgoP.InsertOrUpdate("qfw", "wcc_ok_project_proposed", insert)
			continue
		}

		biddingIDs := make([]string, 0, len(followRecords))
		for _, record := range followRecords {
			biddingIDs = append(biddingIDs, record.InfoID)
		}
		biddingIDs = removeDuplicates(biddingIDs)
		insert["bidding_ids"] = biddingIDs

		projectIDs := make([]string, 0)
		for _, bid := range biddingIDs {
			projectset, _ := MgoP.FindOne("projectset_20230904", map[string]interface{}{"ids": bid})
			if projectset != nil && len(*projectset) > 0 {
				projectIDs = append(projectIDs, mongodb.BsonIdToSId((*projectset)["_id"]))
			}
		}

		if len(projectIDs) > 0 {
			insert["project_ids"] = projectIDs
		} else {
			insert["has_project"] = false
		}

		MgoP.InsertOrUpdate("qfw", "wcc_ok_project_proposed", insert)
	}

	log.Info("dealProposed", zap.Any("数据处理完毕:拟建数据表", "projectset_proposed"))
}

// removeDuplicates returns the distinct strings of arr in first-occurrence
// order. The result is nil when arr is empty.
func removeDuplicates(arr []string) []string {
	seen := make(map[string]struct{}, len(arr))
	var unique []string
	for _, s := range arr {
		if _, dup := seen[s]; dup {
			continue
		}
		seen[s] = struct{}{}
		unique = append(unique, s)
	}
	return unique
}

// proposedSearchFields lists the index fields searchES22 and searchES23 query,
// in the order they are tried.
var proposedSearchFields = []string{"projectname.pname", "title", "detail"}

// proposedSearchFilters returns the filter groups that searchES22 and
// searchES23 combine with every field, in the order they are tried.
func proposedSearchFilters() [][]elastic.Query {
	return [][]elastic.Query{
		{elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一")},
		{elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向")},
		{elastic.NewTermsQuery("toptype", "拟建")},
	}
}

// searchHitScore returns the relevance score of hit, or 0 when Elasticsearch
// returned no score for it (Score is a pointer and may be nil).
func searchHitScore(hit *elastic.SearchHit) float64 {
	if hit == nil || hit.Score == nil {
		return 0
	}
	return *hit.Score
}

// collectProposedHits runs one query per (field, filter) combination against
// the "bidding" index and returns the hits, de-duplicated by hit id, in the
// order they were received. buildMust produces the must-clause for a field.
// Querying stops as soon as at least maxResults hits have been gathered; the
// first query error aborts the search and is returned.
func collectProposedHits(client *elastic.Client, maxResults int, buildMust func(field string) elastic.Query) ([]*elastic.SearchHit, error) {
	var hits []*elastic.SearchHit
	seenIDs := make(map[string]bool)

	for _, field := range proposedSearchFields {
		for _, filter := range proposedSearchFilters() {
			query := elastic.NewBoolQuery().
				Must(buildMust(field)).
				Filter(filter...)

			// Fetch more than needed; filtering and de-duplication follow.
			searchResult, err := client.Search().
				Index("bidding").
				Query(query).
				Size(70).
				Do(context.Background())
			if err != nil {
				return nil, err
			}

			if searchResult != nil && searchResult.Hits != nil {
				for _, hit := range searchResult.Hits.Hits {
					if seenIDs[hit.Id] {
						continue
					}
					seenIDs[hit.Id] = true
					hits = append(hits, hit)
				}
			}

			if len(hits) >= maxResults {
				return hits, nil
			}
		}
	}
	return hits, nil
}

// filterProposedHits decodes hits and keeps at most maxResults documents. A
// hit is dropped when, checked in this order:
//   - its source cannot be decoded, or "projectname" is empty;
//   - its "projectcode" or its "bidamount" was already seen on an earlier hit;
//   - its score is below scoreThreshold;
//   - buyer2 is non-empty and "detail" contains neither projectName nor buyer2;
//   - its "projectname" was already kept.
//
// Every kept document gets its relevance score stored under "score".
//
// NOTE(review): an empty projectcode and a zero bidamount are de-duplicated
// like any other value, and both are recorded before the score check, so only
// the first hit lacking either field can survive. searchES skips the
// bookkeeping for empty project codes — confirm which behaviour is intended.
func filterProposedHits(hits []*elastic.SearchHit, projectName, buyer2 string, scoreThreshold float64, maxResults int) []map[string]interface{} {
	var results []map[string]interface{}
	seenProjectNames := make(map[string]bool)
	seenProjectCodes := make(map[string]bool)
	seenBidamounts := make(map[float64]bool)

	for _, hit := range hits {
		var doc map[string]interface{}
		if err := json.Unmarshal(hit.Source, &doc); err != nil {
			log.Info("解析文档失败", zap.Error(err))
			continue
		}

		projectNameValue := util.ObjToString(doc["projectname"])
		if projectNameValue == "" {
			continue
		}

		projectCode := util.ObjToString(doc["projectcode"])
		if seenProjectCodes[projectCode] {
			continue
		}
		seenProjectCodes[projectCode] = true

		bidamount := util.Float64All(doc["bidamount"])
		if seenBidamounts[bidamount] {
			continue
		}
		seenBidamounts[bidamount] = true

		// Relevance filter.
		score := searchHitScore(hit)
		doc["score"] = score
		if score < scoreThreshold {
			continue
		}

		// When a buyer is known, the body text must mention either the
		// project name or the buyer.
		if buyer2 != "" {
			detail := util.ObjToString(doc["detail"])
			if !strings.Contains(detail, projectName) && !strings.Contains(detail, buyer2) {
				continue
			}
		}

		if seenProjectNames[projectNameValue] {
			continue
		}
		seenProjectNames[projectNameValue] = true

		results = append(results, doc)
		if len(results) >= maxResults {
			break
		}
	}
	return results
}

// searchES23 searches the "bidding" index for announcements whose
// projectname.pname, title or detail phrase-matches projectName, filters and
// de-duplicates them (see filterProposedHits), and returns at most maxResults
// documents ordered by descending score.
//
// buyer2 is the buyer of the proposed project and may be empty;
// scoreThreshold is the minimum relevance score a hit must reach. The error
// of the first failing Elasticsearch query is returned as is.
func searchES23(client *elastic.Client, projectName, buyer2 string, scoreThreshold float64, maxResults int) ([]map[string]interface{}, error) {
	hits, err := collectProposedHits(client, maxResults, func(field string) elastic.Query {
		return elastic.NewMultiMatchQuery(projectName, field).Type("phrase")
	})
	if err != nil {
		return nil, err
	}

	results := filterProposedHits(hits, projectName, buyer2, scoreThreshold, maxResults)

	// Highest score first; the stable sort keeps retrieval order among ties.
	sort.SliceStable(results, func(i, j int) bool {
		return util.Float64All(results[i]["score"]) > util.Float64All(results[j]["score"])
	})
	return results, nil
}

// searchES22 works like searchES23 but uses a plain match query instead of a
// phrase match, and returns the kept documents in retrieval order rather than
// sorted by score.
func searchES22(client *elastic.Client, projectName, buyer2 string, scoreThreshold float64, maxResults int) ([]map[string]interface{}, error) {
	hits, err := collectProposedHits(client, maxResults, func(field string) elastic.Query {
		return elastic.NewMatchQuery(field, projectName)
	})
	if err != nil {
		return nil, err
	}
	return filterProposedHits(hits, projectName, buyer2, scoreThreshold, maxResults), nil
}

// searchES runs a single query against the "bidding" index (detail matches
// projectName, subtype is one of 中标/成交/合同/单一) and returns at most 10
// documents, unique by projectname and by bidamount.
//
// The hits are scanned twice: the first pass keeps documents whose detail
// contains projectName, the second pass tops the list up with documents whose
// detail contains buyer2. Kept documents are extended with "jyhref", "score"
// and, in the first pass, "total_investment" when the bidding record holds a
// positive value.
//
// NOTE(review): when buyer2 is empty, strings.Contains matches every detail,
// so the second pass accepts any remaining hit — confirm this is intended.
func searchES(client *elastic.Client, projectName, buyer2 string) ([]map[string]interface{}, error) {
	const maxResults = 10

	query := elastic.NewBoolQuery().Must(
		elastic.NewMatchQuery("detail", projectName),                  // full-text match on the body
		elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一"), // restrict the subtype
	)
	searchResult, err := client.Search().
		Index("bidding").
		Query(query).
		Size(70). // fetch extra hits so enough survive the filtering
		Do(context.Background())
	if err != nil {
		return nil, err
	}
	if searchResult == nil || searchResult.Hits == nil {
		return nil, nil
	}

	var results []map[string]interface{}
	seenProjectNames := make(map[string]bool)
	seenProjectCode := make(map[string]bool)
	seenBidamounts := make(map[float64]bool)

	// decorate adds the derived fields shared by both passes.
	decorate := func(doc map[string]interface{}, hit *elastic.SearchHit) {
		doc["jyhref"] = GetJyURLByID(util.ObjToString(doc["id"]))
		doc["score"] = searchHitScore(hit)
		if util.ObjToString(doc["site"]) == "中华人民共和国自然资源部" {
			doc["title"] = "土地出让" + "-" + util.ObjToString(doc["title"])
		}
	}

	// Pass 1: the body text must contain the project name.
	for _, hit := range searchResult.Hits.Hits {
		var doc map[string]interface{}
		if err := json.Unmarshal(hit.Source, &doc); err != nil {
			log.Info("解析文档失败", zap.Error(err))
			continue
		}

		projectNameValue, ok := doc["projectname"].(string)
		if !ok {
			log.Info("⚠️ 缺少 projectname 字段,跳过:", zap.Any("projectname", doc["projectname"]))
			continue
		}

		// Empty project codes are never recorded, so they never collide.
		projectCode := util.ObjToString(doc["projectcode"])
		if seenProjectCode[projectCode] {
			continue
		}
		if projectCode != "" {
			seenProjectCode[projectCode] = true
		}

		if !strings.Contains(util.ObjToString(doc["detail"]), projectName) {
			continue
		}

		bidamount := util.Float64All(doc["bidamount"])
		if seenProjectNames[projectNameValue] || seenBidamounts[bidamount] {
			continue
		}
		seenProjectNames[projectNameValue] = true
		seenBidamounts[bidamount] = true

		// Only documents that are kept pay for the extra MongoDB lookup.
		// FindById may yield nil, which must not be dereferenced.
		id := util.ObjToString(doc["id"])
		if bidData, _ := MgoB.FindById("bidding", id, nil); bidData != nil {
			if util.Float64All((*bidData)["total_investment"]) > 0 {
				doc["total_investment"] = (*bidData)["total_investment"]
			}
		}
		decorate(doc, hit)

		results = append(results, doc)
		if len(results) >= maxResults {
			break
		}
	}

	// The first pass may already have filled the list; without this check the
	// second pass could append an eleventh document before noticing.
	if len(results) >= maxResults {
		return results, nil
	}

	// Pass 2: the body text must contain the buyer.
	for _, hit := range searchResult.Hits.Hits {
		var doc map[string]interface{}
		if err := json.Unmarshal(hit.Source, &doc); err != nil {
			log.Info("解析文档失败:", zap.Error(err))
			continue
		}

		projectNameValue, ok := doc["projectname"].(string)
		if !ok {
			log.Info("⚠️ 缺少 projectname 字段,跳过:", zap.Any("projectname", doc["projectname"]))
			continue
		}

		if !strings.Contains(util.ObjToString(doc["detail"]), buyer2) {
			continue
		}

		bidamount := util.Float64All(doc["bidamount"])
		if seenProjectNames[projectNameValue] || seenBidamounts[bidamount] {
			continue
		}
		seenProjectNames[projectNameValue] = true
		seenBidamounts[bidamount] = true

		decorate(doc, hit)

		results = append(results, doc)
		if len(results) >= maxResults {
			break
		}
	}

	return results, nil
}

// GetJyURLByID builds the jianyu360 article URL for a bidding id, embedding
// the id as encoded by util.CommonEncodeArticle.
func GetJyURLByID(id string) string {
	const articleURL = "https://www.jianyu360.com/article/content/%s.html"
	return fmt.Sprintf(articleURL, util.CommonEncodeArticle("content", id))
}

// GetIdByURL extracts the bidding id encoded in a jianyu360 URL.
//
// URLs containing "/article/content" are decoded with
// util.CommonDecodeArticle; URLs ending in "appid" are decoded with a
// SimpleEncrypt keyed "entservice". It returns "" for work-bench URLs, for
// URLs of any other shape, and for URLs that lack the expected path segment
// or decode to nothing.
func GetIdByURL(url string) string {
	if strings.Contains(url, "work-bench") {
		return ""
	}

	if strings.Contains(url, "/article/content") {
		urls := strings.Split(url, "content/")
		if len(urls) < 2 {
			// e.g. ".../article/content" with nothing after it.
			return ""
		}
		res := strings.Split(urls[1], ".html")
		ids := util.CommonDecodeArticle("content", res[0])
		if len(ids) == 0 {
			return ""
		}
		return ids[0]
	}

	if strings.HasSuffix(url, "appid") {
		urls := strings.Split(url, "entservice/")
		if len(urls) < 2 {
			return ""
		}
		res := strings.Split(urls[1], ".html")
		se := util.SimpleEncrypt{Key: "entservice"}
		return se.DecodeString(res[0])
	}

	return ""
}