project.go 23 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "go.uber.org/zap"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  11. "sort"
  12. "strings"
  13. "sync"
  14. )
  15. // dealProposed22Concurrent 多协程处理
  16. func dealProposed22Concurrent() {
  17. // 1. 初始化 ES 客户端
  18. client, err := elastic.NewClient(
  19. elastic.SetURL(GF.Es.URL),
  20. elastic.SetBasicAuth(GF.Es.Username, GF.Es.Password),
  21. elastic.SetSniff(false),
  22. )
  23. if err != nil {
  24. log.Fatal("创建 Elasticsearch 客户端失败", zap.Error(err))
  25. }
  26. // 2. 初始化 MongoDB 连接
  27. sess := MgoP.GetMgoConn()
  28. defer MgoP.DestoryMongoConn(sess)
  29. coll := sess.DB("qfw").C("projectset_proposed")
  30. query := map[string]interface{}{
  31. //"area": "甘肃",
  32. "firsttime": map[string]interface{}{
  33. "$gte": 1735660800,
  34. },
  35. }
  36. iter := coll.Find(query).Select(nil).Iter()
  37. // 3. 并发控制
  38. const maxWorkers = 1
  39. taskCh := make(chan map[string]interface{}, 2000)
  40. var wg sync.WaitGroup
  41. // 4. 启动 worker 处理任务
  42. for i := 0; i < maxWorkers; i++ {
  43. wg.Add(1)
  44. go func() {
  45. defer wg.Done()
  46. for doc := range taskCh {
  47. processOneProposed(doc, client)
  48. }
  49. }()
  50. }
  51. // 5. 逐条读取数据并派发任务
  52. log.Info("111111", zap.String("222222", "开始处理数据"))
  53. count := 0
  54. for doc := make(map[string]interface{}); iter.Next(doc); {
  55. count++
  56. if count%1000 == 0 {
  57. log.Info("dealProposed", zap.Int("current", count), zap.Any("projectname", doc["projectname"]))
  58. }
  59. if util.ObjToString(doc["area"]) != "甘肃" {
  60. continue
  61. }
  62. taskCh <- cloneMap(doc) // 防止 map 重用
  63. }
  64. close(taskCh)
  65. wg.Wait()
  66. }
  67. func processOneProposed(tmp map[string]interface{}, client *elastic.Client) {
  68. defer func() {
  69. if r := recover(); r != nil {
  70. log.Warn("panic in processOneProposed", zap.Any("recover", r))
  71. }
  72. }()
  73. proposedID := mongodb.BsonIdToSId(tmp["_id"])
  74. projectName := util.ObjToString(tmp["projectname"])
  75. buyer := util.ObjToString(tmp["owner"])
  76. proposed_number := util.ObjToString(tmp["proposed_number"])
  77. log.Info("processOneProposed", zap.String("开始查询es", projectName))
  78. results, err := searchES23(client, projectName, buyer, 18, 50)
  79. if err != nil {
  80. log.Warn("searchES22 error", zap.Error(err))
  81. return
  82. }
  83. log.Info("processOneProposed", zap.String("结束查询es", projectName))
  84. projectIds := []string{}
  85. biddingIds := []string{}
  86. biddings := []map[string]interface{}{}
  87. //bidding_id-> bidding map 数据源
  88. biddingIdMap := make(map[string]map[string]interface{}, 0)
  89. //project_id -> project map
  90. projectIdMap := make(map[string]map[string]interface{}, 0)
  91. project_bidding_ids := make(map[string][]string, 0) //存储project_id =>[bidding_id]
  92. for _, re := range results {
  93. biddingID := util.ObjToString(re["id"])
  94. biddingIds = append(biddingIds, biddingID)
  95. da := map[string]interface{}{
  96. "id": re["id"],
  97. "title": re["title"],
  98. "projectname": re["projectname"],
  99. "score": re["score"],
  100. "toptype": re["toptype"],
  101. "subtype": re["subtype"],
  102. "buyer": re["buyer"],
  103. "budget": re["budget"],
  104. "buyerperson": re["buyerperson"],
  105. "buyertel": re["buyertel"],
  106. "s_winner": re["s_winner"],
  107. "bidamount": re["bidamount"],
  108. "winnertel": re["winnertel"],
  109. "agency": re["agency"],
  110. "publishtime": re["publishtime"],
  111. }
  112. biddings = append(biddings, da)
  113. biddingIdMap[biddingID] = da
  114. }
  115. for _, bid := range biddingIds {
  116. where2 := map[string]interface{}{"ids": bid}
  117. projectset, _ := MgoP.FindOne("projectset_20230904", where2)
  118. if projectset != nil && len(*projectset) > 0 {
  119. pid := mongodb.BsonIdToSId((*projectset)["_id"])
  120. projectIds = append(projectIds, pid)
  121. p_bidding_ids := project_bidding_ids[pid]
  122. p_bidding_ids = append(p_bidding_ids, bid)
  123. projectIdMap[pid] = *projectset
  124. }
  125. }
  126. insert := map[string]interface{}{
  127. "proposed_id": proposedID,
  128. "stype": 1, //代表从拟建数据-> 匹配在建数据
  129. "proposed_number": proposed_number,
  130. "buyer": buyer,
  131. "project_name": tmp["projectname"],
  132. "area": tmp["area"],
  133. "city": tmp["city"],
  134. "district": tmp["district"],
  135. //"bidding_ids": removeDuplicates(biddingIds),
  136. //"project_ids": removeDuplicates(projectIds),
  137. //"biddings": biddings,
  138. }
  139. if buyer != "" {
  140. where11 := map[string]interface{}{
  141. "company_name": buyer,
  142. }
  143. std, _ := MgoQY.FindOne("qyxy_std", where11)
  144. insert["credit_no"] = (*std)["credit_no"]
  145. }
  146. projects := make([]map[string]interface{}, 0)
  147. if len(project_bidding_ids) > 0 {
  148. for pid, bidding_ids := range project_bidding_ids {
  149. p_project := projectIdMap[pid]
  150. p_bs := make([]map[string]interface{}, 0)
  151. for _, vv := range bidding_ids {
  152. bi := biddingIdMap[vv]
  153. p_bs = append(p_bs, bi)
  154. }
  155. project := map[string]interface{}{
  156. "project_id": pid,
  157. "projectname": p_project["projectname"],
  158. "bidamount": p_project["bidamount"],
  159. "area": p_project["area"],
  160. "city": p_project["city"],
  161. "bidstatus": p_project["bidstatus"],
  162. "buyer": p_project["buyer"],
  163. "firsttime": p_project["firsttime"],
  164. "biddings": p_bs,
  165. }
  166. projects = append(projects, project)
  167. }
  168. }
  169. if len(projects) > 0 {
  170. insert["projects"] = projects
  171. }
  172. if len(biddings) > 0 {
  173. insert["biddings"] = biddings
  174. }
  175. MgoP.InsertOrUpdate("qfw", "wcc_dealProposed22_0523", insert)
  176. }
  // cloneMap returns a shallow copy of src: a new map holding the same keys
