project.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "go.uber.org/zap"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  11. "regexp"
  12. "sort"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. // dealProposed22Concurrent 多协程处理,拟建存量数据
  18. func dealProposed22Concurrent() {
  19. // 1. 初始化 ES 客户端
  20. client, err := elastic.NewClient(
  21. elastic.SetURL(GF.Es.URL),
  22. elastic.SetBasicAuth(GF.Es.Username, GF.Es.Password),
  23. elastic.SetSniff(false),
  24. )
  25. if err != nil {
  26. log.Fatal("创建 Elasticsearch 客户端失败", zap.Error(err))
  27. }
  28. // 2. 初始化 MongoDB 连接
  29. sess := MgoP.GetMgoConn()
  30. defer MgoP.DestoryMongoConn(sess)
  31. coll := sess.DB("qfw").C("projectset_proposed")
  32. query := map[string]interface{}{
  33. "firsttime": map[string]interface{}{
  34. "$gte": 1735660800,
  35. "$lte": 1748102400,
  36. },
  37. }
  38. iter := coll.Find(query).Select(nil).Iter()
  39. // 3. 并发控制
  40. const maxWorkers = 2
  41. taskCh := make(chan map[string]interface{}, 2000)
  42. var wg sync.WaitGroup
  43. // 4. 启动 worker 处理任务
  44. for i := 0; i < maxWorkers; i++ {
  45. wg.Add(1)
  46. go func() {
  47. defer wg.Done()
  48. for doc := range taskCh {
  49. if len(doc) == 0 {
  50. log.Info("aaa", zap.Any("client", client))
  51. }
  52. processOneProposed(doc, client)
  53. }
  54. }()
  55. }
  56. // 5. 逐条读取数据并派发任务
  57. log.Info("111111", zap.String("222222", "开始处理数据"))
  58. count := 0
  59. for doc := make(map[string]interface{}); iter.Next(doc); {
  60. count++
  61. if count%1000 == 0 {
  62. log.Info("dealProposed", zap.Int("current", count), zap.Any("projectname", doc["projectname"]))
  63. }
  64. if util.ObjToString(doc["area"]) == "甘肃" {
  65. continue
  66. }
  67. taskCh <- cloneMap(doc) // 防止 map 重用
  68. }
  69. close(taskCh)
  70. wg.Wait()
  71. }
  72. // processOneProposed 处理存量数据
  73. func processOneProposed(tmp map[string]interface{}, client *elastic.Client) {
  74. defer func() {
  75. if r := recover(); r != nil {
  76. log.Warn("panic in processOneProposed", zap.Any("recover", r))
  77. }
  78. }()
  79. proposedID := mongodb.BsonIdToSId(tmp["_id"])
  80. projectName := util.ObjToString(tmp["projectname"])
  81. buyer := util.ObjToString(tmp["owner"])
  82. proposed_number := util.ObjToString(tmp["proposed_number"])
  83. //log.Info("processOneProposed", zap.String("开始查询es", projectName))
  84. results, err := searchES23(client, projectName, buyer, 20, 50)
  85. if err != nil {
  86. log.Warn("searchES22 error", zap.Error(err))
  87. return
  88. }
  89. //log.Info("processOneProposed", zap.String("结束查询es", projectName))
  90. biddings := []map[string]interface{}{}
  91. // 标讯信息
  92. for _, re := range results {
  93. biddingID := util.ObjToString(re["id"])
  94. da := map[string]interface{}{
  95. "id": re["id"],
  96. "title": re["title"],
  97. "area": re["area"],
  98. "city": re["city"],
  99. "projectname": re["projectname"],
  100. "score": re["score"],
  101. "toptype": re["toptype"],
  102. "subtype": re["subtype"],
  103. "buyer": re["buyer"],
  104. "budget": re["budget"],
  105. "buyerperson": re["buyerperson"],
  106. "buyertel": re["buyertel"],
  107. "s_winner": re["s_winner"],
  108. "bidamount": re["bidamount"],
  109. "winnertel": re["winnertel"],
  110. "agency": re["agency"],
  111. "publishtime": re["publishtime"],
  112. }
  113. //项目信息
  114. where2 := map[string]interface{}{"ids": biddingID}
  115. projectset, _ := MgoP.FindOne("projectset_20230904", where2)
  116. if projectset != nil && len((*projectset)) > 0 {
  117. v3 := map[string]interface{}{
  118. "project_id": mongodb.BsonIdToSId((*projectset)["_id"]),
  119. "projectname": (*projectset)["projectname"],
  120. "bidamount": (*projectset)["bidamount"],
  121. "area": (*projectset)["area"],
  122. "city": (*projectset)["city"],
  123. "firsttime": (*projectset)["firsttime"],
  124. "bidtype": (*projectset)["bidtype"],
  125. "bidstatus": (*projectset)["bidstatus"],
  126. "sortprice": (*projectset)["sortprice"],
  127. "buyer": (*projectset)["buyer"],
  128. }
  129. da["project"] = v3
  130. }
  131. biddings = append(biddings, da)
  132. }
  133. insert := map[string]interface{}{
  134. "proposed_id": proposedID,
  135. "stype": 1, //代表从拟建数据-> 匹配在建数据
  136. "proposed_number": proposed_number,
  137. "buyer": buyer,
  138. "projectname": tmp["projectname"],
  139. "area": tmp["area"],
  140. "city": tmp["city"],
  141. "district": tmp["district"],
  142. "bidding": biddings,
  143. "updatetime": time.Now().Unix(),
  144. }
  145. if isValidCodeFormat(util.ObjToString("approvecode")) {
  146. insert["approvecode"] = tmp["approvecode"]
  147. }
  148. if buyer != "" {
  149. where11 := map[string]interface{}{
  150. "company_name": buyer,
  151. }
  152. std, _ := MgoQY.FindOne("qyxy_std", where11)
  153. insert["credit_no"] = (*std)["credit_no"]
  154. }
  155. whereExist := map[string]interface{}{
  156. "proposed_id": proposedID,
  157. }
  158. exist, _ := MgoP.FindOne("wcc_dealProposed22_0524", whereExist)
  159. // 存在就更新
  160. if exist != nil && len(*exist) > 0 {
  161. exitsid := mongodb.BsonIdToSId((*exist)["_id"])
  162. MgoP.UpdateById("wcc_dealProposed22_0524", exitsid, insert)
  163. } else {
  164. insert["comeintime"] = time.Now().Unix()
  165. MgoP.InsertOrUpdate("qfw", "wcc_dealProposed22_0524", insert)
  166. }
  167. }
  168. func cloneMap(src map[string]interface{}) map[string]interface{} {
  169. dst := make(map[string]interface{}, len(src))
  170. for k, v := range src {
  171. dst[k] = v
  172. }
  173. return dst
  174. }
  175. func dealProposed22() {
  176. url := GF.Es.URL
  177. //url := "http://127.0.0.1:19908"
  178. username := GF.Es.Username
  179. password := GF.Es.Password
  180. //index := "bidding" //索引名称
  181. // 创建 Elasticsearch 客户端
  182. client, err := elastic.NewClient(
  183. elastic.SetURL(url),
  184. elastic.SetBasicAuth(username, password),
  185. elastic.SetSniff(false),
  186. )
  187. if err != nil {
  188. log.Info("创建 Elasticsearch 客户端失败", zap.Error(err))
  189. }
  190. //
  191. sess := MgoP.GetMgoConn()
  192. defer MgoP.DestoryMongoConn(sess)
  193. log.Info("dealProposed", zap.Any("开始处理:拟建数据表", "projectset_proposed"))
  194. where := map[string]interface{}{
  195. "firsttime": map[string]interface{}{
  196. "$gte": 1735660800,
  197. },
  198. }
  199. queryMgo := sess.DB("qfw").C("projectset_proposed").Find(&where).Select(nil).Iter()
  200. count := 0
  201. for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
  202. if count%1000 == 0 {
  203. log.Info("dealProposed", zap.Any("current", count), zap.Any("projectname", tmp["projectname"]))
  204. }
  205. proposed_id := mongodb.BsonIdToSId(tmp["_id"])
  206. project_name := util.ObjToString(tmp["projectname"])
  207. buyer := util.ObjToString(tmp["owner"])
  208. results, err := searchES22(client, project_name, buyer, 60, 10)
  209. if err != nil {
  210. log.Info("searchES22", zap.Error(err))
  211. }
  212. projectIds := make([]string, 0) //拟建对应的在建项目ID
  213. biddingIds := make([]string, 0) //拟建项目对在建项目中的标讯ids
  214. biddings := make([]map[string]interface{}, 0)
  215. for _, re := range results {
  216. bidding_id := util.ObjToString(re["id"])
  217. biddingIds = append(biddingIds, bidding_id)
  218. bidding := map[string]interface{}{
  219. "id": re["id"],
  220. "title": re["title"],
  221. "projectname": re["projectname"],
  222. "score": re["score"],
  223. "toptype": re["toptype"],
  224. "subtype": re["subtype"],
  225. }
  226. biddings = append(biddings, bidding)
  227. }
  228. for _, bid := range biddingIds {
  229. where2 := map[string]interface{}{
  230. "ids": bid,
  231. }
  232. projectset, _ := MgoP.FindOne("projectset_20230904", where2)
  233. if projectset != nil && len((*projectset)) > 0 {
  234. projectIds = append(projectIds, mongodb.BsonIdToSId((*projectset)["_id"]))
  235. }
  236. }
  237. insert := map[string]interface{}{
  238. "proposed_id": proposed_id,
  239. "bidding_ids": removeDuplicates(biddingIds),
  240. "project_ids": removeDuplicates(projectIds),
  241. "biddings": biddings,
  242. "project_name": tmp["projectname"],
  243. }
  244. MgoP.InsertOrUpdate("qfw", "wcc_dealProposed22", insert)
  245. }
  246. }
  247. // dealProposed 处理拟建数据表
  248. func dealProposed() {
  249. sess := MgoP.GetMgoConn()
  250. defer MgoP.DestoryMongoConn(sess)
  251. log.Info("dealProposed", zap.Any("开始处理:拟建数据表", "projectset_proposed"))
  252. where := map[string]interface{}{
  253. "firsttime": map[string]interface{}{
  254. "$gte": 1735660800,
  255. },
  256. }
  257. queryMgo := sess.DB("qfw").C("projectset_proposed").Find(&where).Select(nil).Iter()
  258. count := 0
  259. for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
  260. if count%1000 == 0 {
  261. log.Info("dealProposed", zap.Any("current", count), zap.Any("projectname", tmp["projectname"]))
  262. }
  263. proposed_id := mongodb.BsonIdToSId(tmp["_id"])
  264. insert := make(map[string]interface{})
  265. insert["proposed_id"] = proposed_id
  266. var nzj_follw_records = make([]DwdFnzjFollowRecord, 0)
  267. err := JianyuSubjectDB.Where("proposed_id = ? ", proposed_id).Find(&nzj_follw_records).Error
  268. if err != nil {
  269. log.Info("dealProposed", zap.Error(err))
  270. }
  271. //拟建标讯,没有找到对应的在建项目数据
  272. if len(nzj_follw_records) == 0 {
  273. insert["has_bidding"] = false
  274. MgoP.InsertOrUpdate("qfw", "wcc_ok_project_proposed", insert)
  275. continue
  276. }
  277. projectIds := make([]string, 0) //拟建对应的在建项目ID
  278. biddingIds := make([]string, 0) //拟建项目对在建项目中的标讯ids
  279. for _, v := range nzj_follw_records {
  280. biddingIds = append(biddingIds, v.InfoID)
  281. }
  282. biddingIds = removeDuplicates(biddingIds)
  283. insert["bidding_ids"] = biddingIds
  284. for _, bid := range biddingIds {
  285. where2 := map[string]interface{}{
  286. "ids": bid,
  287. }
  288. projectset, _ := MgoP.FindOne("projectset_20230904", where2)
  289. if projectset != nil && len((*projectset)) > 0 {
  290. projectIds = append(projectIds, mongodb.BsonIdToSId((*projectset)["_id"]))
  291. }
  292. }
  293. if len(projectIds) > 0 {
  294. insert["project_ids"] = projectIds
  295. } else {
  296. insert["has_project"] = false
  297. }
  298. MgoP.InsertOrUpdate("qfw", "wcc_ok_project_proposed", insert)
  299. }
  300. log.Info("dealProposed", zap.Any("数据处理完毕:拟建数据表", "projectset_proposed"))
  301. }
  302. // removeDuplicates 去除重复字符串
  303. func removeDuplicates(arr []string) []string {
  304. uniqueMap := make(map[string]bool)
  305. var result []string
  306. for _, str := range arr {
  307. if !uniqueMap[str] {
  308. uniqueMap[str] = true
  309. result = append(result, str)
  310. }
  311. }
  312. return result
  313. }
  314. func searchES23(client *elastic.Client, projectName, buyer2 string, scoreThreshold float64, maxResults int) ([]map[string]interface{}, error) {
  315. fieldsToTry := []string{"projectname.pname", "title", "detail"}
  316. filtersToTry := [][]elastic.Query{
  317. {elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一")},
  318. {elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向")},
  319. {elastic.NewTermsQuery("toptype", "拟建")},
  320. }
  321. var allResults []*elastic.SearchHit
  322. seenIDs := make(map[string]bool)
  323. for _, field := range fieldsToTry {
  324. for _, filter := range filtersToTry {
  325. // 构建查询:使用 MultiMatchQuery + phrase
  326. query := elastic.NewBoolQuery().
  327. Must(elastic.NewMultiMatchQuery(projectName, field).Type("phrase")).
  328. Filter(filter...)
  329. // 执行查询
  330. searchResult, err := client.Search().
  331. Index("bidding").
  332. Query(query).
  333. Size(70).
  334. Do(context.Background())
  335. if err != nil {
  336. return nil, err
  337. }
  338. // 去重处理
  339. for _, hit := range searchResult.Hits.Hits {
  340. if !seenIDs[hit.Id] {
  341. seenIDs[hit.Id] = true
  342. allResults = append(allResults, hit)
  343. }
  344. }
  345. if len(allResults) >= maxResults {
  346. break
  347. }
  348. }
  349. if len(allResults) >= maxResults {
  350. break
  351. }
  352. }
  353. //
  354. //fieldsToTry := []string{"projectname.pname", "title", "detail"}
  355. //filtersToTry := [][]elastic.Query{
  356. // {elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一")},
  357. // {elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向")},
  358. // {elastic.NewTermsQuery("toptype", "拟建")},
  359. //}
  360. //
  361. //var allResults []*elastic.SearchHit
  362. //seenIDs := make(map[string]bool)
  363. //
  364. //for _, field := range fieldsToTry {
  365. // for _, filter := range filtersToTry {
  366. // // 构建查询
  367. // query := elastic.NewBoolQuery().
  368. // Must(elastic.NewMatchQuery(field, projectName)).
  369. // Filter(filter...)
  370. //
  371. // // 执行查询
  372. // searchResult, err := client.Search().
  373. // Index("bidding").
  374. // Query(query).
  375. // Size(70). // 多取一些,后面做筛选和去重
  376. // Do(context.Background())
  377. // if err != nil {
  378. // return nil, err
  379. // }
  380. //
  381. // for _, hit := range searchResult.Hits.Hits {
  382. // if !seenIDs[hit.Id] {
  383. // allResults = append(allResults, hit)
  384. // seenIDs[hit.Id] = true
  385. // }
  386. // }
  387. //
  388. // if len(allResults) >= maxResults {
  389. // break
  390. // }
  391. // }
  392. // if len(allResults) >= maxResults {
  393. // break
  394. // }
  395. //}
  396. var results []map[string]interface{}
  397. seenProjectNames := make(map[string]bool)
  398. seenProjectCodes := make(map[string]bool)
  399. bidamountMap := make(map[float64]bool)
  400. for _, hit := range allResults {
  401. var doc map[string]interface{}
  402. if err := json.Unmarshal(hit.Source, &doc); err != nil {
  403. log.Info("解析文档失败", zap.Error(err))
  404. continue
  405. }
  406. projectNameValue := util.ObjToString(doc["projectname"])
  407. if projectNameValue == "" {
  408. continue
  409. }
  410. projectCode := util.ObjToString(doc["projectcode"])
  411. if seenProjectCodes[projectCode] {
  412. continue
  413. }
  414. seenProjectCodes[projectCode] = true
  415. bidamount := util.Float64All(doc["bidamount"])
  416. if bidamountMap[bidamount] {
  417. continue
  418. }
  419. bidamountMap[bidamount] = true
  420. // 相似度筛选
  421. score := *hit.Score
  422. doc["score"] = score //相似度
  423. if score < scoreThreshold {
  424. continue
  425. }
  426. //id := util.ObjToString(doc["id"])
  427. //doc["jyhref"] = GetJyURLByID(id)
  428. //if site := util.ObjToString(doc["site"]); site == "中华人民共和国自然资源部" {
  429. // doc["title"] = "土地出让" + "-" + util.ObjToString(doc["title"])
  430. //}
  431. // enrich: total_investment
  432. //if bidData, _ := MgoB.FindById("bidding", id, nil); bidData != nil {
  433. // if util.Float64All((*bidData)["total_investment"]) > 0 {
  434. // doc["total_investment"] = (*bidData)["total_investment"]
  435. // }
  436. //}
  437. doc["score"] = score
  438. detail := util.ObjToString(doc["detail"])
  439. // 字段中必须包含 projectName
  440. if buyer2 != "" {
  441. if !strings.Contains(detail, projectName) && !strings.Contains(detail, buyer2) {
  442. continue
  443. }
  444. }
  445. if seenProjectNames[projectNameValue] {
  446. continue
  447. }
  448. seenProjectNames[projectNameValue] = true
  449. results = append(results, doc)
  450. if len(results) >= maxResults {
  451. break
  452. }
  453. }
  454. // 排序:按 score 降序
  455. sort.Slice(results, func(i, j int) bool {
  456. si := util.Float64All(results[i]["score"])
  457. sj := util.Float64All(results[j]["score"])
  458. return si > sj
  459. })
  460. return results, nil
  461. }
  462. func searchES22(client *elastic.Client, projectName, buyer2 string, scoreThreshold float64, maxResults int) ([]map[string]interface{}, error) {
  463. fieldsToTry := []string{"projectname.pname", "title", "detail"}
  464. filtersToTry := [][]elastic.Query{
  465. {elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一")},
  466. {elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向")},
  467. {elastic.NewTermsQuery("toptype", "拟建")},
  468. }
  469. var allResults []*elastic.SearchHit
  470. seenIDs := make(map[string]bool)
  471. for _, field := range fieldsToTry {
  472. for _, filter := range filtersToTry {
  473. // 构建查询
  474. query := elastic.NewBoolQuery().
  475. Must(elastic.NewMatchQuery(field, projectName)).
  476. Filter(filter...)
  477. // 执行查询
  478. searchResult, err := client.Search().
  479. Index("bidding").
  480. Query(query).
  481. Size(70). // 多取一些,后面做筛选和去重
  482. Do(context.Background())
  483. if err != nil {
  484. return nil, err
  485. }
  486. for _, hit := range searchResult.Hits.Hits {
  487. if !seenIDs[hit.Id] {
  488. allResults = append(allResults, hit)
  489. seenIDs[hit.Id] = true
  490. }
  491. }
  492. if len(allResults) >= maxResults {
  493. break
  494. }
  495. }
  496. if len(allResults) >= maxResults {
  497. break
  498. }
  499. }
  500. var results []map[string]interface{}
  501. seenProjectNames := make(map[string]bool)
  502. seenProjectCodes := make(map[string]bool)
  503. bidamountMap := make(map[float64]bool)
  504. for _, hit := range allResults {
  505. var doc map[string]interface{}
  506. if err := json.Unmarshal(hit.Source, &doc); err != nil {
  507. log.Info("解析文档失败", zap.Error(err))
  508. continue
  509. }
  510. projectNameValue := util.ObjToString(doc["projectname"])
  511. if projectNameValue == "" {
  512. continue
  513. }
  514. projectCode := util.ObjToString(doc["projectcode"])
  515. if seenProjectCodes[projectCode] {
  516. continue
  517. }
  518. seenProjectCodes[projectCode] = true
  519. bidamount := util.Float64All(doc["bidamount"])
  520. if bidamountMap[bidamount] {
  521. continue
  522. }
  523. bidamountMap[bidamount] = true
  524. // 相似度筛选
  525. score := *hit.Score
  526. doc["score"] = score //相似度
  527. if score < scoreThreshold {
  528. continue
  529. }
  530. //id := util.ObjToString(doc["id"])
  531. //doc["jyhref"] = GetJyURLByID(id)
  532. //if site := util.ObjToString(doc["site"]); site == "中华人民共和国自然资源部" {
  533. // doc["title"] = "土地出让" + "-" + util.ObjToString(doc["title"])
  534. //}
  535. // enrich: total_investment
  536. //if bidData, _ := MgoB.FindById("bidding", id, nil); bidData != nil {
  537. // if util.Float64All((*bidData)["total_investment"]) > 0 {
  538. // doc["total_investment"] = (*bidData)["total_investment"]
  539. // }
  540. //}
  541. doc["score"] = score
  542. detail := util.ObjToString(doc["detail"])
  543. // 字段中必须包含 projectName
  544. if buyer2 != "" {
  545. if !strings.Contains(detail, projectName) && !strings.Contains(detail, buyer2) {
  546. continue
  547. }
  548. }
  549. if seenProjectNames[projectNameValue] {
  550. continue
  551. }
  552. seenProjectNames[projectNameValue] = true
  553. results = append(results, doc)
  554. if len(results) >= maxResults {
  555. break
  556. }
  557. }
  558. return results, nil
  559. }
  560. func searchES(client *elastic.Client, projectName, buyer2 string) ([]map[string]interface{}, error) {
  561. query := elastic.NewBoolQuery().
  562. Must(
  563. //elastic.NewMatchQuery("projectname.pname", projectName), // 模糊匹配 projectname
  564. //elastic.NewMatchQuery("title", projectName), // 模糊匹配 projectname
  565. elastic.NewMatchQuery("detail", projectName), // 模糊匹配 projectname
  566. //elastic.NewTermQuery("area", "安徽"), // 过滤区域
  567. elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一"), // 过滤 subtype
  568. //elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向"), // 过滤 subtype
  569. //elastic.NewTermsQuery("toptype", "拟建"), // 过滤 subtype
  570. )
  571. searchResult, err := client.Search().
  572. Index("bidding").
  573. Query(query).
  574. Size(70). // 先取 12 条,确保足够数据
  575. Do(context.Background())
  576. if err != nil {
  577. return nil, err
  578. }
  579. // 结果集
  580. var results []map[string]interface{}
  581. seenProjectNames := make(map[string]bool) // 用于去重
  582. seenProjectCode := make(map[string]bool) // 用于去重
  583. bidamountMap := make(map[float64]bool)
  584. for _, hit := range searchResult.Hits.Hits {
  585. var doc map[string]interface{}
  586. err := json.Unmarshal(hit.Source, &doc)
  587. if err != nil {
  588. log.Info("解析文档失败", zap.Error(err))
  589. continue
  590. }
  591. // 获取 `projectname`,防止 key 不存在时的错误
  592. projectNameValue, ok := doc["projectname"].(string)
  593. bidamount := util.Float64All(doc["bidamount"])
  594. if !ok {
  595. log.Info("⚠️ 缺少 projectname 字段,跳过:", zap.Any("projectname", doc["projectname"]))
  596. continue
  597. }
  598. projectCodeValue := util.ObjToString(doc["projectcode"])
  599. if seenProjectCode[projectCodeValue] {
  600. continue
  601. }
  602. if projectCodeValue != "" {
  603. seenProjectCode[projectCodeValue] = true
  604. }
  605. // **处理额外字段**
  606. id := util.ObjToString(doc["id"])
  607. bidData, _ := MgoB.FindById("bidding", id, nil)
  608. if util.Float64All((*bidData)["total_investment"]) > 0 {
  609. doc["total_investment"] = (*bidData)["total_investment"]
  610. }
  611. doc["jyhref"] = GetJyURLByID(id)
  612. score := *hit.Score
  613. site := util.ObjToString(doc["site"])
  614. if site == "中华人民共和国自然资源部" {
  615. doc["title"] = "土地出让" + "-" + util.ObjToString(doc["title"])
  616. }
  617. doc["score"] = score //相似度
  618. detail := util.ObjToString(doc["detail"])
  619. if !strings.Contains(detail, projectName) {
  620. continue
  621. }
  622. // **去重逻辑**:如果 `projectname` 已经出现过,则跳过
  623. if seenProjectNames[projectNameValue] {
  624. continue
  625. }
  626. if bidamountMap[bidamount] {
  627. continue
  628. }
  629. // **记录该 `projectname`,避免重复**
  630. seenProjectNames[projectNameValue] = true
  631. bidamountMap[bidamount] = true
  632. // **加入结果集**
  633. results = append(results, doc)
  634. // **如果已经找到 6 条不同 `projectname`,就跳出循环**
  635. if len(results) >= 10 {
  636. break
  637. }
  638. }
  639. //2、判断正文包含采购单位
  640. for _, hit := range searchResult.Hits.Hits {
  641. var doc map[string]interface{}
  642. err := json.Unmarshal(hit.Source, &doc)
  643. if err != nil {
  644. log.Info("解析文档失败:", zap.Error(err))
  645. continue
  646. }
  647. // 获取 `projectname`,防止 key 不存在时的错误
  648. projectNameValue, ok := doc["projectname"].(string)
  649. bidamount := util.Float64All(doc["bidamount"])
  650. if !ok {
  651. log.Info("⚠️ 缺少 projectname 字段,跳过:", zap.Any("projectname", doc["projectname"]))
  652. continue
  653. }
  654. // **处理额外字段**
  655. id := util.ObjToString(doc["id"])
  656. doc["jyhref"] = GetJyURLByID(id)
  657. score := *hit.Score
  658. doc["score"] = score //相似度
  659. site := util.ObjToString(doc["site"])
  660. if site == "中华人民共和国自然资源部" {
  661. doc["title"] = "土地出让" + "-" + util.ObjToString(doc["title"])
  662. }
  663. //判断正文包含采购单位
  664. detail := util.ObjToString(doc["detail"])
  665. if !strings.Contains(detail, buyer2) {
  666. continue
  667. }
  668. // **去重逻辑**:如果 `projectname` 已经出现过,则跳过
  669. if seenProjectNames[projectNameValue] {
  670. continue
  671. }
  672. if bidamountMap[bidamount] {
  673. continue
  674. }
  675. // **记录该 `projectname`,避免重复**
  676. seenProjectNames[projectNameValue] = true
  677. bidamountMap[bidamount] = true
  678. // **加入结果集**
  679. results = append(results, doc)
  680. // **如果已经找到 6 条不同 `projectname`,就跳出循环**
  681. if len(results) >= 10 {
  682. break
  683. }
  684. }
  685. return results, nil
  686. }
  687. // GetJyURLByID 获取剑鱼地址
  688. func GetJyURLByID(id string) string {
  689. var Url = "https://www.jianyu360.com/article/content/%s.html"
  690. url := fmt.Sprintf(Url, util.CommonEncodeArticle("content", id))
  691. return url
  692. }
  693. // GetIdByURL 解密url,获取bidding ID
  694. func GetIdByURL(url string) string {
  695. if strings.Contains(url, "work-bench") {
  696. return ""
  697. }
  698. if strings.Contains(url, "/article/content") {
  699. urls := strings.Split(url, "content/")
  700. res := strings.Split(urls[1], ".html")
  701. ids := util.CommonDecodeArticle("content", res[0])
  702. return ids[0]
  703. }
  704. if strings.HasSuffix(url, "appid") {
  705. urls := strings.Split(url, "entservice/")
  706. res := strings.Split(urls[1], ".html")
  707. se := util.SimpleEncrypt{Key: "entservice"}
  708. id := se.DecodeString(res[0])
  709. return id
  710. }
  711. return ""
  712. }
  713. // isValidCodeFormat 判断 拟建项目编码
  714. func isValidCodeFormat(s string) bool {
  715. pattern := `^\d{4}-\d{6}-\d{2}-\d{2}-\d{6}$`
  716. matched, err := regexp.MatchString(pattern, s)
  717. if err != nil {
  718. return false
  719. }
  720. return matched
  721. }