// and values. Nested maps and slices are shared with src, not copied.
// A nil src yields an empty, non-nil map.
func cloneMap(src map[string]interface{}) map[string]interface{} {
	out := make(map[string]interface{}, len(src))
	for key := range src {
		out[key] = src[key]
	}
	return out
}
  184. func dealProposed22() {
  185. url := GF.Es.URL
  186. //url := "http://127.0.0.1:19908"
  187. username := GF.Es.Username
  188. password := GF.Es.Password
  189. //index := "bidding" //索引名称
  190. // 创建 Elasticsearch 客户端
  191. client, err := elastic.NewClient(
  192. elastic.SetURL(url),
  193. elastic.SetBasicAuth(username, password),
  194. elastic.SetSniff(false),
  195. )
  196. if err != nil {
  197. log.Info("创建 Elasticsearch 客户端失败", zap.Error(err))
  198. }
  199. //
  200. sess := MgoP.GetMgoConn()
  201. defer MgoP.DestoryMongoConn(sess)
  202. log.Info("dealProposed", zap.Any("开始处理:拟建数据表", "projectset_proposed"))
  203. where := map[string]interface{}{
  204. "firsttime": map[string]interface{}{
  205. "$gte": 1735660800,
  206. },
  207. }
  208. queryMgo := sess.DB("qfw").C("projectset_proposed").Find(&where).Select(nil).Iter()
  209. count := 0
  210. for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
  211. if count%1000 == 0 {
  212. log.Info("dealProposed", zap.Any("current", count), zap.Any("projectname", tmp["projectname"]))
  213. }
  214. proposed_id := mongodb.BsonIdToSId(tmp["_id"])
  215. project_name := util.ObjToString(tmp["projectname"])
  216. buyer := util.ObjToString(tmp["owner"])
  217. results, err := searchES22(client, project_name, buyer, 60, 10)
  218. if err != nil {
  219. log.Info("searchES22", zap.Error(err))
  220. }
  221. projectIds := make([]string, 0) //拟建对应的在建项目ID
  222. biddingIds := make([]string, 0) //拟建项目对在建项目中的标讯ids
  223. biddings := make([]map[string]interface{}, 0)
  224. for _, re := range results {
  225. bidding_id := util.ObjToString(re["id"])
  226. biddingIds = append(biddingIds, bidding_id)
  227. bidding := map[string]interface{}{
  228. "id": re["id"],
  229. "title": re["title"],
  230. "projectname": re["projectname"],
  231. "score": re["score"],
  232. "toptype": re["toptype"],
  233. "subtype": re["subtype"],
  234. }
  235. biddings = append(biddings, bidding)
  236. }
  237. for _, bid := range biddingIds {
  238. where2 := map[string]interface{}{
  239. "ids": bid,
  240. }
  241. projectset, _ := MgoP.FindOne("projectset_20230904", where2)
  242. if projectset != nil && len((*projectset)) > 0 {
  243. projectIds = append(projectIds, mongodb.BsonIdToSId((*projectset)["_id"]))
  244. }
  245. }
  246. insert := map[string]interface{}{
  247. "proposed_id": proposed_id,
  248. "bidding_ids": removeDuplicates(biddingIds),
  249. "project_ids": removeDuplicates(projectIds),
  250. "biddings": biddings,
  251. "project_name": tmp["projectname"],
  252. }
  253. MgoP.InsertOrUpdate("qfw", "wcc_dealProposed22", insert)
  254. }
  255. }
  256. // dealProposed 处理拟建数据表
  257. func dealProposed() {
  258. sess := MgoP.GetMgoConn()
  259. defer MgoP.DestoryMongoConn(sess)
  260. log.Info("dealProposed", zap.Any("开始处理:拟建数据表", "projectset_proposed"))
  261. where := map[string]interface{}{
  262. "firsttime": map[string]interface{}{
  263. "$gte": 1735660800,
  264. },
  265. }
  266. queryMgo := sess.DB("qfw").C("projectset_proposed").Find(&where).Select(nil).Iter()
  267. count := 0
  268. for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
  269. if count%1000 == 0 {
  270. log.Info("dealProposed", zap.Any("current", count), zap.Any("projectname", tmp["projectname"]))
  271. }
  272. proposed_id := mongodb.BsonIdToSId(tmp["_id"])
  273. insert := make(map[string]interface{})
  274. insert["proposed_id"] = proposed_id
  275. var nzj_follw_records = make([]DwdFnzjFollowRecord, 0)
  276. err := JianyuSubjectDB.Where("proposed_id = ? ", proposed_id).Find(&nzj_follw_records).Error
  277. if err != nil {
  278. log.Info("dealProposed", zap.Error(err))
  279. }
  280. //拟建标讯,没有找到对应的在建项目数据
  281. if len(nzj_follw_records) == 0 {
  282. insert["has_bidding"] = false
  283. MgoP.InsertOrUpdate("qfw", "wcc_ok_project_proposed", insert)
  284. continue
  285. }
  286. projectIds := make([]string, 0) //拟建对应的在建项目ID
  287. biddingIds := make([]string, 0) //拟建项目对在建项目中的标讯ids
  288. for _, v := range nzj_follw_records {
  289. biddingIds = append(biddingIds, v.InfoID)
  290. }
  291. biddingIds = removeDuplicates(biddingIds)
  292. insert["bidding_ids"] = biddingIds
  293. for _, bid := range biddingIds {
  294. where2 := map[string]interface{}{
  295. "ids": bid,
  296. }
  297. projectset, _ := MgoP.FindOne("projectset_20230904", where2)
  298. if projectset != nil && len((*projectset)) > 0 {
  299. projectIds = append(projectIds, mongodb.BsonIdToSId((*projectset)["_id"]))
  300. }
  301. }
  302. if len(projectIds) > 0 {
  303. insert["project_ids"] = projectIds
  304. } else {
  305. insert["has_project"] = false
  306. }
  307. MgoP.InsertOrUpdate("qfw", "wcc_ok_project_proposed", insert)
  308. }
  309. log.Info("dealProposed", zap.Any("数据处理完毕:拟建数据表", "projectset_proposed"))
  310. }
  // removeDuplicates returns the distinct strings of arr, in order of first
// occurrence. The result is nil when arr is empty.
func removeDuplicates(arr []string) []string {
	var result []string
	seen := make(map[string]struct{}, len(arr))
	for i := range arr {
		s := arr[i]
		if _, dup := seen[s]; dup {
			continue
		}
		seen[s] = struct{}{}
		result = append(result, s)
	}
	return result
}
  323. func searchES23(client *elastic.Client, projectName, buyer2 string, scoreThreshold float64, maxResults int) ([]map[string]interface{}, error) {
  324. fieldsToTry := []string{"projectname.pname", "title", "detail"}
  325. filtersToTry := [][]elastic.Query{
  326. {elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一")},
  327. {elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向")},
  328. {elastic.NewTermsQuery("toptype", "拟建")},
  329. }
  330. var allResults []*elastic.SearchHit
  331. seenIDs := make(map[string]bool)
  332. for _, field := range fieldsToTry {
  333. for _, filter := range filtersToTry {
  334. // 构建查询:使用 MultiMatchQuery + phrase
  335. query := elastic.NewBoolQuery().
  336. Must(elastic.NewMultiMatchQuery(projectName, field).Type("phrase")).
  337. Filter(filter...)
  338. // 执行查询
  339. searchResult, err := client.Search().
  340. Index("bidding").
  341. Query(query).
  342. Size(70).
  343. Do(context.Background())
  344. if err != nil {
  345. return nil, err
  346. }
  347. // 去重处理
  348. for _, hit := range searchResult.Hits.Hits {
  349. if !seenIDs[hit.Id] {
  350. seenIDs[hit.Id] = true
  351. allResults = append(allResults, hit)
  352. }
  353. }
  354. if len(allResults) >= maxResults {
  355. break
  356. }
  357. }
  358. if len(allResults) >= maxResults {
  359. break
  360. }
  361. }
  362. //
  363. //fieldsToTry := []string{"projectname.pname", "title", "detail"}
  364. //filtersToTry := [][]elastic.Query{
  365. // {elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一")},
  366. // {elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向")},
  367. // {elastic.NewTermsQuery("toptype", "拟建")},
  368. //}
  369. //
  370. //var allResults []*elastic.SearchHit
  371. //seenIDs := make(map[string]bool)
  372. //
  373. //for _, field := range fieldsToTry {
  374. // for _, filter := range filtersToTry {
  375. // // 构建查询
  376. // query := elastic.NewBoolQuery().
  377. // Must(elastic.NewMatchQuery(field, projectName)).
  378. // Filter(filter...)
  379. //
  380. // // 执行查询
  381. // searchResult, err := client.Search().
  382. // Index("bidding").
  383. // Query(query).
  384. // Size(70). // 多取一些,后面做筛选和去重
  385. // Do(context.Background())
  386. // if err != nil {
  387. // return nil, err
  388. // }
  389. //
  390. // for _, hit := range searchResult.Hits.Hits {
  391. // if !seenIDs[hit.Id] {
  392. // allResults = append(allResults, hit)
  393. // seenIDs[hit.Id] = true
  394. // }
  395. // }
  396. //
  397. // if len(allResults) >= maxResults {
  398. // break
  399. // }
  400. // }
  401. // if len(allResults) >= maxResults {
  402. // break
  403. // }
  404. //}
  405. var results []map[string]interface{}
  406. seenProjectNames := make(map[string]bool)
  407. seenProjectCodes := make(map[string]bool)
  408. bidamountMap := make(map[float64]bool)
  409. for _, hit := range allResults {
  410. var doc map[string]interface{}
  411. if err := json.Unmarshal(hit.Source, &doc); err != nil {
  412. log.Info("解析文档失败", zap.Error(err))
  413. continue
  414. }
  415. projectNameValue := util.ObjToString(doc["projectname"])
  416. if projectNameValue == "" {
  417. continue
  418. }
  419. projectCode := util.ObjToString(doc["projectcode"])
  420. if seenProjectCodes[projectCode] {
  421. continue
  422. }
  423. seenProjectCodes[projectCode] = true
  424. bidamount := util.Float64All(doc["bidamount"])
  425. if bidamountMap[bidamount] {
  426. continue
  427. }
  428. bidamountMap[bidamount] = true
  429. // 相似度筛选
  430. score := *hit.Score
  431. doc["score"] = score //相似度
  432. if score < scoreThreshold {
  433. continue
  434. }
  435. //id := util.ObjToString(doc["id"])
  436. //doc["jyhref"] = GetJyURLByID(id)
  437. //if site := util.ObjToString(doc["site"]); site == "中华人民共和国自然资源部" {
  438. // doc["title"] = "土地出让" + "-" + util.ObjToString(doc["title"])
  439. //}
  440. // enrich: total_investment
  441. //if bidData, _ := MgoB.FindById("bidding", id, nil); bidData != nil {
  442. // if util.Float64All((*bidData)["total_investment"]) > 0 {
  443. // doc["total_investment"] = (*bidData)["total_investment"]
  444. // }
  445. //}
  446. doc["score"] = score
  447. detail := util.ObjToString(doc["detail"])
  448. // 字段中必须包含 projectName
  449. if buyer2 != "" {
  450. if !strings.Contains(detail, projectName) && !strings.Contains(detail, buyer2) {
  451. continue
  452. }
  453. }
  454. if seenProjectNames[projectNameValue] {
  455. continue
  456. }
  457. seenProjectNames[projectNameValue] = true
  458. results = append(results, doc)
  459. if len(results) >= maxResults {
  460. break
  461. }
  462. }
  463. // 排序:按 score 降序
  464. sort.Slice(results, func(i, j int) bool {
  465. si := util.Float64All(results[i]["score"])
  466. sj := util.Float64All(results[j]["score"])
  467. return si > sj
  468. })
  469. return results, nil
  470. }
  471. func searchES22(client *elastic.Client, projectName, buyer2 string, scoreThreshold float64, maxResults int) ([]map[string]interface{}, error) {
  472. fieldsToTry := []string{"projectname.pname", "title", "detail"}
  473. filtersToTry := [][]elastic.Query{
  474. {elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一")},
  475. {elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向")},
  476. {elastic.NewTermsQuery("toptype", "拟建")},
  477. }
  478. var allResults []*elastic.SearchHit
  479. seenIDs := make(map[string]bool)
  480. for _, field := range fieldsToTry {
  481. for _, filter := range filtersToTry {
  482. // 构建查询
  483. query := elastic.NewBoolQuery().
  484. Must(elastic.NewMatchQuery(field, projectName)).
  485. Filter(filter...)
  486. // 执行查询
  487. searchResult, err := client.Search().
  488. Index("bidding").
  489. Query(query).
  490. Size(70). // 多取一些,后面做筛选和去重
  491. Do(context.Background())
  492. if err != nil {
  493. return nil, err
  494. }
  495. for _, hit := range searchResult.Hits.Hits {
  496. if !seenIDs[hit.Id] {
  497. allResults = append(allResults, hit)
  498. seenIDs[hit.Id] = true
  499. }
  500. }
  501. if len(allResults) >= maxResults {
  502. break
  503. }
  504. }
  505. if len(allResults) >= maxResults {
  506. break
  507. }
  508. }
  509. var results []map[string]interface{}
  510. seenProjectNames := make(map[string]bool)
  511. seenProjectCodes := make(map[string]bool)
  512. bidamountMap := make(map[float64]bool)
  513. for _, hit := range allResults {
  514. var doc map[string]interface{}
  515. if err := json.Unmarshal(hit.Source, &doc); err != nil {
  516. log.Info("解析文档失败", zap.Error(err))
  517. continue
  518. }
  519. projectNameValue := util.ObjToString(doc["projectname"])
  520. if projectNameValue == "" {
  521. continue
  522. }
  523. projectCode := util.ObjToString(doc["projectcode"])
  524. if seenProjectCodes[projectCode] {
  525. continue
  526. }
  527. seenProjectCodes[projectCode] = true
  528. bidamount := util.Float64All(doc["bidamount"])
  529. if bidamountMap[bidamount] {
  530. continue
  531. }
  532. bidamountMap[bidamount] = true
  533. // 相似度筛选
  534. score := *hit.Score
  535. doc["score"] = score //相似度
  536. if score < scoreThreshold {
  537. continue
  538. }
  539. //id := util.ObjToString(doc["id"])
  540. //doc["jyhref"] = GetJyURLByID(id)
  541. //if site := util.ObjToString(doc["site"]); site == "中华人民共和国自然资源部" {
  542. // doc["title"] = "土地出让" + "-" + util.ObjToString(doc["title"])
  543. //}
  544. // enrich: total_investment
  545. //if bidData, _ := MgoB.FindById("bidding", id, nil); bidData != nil {
  546. // if util.Float64All((*bidData)["total_investment"]) > 0 {
  547. // doc["total_investment"] = (*bidData)["total_investment"]
  548. // }
  549. //}
  550. doc["score"] = score
  551. detail := util.ObjToString(doc["detail"])
  552. // 字段中必须包含 projectName
  553. if buyer2 != "" {
  554. if !strings.Contains(detail, projectName) && !strings.Contains(detail, buyer2) {
  555. continue
  556. }
  557. }
  558. if seenProjectNames[projectNameValue] {
  559. continue
  560. }
  561. seenProjectNames[projectNameValue] = true
  562. results = append(results, doc)
  563. if len(results) >= maxResults {
  564. break
  565. }
  566. }
  567. return results, nil
  568. }
  569. func searchES(client *elastic.Client, projectName, buyer2 string) ([]map[string]interface{}, error) {
  570. query := elastic.NewBoolQuery().
  571. Must(
  572. //elastic.NewMatchQuery("projectname.pname", projectName), // 模糊匹配 projectname
  573. //elastic.NewMatchQuery("title", projectName), // 模糊匹配 projectname
  574. elastic.NewMatchQuery("detail", projectName), // 模糊匹配 projectname
  575. //elastic.NewTermQuery("area", "安徽"), // 过滤区域
  576. elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一"), // 过滤 subtype
  577. //elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向"), // 过滤 subtype
  578. //elastic.NewTermsQuery("toptype", "拟建"), // 过滤 subtype
  579. )
  580. searchResult, err := client.Search().
  581. Index("bidding").
  582. Query(query).
  583. Size(70). // 先取 12 条,确保足够数据
  584. Do(context.Background())
  585. if err != nil {
  586. return nil, err
  587. }
  588. // 结果集
  589. var results []map[string]interface{}
  590. seenProjectNames := make(map[string]bool) // 用于去重
  591. seenProjectCode := make(map[string]bool) // 用于去重
  592. bidamountMap := make(map[float64]bool)
  593. for _, hit := range searchResult.Hits.Hits {
  594. var doc map[string]interface{}
  595. err := json.Unmarshal(hit.Source, &doc)
  596. if err != nil {
  597. log.Info("解析文档失败", zap.Error(err))
  598. continue
  599. }
  600. // 获取 `projectname`,防止 key 不存在时的错误
  601. projectNameValue, ok := doc["projectname"].(string)
  602. bidamount := util.Float64All(doc["bidamount"])
  603. if !ok {
  604. log.Info("⚠️ 缺少 projectname 字段,跳过:", zap.Any("projectname", doc["projectname"]))
  605. continue
  606. }
  607. projectCodeValue := util.ObjToString(doc["projectcode"])
  608. if seenProjectCode[projectCodeValue] {
  609. continue
  610. }
  611. if projectCodeValue != "" {
  612. seenProjectCode[projectCodeValue] = true
  613. }
  614. // **处理额外字段**
  615. id := util.ObjToString(doc["id"])
  616. bidData, _ := MgoB.FindById("bidding", id, nil)
  617. if util.Float64All((*bidData)["total_investment"]) > 0 {
  618. doc["total_investment"] = (*bidData)["total_investment"]
  619. }
  620. doc["jyhref"] = GetJyURLByID(id)
  621. score := *hit.Score
  622. site := util.ObjToString(doc["site"])
  623. if site == "中华人民共和国自然资源部" {
  624. doc["title"] = "土地出让" + "-" + util.ObjToString(doc["title"])
  625. }
  626. doc["score"] = score //相似度
  627. detail := util.ObjToString(doc["detail"])
  628. if !strings.Contains(detail, projectName) {
  629. continue
  630. }
  631. // **去重逻辑**:如果 `projectname` 已经出现过,则跳过
  632. if seenProjectNames[projectNameValue] {
  633. continue
  634. }
  635. if bidamountMap[bidamount] {
  636. continue
  637. }
  638. // **记录该 `projectname`,避免重复**
  639. seenProjectNames[projectNameValue] = true
  640. bidamountMap[bidamount] = true
  641. // **加入结果集**
  642. results = append(results, doc)
  643. // **如果已经找到 6 条不同 `projectname`,就跳出循环**
  644. if len(results) >= 10 {
  645. break
  646. }
  647. }
  648. //2、判断正文包含采购单位
  649. for _, hit := range searchResult.Hits.Hits {
  650. var doc map[string]interface{}
  651. err := json.Unmarshal(hit.Source, &doc)
  652. if err != nil {
  653. log.Info("解析文档失败:", zap.Error(err))
  654. continue
  655. }
  656. // 获取 `projectname`,防止 key 不存在时的错误
  657. projectNameValue, ok := doc["projectname"].(string)
  658. bidamount := util.Float64All(doc["bidamount"])
  659. if !ok {
  660. log.Info("⚠️ 缺少 projectname 字段,跳过:", zap.Any("projectname", doc["projectname"]))
  661. continue
  662. }
  663. // **处理额外字段**
  664. id := util.ObjToString(doc["id"])
  665. doc["jyhref"] = GetJyURLByID(id)
  666. score := *hit.Score
  667. doc["score"] = score //相似度
  668. site := util.ObjToString(doc["site"])
  669. if site == "中华人民共和国自然资源部" {
  670. doc["title"] = "土地出让" + "-" + util.ObjToString(doc["title"])
  671. }
  672. //判断正文包含采购单位
  673. detail := util.ObjToString(doc["detail"])
  674. if !strings.Contains(detail, buyer2) {
  675. continue
  676. }
  677. // **去重逻辑**:如果 `projectname` 已经出现过,则跳过
  678. if seenProjectNames[projectNameValue] {
  679. continue
  680. }
  681. if bidamountMap[bidamount] {
  682. continue
  683. }
  684. // **记录该 `projectname`,避免重复**
  685. seenProjectNames[projectNameValue] = true
  686. bidamountMap[bidamount] = true
  687. // **加入结果集**
  688. results = append(results, doc)
  689. // **如果已经找到 6 条不同 `projectname`,就跳出循环**
  690. if len(results) >= 10 {
  691. break
  692. }
  693. }
  694. return results, nil
  695. }
  696. // GetJyURLByID 获取剑鱼地址
  697. func GetJyURLByID(id string) string {
  698. var Url = "https://www.jianyu360.com/article/content/%s.html"
  699. url := fmt.Sprintf(Url, util.CommonEncodeArticle("content", id))
  700. return url
  701. }
  702. // GetIdByURL 解密url,获取bidding ID
  703. func GetIdByURL(url string) string {
  704. if strings.Contains(url, "work-bench") {
  705. return ""
  706. }
  707. if strings.Contains(url, "/article/content") {
  708. urls := strings.Split(url, "content/")
  709. res := strings.Split(urls[1], ".html")
  710. ids := util.CommonDecodeArticle("content", res[0])
  711. return ids[0]
  712. }
  713. if strings.HasSuffix(url, "appid") {
  714. urls := strings.Split(url, "entservice/")
  715. res := strings.Split(urls[1], ".html")
  716. se := util.SimpleEncrypt{Key: "entservice"}
  717. id := se.DecodeString(res[0])
  718. return id
  719. }
  720. return ""
  721. }