project.go 27 KB


  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "go.uber.org/zap"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  11. "regexp"
  12. "sort"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. func dealXlsxTest() {
  18. // 1. 初始化 ES 客户端
  19. client, err := elastic.NewClient(
  20. elastic.SetURL(GF.Es.URL),
  21. elastic.SetBasicAuth(GF.Es.Username, GF.Es.Password),
  22. elastic.SetSniff(false),
  23. )
  24. if err != nil {
  25. log.Fatal("创建 Elasticsearch 客户端失败", zap.Error(err))
  26. }
  27. // 2. 初始化 MongoDB 连接
  28. sess := MgoP.GetMgoConn()
  29. defer MgoP.DestoryMongoConn(sess)
  30. coll := sess.DB("qfw").C("wcc_dealXlsxData_0524")
  31. iter := coll.Find(nil).Select(nil).Iter()
  32. // 3. 并发控制
  33. const maxWorkers = 2
  34. taskCh := make(chan map[string]interface{}, 2000)
  35. var wg sync.WaitGroup
  36. // 4. 启动 worker 处理任务
  37. for i := 0; i < maxWorkers; i++ {
  38. wg.Add(1)
  39. go func() {
  40. defer wg.Done()
  41. for doc := range taskCh {
  42. if len(doc) == 0 {
  43. log.Info("aaa", zap.Any("client", client))
  44. }
  45. processOneProposedTest(doc, client)
  46. }
  47. }()
  48. }
  49. // 5. 逐条读取数据并派发任务
  50. log.Info("111111", zap.String("222222", "开始处理数据"))
  51. count := 0
  52. for doc := make(map[string]interface{}); iter.Next(doc); {
  53. count++
  54. if count%1000 == 0 {
  55. log.Info("dealProposed", zap.Int("current", count), zap.Any("projectname", doc["p1_project_name"]))
  56. }
  57. taskCh <- cloneMap(doc) // 防止 map 重用
  58. }
  59. close(taskCh)
  60. wg.Wait()
  61. }
  62. func processOneProposedTest(tmp map[string]interface{}, client *elastic.Client) {
  63. defer func() {
  64. if r := recover(); r != nil {
  65. log.Warn("panic in processOneProposed", zap.Any("recover", r))
  66. }
  67. }()
  68. id := mongodb.BsonIdToSId(tmp["_id"])
  69. //proposedID := mongodb.BsonIdToSId(tmp["_id"])
  70. projectName := util.ObjToString(tmp["p1_project_name"])
  71. buyer := util.ObjToString(tmp["p1_project_owner"])
  72. //proposed_number := util.ObjToString(tmp["proposed_number"])
  73. //log.Info("processOneProposed", zap.String("开始查询es", projectName))
  74. results, err := searchES23(client, projectName, buyer, 20, 50)
  75. if err != nil {
  76. log.Warn("searchES22 error", zap.Error(err))
  77. return
  78. }
  79. //log.Info("processOneProposed", zap.String("结束查询es", projectName))
  80. biddings := []map[string]interface{}{}
  81. update := map[string]interface{}{}
  82. // 标讯信息
  83. for _, re := range results {
  84. biddingID := util.ObjToString(re["id"])
  85. da := map[string]interface{}{
  86. "id": re["id"],
  87. "title": re["title"],
  88. "area": re["area"],
  89. "city": re["city"],
  90. "projectname": re["projectname"],
  91. "score": re["score"],
  92. "toptype": re["toptype"],
  93. "subtype": re["subtype"],
  94. "buyer": re["buyer"],
  95. "budget": re["budget"],
  96. "buyerperson": re["buyerperson"],
  97. "buyertel": re["buyertel"],
  98. "s_winner": re["s_winner"],
  99. "bidamount": re["bidamount"],
  100. "winnertel": re["winnertel"],
  101. "agency": re["agency"],
  102. "publishtime": re["publishtime"],
  103. }
  104. //项目信息
  105. where2 := map[string]interface{}{"ids": biddingID}
  106. if util.ObjToString(re["toptype"]) == "拟建" {
  107. projectset, _ := MgoP.FindOne("projectset_proposed", where2)
  108. if projectset != nil && len((*projectset)) > 0 {
  109. v3 := map[string]interface{}{
  110. "project_id": mongodb.BsonIdToSId((*projectset)["_id"]),
  111. "projectname": (*projectset)["projectname"],
  112. "bidamount": (*projectset)["bidamount"],
  113. "area": (*projectset)["area"],
  114. "city": (*projectset)["city"],
  115. "district": (*projectset)["district"],
  116. "owner": (*projectset)["owner"],
  117. "approvecode": (*projectset)["approvecode"],
  118. }
  119. if (*projectset)["owner"] != "" {
  120. where11 := map[string]interface{}{
  121. "company_name": (*projectset)["owner"],
  122. }
  123. std, _ := MgoQY.FindOne("qyxy_std", where11)
  124. v3["credit_no"] = (*std)["credit_no"]
  125. }
  126. da["project"] = v3
  127. }
  128. biddings = append(biddings, da)
  129. } else {
  130. projectset, _ := MgoP.FindOne("projectset_20230904", where2)
  131. if projectset != nil && len((*projectset)) > 0 {
  132. v3 := map[string]interface{}{
  133. "project_id": mongodb.BsonIdToSId((*projectset)["_id"]),
  134. "projectname": (*projectset)["projectname"],
  135. "bidamount": (*projectset)["bidamount"],
  136. "area": (*projectset)["area"],
  137. "city": (*projectset)["city"],
  138. "district": (*projectset)["district"],
  139. "firsttime": (*projectset)["firsttime"],
  140. "bidtype": (*projectset)["bidtype"],
  141. "bidstatus": (*projectset)["bidstatus"],
  142. "sortprice": (*projectset)["sortprice"],
  143. "buyer": (*projectset)["buyer"],
  144. }
  145. if (*projectset)["buyer"] != "" {
  146. where11 := map[string]interface{}{
  147. "company_name": (*projectset)["buyer"],
  148. }
  149. std, _ := MgoQY.FindOne("qyxy_std", where11)
  150. v3["credit_no"] = (*std)["credit_no"]
  151. }
  152. da["project"] = v3
  153. }
  154. biddings = append(biddings, da)
  155. }
  156. }
  157. if len(biddings) > 0 {
  158. update["bidding"] = biddings
  159. MgoP.UpdateById("wcc_dealXlsxData_0524", id, map[string]interface{}{"$set": update})
  160. }
  161. }
  162. // dealProposed22Concurrent 多协程处理,拟建存量数据
  163. func dealProposed22Concurrent() {
  164. // 1. 初始化 ES 客户端
  165. client, err := elastic.NewClient(
  166. elastic.SetURL(GF.Es.URL),
  167. elastic.SetBasicAuth(GF.Es.Username, GF.Es.Password),
  168. elastic.SetSniff(false),
  169. )
  170. if err != nil {
  171. log.Fatal("创建 Elasticsearch 客户端失败", zap.Error(err))
  172. }
  173. // 2. 初始化 MongoDB 连接
  174. sess := MgoP.GetMgoConn()
  175. defer MgoP.DestoryMongoConn(sess)
  176. coll := sess.DB("qfw").C("projectset_proposed")
  177. query := map[string]interface{}{
  178. //"firsttime": map[string]interface{}{
  179. // "$gte": 1735660800,
  180. // "$lte": 1748102400,
  181. //},
  182. "firsttime": map[string]interface{}{
  183. "$lte": 1735660800,
  184. },
  185. }
  186. iter := coll.Find(query).Select(nil).Sort("-_id").Iter()
  187. // 3. 并发控制
  188. const maxWorkers = 1
  189. taskCh := make(chan map[string]interface{}, 2000)
  190. var wg sync.WaitGroup
  191. // 4. 启动 worker 处理任务
  192. for i := 0; i < maxWorkers; i++ {
  193. wg.Add(1)
  194. go func() {
  195. defer wg.Done()
  196. for doc := range taskCh {
  197. if len(doc) == 0 {
  198. log.Info("aaa", zap.Any("client", client))
  199. }
  200. processOneProposed(doc, client)
  201. }
  202. }()
  203. }
  204. // 5. 逐条读取数据并派发任务
  205. log.Info("111111", zap.String("222222", "开始处理数据"))
  206. count := 0
  207. for doc := make(map[string]interface{}); iter.Next(doc); {
  208. count++
  209. if count%1000 == 0 {
  210. log.Info("dealProposed", zap.Int("current", count), zap.Any("projectname", doc["projectname"]), zap.Any("_id", doc["_id"]))
  211. }
  212. //if util.ObjToString(doc["area"]) == "甘肃" {
  213. // continue
  214. //}
  215. taskCh <- cloneMap(doc) // 防止 map 重用
  216. }
  217. close(taskCh)
  218. wg.Wait()
  219. }
  220. // processOneProposed 处理存量数据
  221. func processOneProposed(tmp map[string]interface{}, client *elastic.Client) {
  222. defer func() {
  223. if r := recover(); r != nil {
  224. log.Warn("panic in processOneProposed", zap.Any("recover", r))
  225. }
  226. }()
  227. proposedID := mongodb.BsonIdToSId(tmp["_id"])
  228. projectName := util.ObjToString(tmp["projectname"])
  229. buyer := util.ObjToString(tmp["owner"])
  230. proposed_number := util.ObjToString(tmp["proposed_number"])
  231. //log.Info("processOneProposed", zap.String("开始查询es", projectName))
  232. results, err := searchES23(client, projectName, buyer, 20, 50)
  233. if err != nil {
  234. log.Warn("searchES22 error", zap.Error(err))
  235. return
  236. }
  237. //log.Info("processOneProposed", zap.String("结束查询es", projectName))
  238. biddings := []map[string]interface{}{}
  239. // 标讯信息
  240. for _, re := range results {
  241. biddingID := util.ObjToString(re["id"])
  242. da := map[string]interface{}{
  243. "id": re["id"],
  244. "title": re["title"],
  245. "area": re["area"],
  246. "city": re["city"],
  247. "projectname": re["projectname"],
  248. "score": re["score"],
  249. "toptype": re["toptype"],
  250. "subtype": re["subtype"],
  251. "buyer": re["buyer"],
  252. "budget": re["budget"],
  253. "buyerperson": re["buyerperson"],
  254. "buyertel": re["buyertel"],
  255. "s_winner": re["s_winner"],
  256. "bidamount": re["bidamount"],
  257. "winnertel": re["winnertel"],
  258. "agency": re["agency"],
  259. "publishtime": re["publishtime"],
  260. }
  261. //项目信息
  262. where2 := map[string]interface{}{"ids": biddingID}
  263. if util.ObjToString(re["toptype"]) == "拟建" {
  264. projectset, _ := MgoP.FindOne("projectset_proposed", where2)
  265. if projectset != nil && len((*projectset)) > 0 {
  266. v3 := map[string]interface{}{
  267. "project_id": mongodb.BsonIdToSId((*projectset)["_id"]),
  268. "projectname": (*projectset)["projectname"],
  269. "bidamount": (*projectset)["bidamount"],
  270. "area": (*projectset)["area"],
  271. "city": (*projectset)["city"],
  272. "district": (*projectset)["district"],
  273. "owner": (*projectset)["owner"],
  274. "approvecode": (*projectset)["approvecode"],
  275. "approvestatus": (*projectset)["approvestatus"],
  276. "sourceinfourl": (*projectset)["sourceinfourl"],
  277. }
  278. if (*projectset)["owner"] != "" {
  279. where11 := map[string]interface{}{
  280. "company_name": (*projectset)["owner"],
  281. }
  282. std, _ := MgoQY.FindOne("qyxy_std", where11)
  283. v3["credit_no"] = (*std)["credit_no"]
  284. }
  285. da["project"] = v3
  286. }
  287. biddings = append(biddings, da)
  288. } else {
  289. projectset, _ := MgoP.FindOne("projectset_20230904", where2)
  290. if projectset != nil && len((*projectset)) > 0 {
  291. v3 := map[string]interface{}{
  292. "project_id": mongodb.BsonIdToSId((*projectset)["_id"]),
  293. "projectname": (*projectset)["projectname"],
  294. "bidamount": (*projectset)["bidamount"],
  295. "area": (*projectset)["area"],
  296. "city": (*projectset)["city"],
  297. "firsttime": (*projectset)["firsttime"],
  298. "bidtype": (*projectset)["bidtype"],
  299. "bidstatus": (*projectset)["bidstatus"],
  300. "sortprice": (*projectset)["sortprice"],
  301. "buyer": (*projectset)["buyer"],
  302. }
  303. da["project"] = v3
  304. }
  305. biddings = append(biddings, da)
  306. }
  307. }
  308. insert := map[string]interface{}{
  309. "proposed_id": proposedID,
  310. "stype": 1, //代表从拟建数据-> 匹配在建数据
  311. "proposed_number": proposed_number,
  312. "buyer": buyer,
  313. "projectname": tmp["projectname"],
  314. "area": tmp["area"],
  315. "city": tmp["city"],
  316. "district": tmp["district"],
  317. "bidding": biddings,
  318. "updatetime": time.Now().Unix(),
  319. }
  320. if isValidCodeFormat(util.ObjToString(tmp["approvecode"])) {
  321. insert["approvecode"] = tmp["approvecode"]
  322. }
  323. if buyer != "" {
  324. where11 := map[string]interface{}{
  325. "company_name": buyer,
  326. }
  327. std, _ := MgoQY.FindOne("qyxy_std", where11)
  328. insert["credit_no"] = (*std)["credit_no"]
  329. }
  330. whereExist := map[string]interface{}{
  331. "proposed_id": proposedID,
  332. }
  333. if GF.Env.Savecoll == "" {
  334. GF.Env.Savecoll = "wcc_nj_zj_bidding"
  335. }
  336. exist, _ := MgoP.FindOne(GF.Env.Savecoll, whereExist)
  337. // 存在就更新
  338. if exist != nil && len(*exist) > 0 {
  339. exitsid := mongodb.BsonIdToSId((*exist)["_id"])
  340. MgoP.UpdateById(GF.Env.Savecoll, exitsid, map[string]interface{}{"$set": insert})
  341. } else {
  342. insert["comeintime"] = time.Now().Unix()
  343. MgoP.InsertOrUpdate("qfw", GF.Env.Savecoll, insert)
  344. }
  345. }
  346. func cloneMap(src map[string]interface{}) map[string]interface{} {
  347. dst := make(map[string]interface{}, len(src))
  348. for k, v := range src {
  349. dst[k] = v
  350. }
  351. return dst
  352. }
  353. func dealProposed22() {
  354. url := GF.Es.URL
  355. //url := "http://127.0.0.1:19908"
  356. username := GF.Es.Username
  357. password := GF.Es.Password
  358. //index := "bidding" //索引名称
  359. // 创建 Elasticsearch 客户端
  360. client, err := elastic.NewClient(
  361. elastic.SetURL(url),
  362. elastic.SetBasicAuth(username, password),
  363. elastic.SetSniff(false),
  364. )
  365. if err != nil {
  366. log.Info("创建 Elasticsearch 客户端失败", zap.Error(err))
  367. }
  368. //
  369. sess := MgoP.GetMgoConn()
  370. defer MgoP.DestoryMongoConn(sess)
  371. log.Info("dealProposed", zap.Any("开始处理:拟建数据表", "projectset_proposed"))
  372. where := map[string]interface{}{
  373. "firsttime": map[string]interface{}{
  374. "$gte": 1735660800,
  375. },
  376. }
  377. queryMgo := sess.DB("qfw").C("projectset_proposed").Find(&where).Select(nil).Iter()
  378. count := 0
  379. for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
  380. if count%1000 == 0 {
  381. log.Info("dealProposed", zap.Any("current", count), zap.Any("projectname", tmp["projectname"]))
  382. }
  383. proposed_id := mongodb.BsonIdToSId(tmp["_id"])
  384. project_name := util.ObjToString(tmp["projectname"])
  385. buyer := util.ObjToString(tmp["owner"])
  386. results, err := searchES22(client, project_name, buyer, 60, 10)
  387. if err != nil {
  388. log.Info("searchES22", zap.Error(err))
  389. }
  390. projectIds := make([]string, 0) //拟建对应的在建项目ID
  391. biddingIds := make([]string, 0) //拟建项目对在建项目中的标讯ids
  392. biddings := make([]map[string]interface{}, 0)
  393. for _, re := range results {
  394. bidding_id := util.ObjToString(re["id"])
  395. biddingIds = append(biddingIds, bidding_id)
  396. bidding := map[string]interface{}{
  397. "id": re["id"],
  398. "title": re["title"],
  399. "projectname": re["projectname"],
  400. "score": re["score"],
  401. "toptype": re["toptype"],
  402. "subtype": re["subtype"],
  403. }
  404. biddings = append(biddings, bidding)
  405. }
  406. for _, bid := range biddingIds {
  407. where2 := map[string]interface{}{
  408. "ids": bid,
  409. }
  410. projectset, _ := MgoP.FindOne("projectset_20230904", where2)
  411. if projectset != nil && len((*projectset)) > 0 {
  412. projectIds = append(projectIds, mongodb.BsonIdToSId((*projectset)["_id"]))
  413. }
  414. }
  415. insert := map[string]interface{}{
  416. "proposed_id": proposed_id,
  417. "bidding_ids": removeDuplicates(biddingIds),
  418. "project_ids": removeDuplicates(projectIds),
  419. "biddings": biddings,
  420. "project_name": tmp["projectname"],
  421. }
  422. MgoP.InsertOrUpdate("qfw", "wcc_dealProposed22", insert)
  423. }
  424. }
  425. // dealProposed 处理拟建数据表
  426. func dealProposed() {
  427. sess := MgoP.GetMgoConn()
  428. defer MgoP.DestoryMongoConn(sess)
  429. log.Info("dealProposed", zap.Any("开始处理:拟建数据表", "projectset_proposed"))
  430. where := map[string]interface{}{
  431. "firsttime": map[string]interface{}{
  432. "$gte": 1735660800,
  433. },
  434. }
  435. queryMgo := sess.DB("qfw").C("projectset_proposed").Find(&where).Select(nil).Iter()
  436. count := 0
  437. for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
  438. if count%1000 == 0 {
  439. log.Info("dealProposed", zap.Any("current", count), zap.Any("projectname", tmp["projectname"]))
  440. }
  441. proposed_id := mongodb.BsonIdToSId(tmp["_id"])
  442. insert := make(map[string]interface{})
  443. insert["proposed_id"] = proposed_id
  444. var nzj_follw_records = make([]DwdFnzjFollowRecord, 0)
  445. err := JianyuSubjectDB.Where("proposed_id = ? ", proposed_id).Find(&nzj_follw_records).Error
  446. if err != nil {
  447. log.Info("dealProposed", zap.Error(err))
  448. }
  449. //拟建标讯,没有找到对应的在建项目数据
  450. if len(nzj_follw_records) == 0 {
  451. insert["has_bidding"] = false
  452. MgoP.InsertOrUpdate("qfw", "wcc_ok_project_proposed", insert)
  453. continue
  454. }
  455. projectIds := make([]string, 0) //拟建对应的在建项目ID
  456. biddingIds := make([]string, 0) //拟建项目对在建项目中的标讯ids
  457. for _, v := range nzj_follw_records {
  458. biddingIds = append(biddingIds, v.InfoID)
  459. }
  460. biddingIds = removeDuplicates(biddingIds)
  461. insert["bidding_ids"] = biddingIds
  462. for _, bid := range biddingIds {
  463. where2 := map[string]interface{}{
  464. "ids": bid,
  465. }
  466. projectset, _ := MgoP.FindOne("projectset_20230904", where2)
  467. if projectset != nil && len((*projectset)) > 0 {
  468. projectIds = append(projectIds, mongodb.BsonIdToSId((*projectset)["_id"]))
  469. }
  470. }
  471. if len(projectIds) > 0 {
  472. insert["project_ids"] = projectIds
  473. } else {
  474. insert["has_project"] = false
  475. }
  476. MgoP.InsertOrUpdate("qfw", "wcc_ok_project_proposed", insert)
  477. }
  478. log.Info("dealProposed", zap.Any("数据处理完毕:拟建数据表", "projectset_proposed"))
  479. }
  480. // removeDuplicates 去除重复字符串
  481. func removeDuplicates(arr []string) []string {
  482. uniqueMap := make(map[string]bool)
  483. var result []string
  484. for _, str := range arr {
  485. if !uniqueMap[str] {
  486. uniqueMap[str] = true
  487. result = append(result, str)
  488. }
  489. }
  490. return result
  491. }
  492. func searchES23(client *elastic.Client, projectName, buyer2 string, scoreThreshold float64, maxResults int) ([]map[string]interface{}, error) {
  493. fieldsToTry := []string{"projectname.pname", "title", "detail"}
  494. filtersToTry := [][]elastic.Query{
  495. {elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一")},
  496. {elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向")},
  497. {elastic.NewTermsQuery("toptype", "拟建")},
  498. }
  499. var allResults []*elastic.SearchHit
  500. seenIDs := make(map[string]bool)
  501. for _, field := range fieldsToTry {
  502. for _, filter := range filtersToTry {
  503. // 构建查询:使用 MultiMatchQuery + phrase
  504. query := elastic.NewBoolQuery().
  505. Must(elastic.NewMultiMatchQuery(projectName, field).Type("phrase")).
  506. Filter(filter...)
  507. // 执行查询
  508. searchResult, err := client.Search().
  509. Index("bidding").
  510. Query(query).
  511. Size(70).
  512. Do(context.Background())
  513. if err != nil {
  514. return nil, err
  515. }
  516. // 去重处理
  517. for _, hit := range searchResult.Hits.Hits {
  518. if !seenIDs[hit.Id] {
  519. seenIDs[hit.Id] = true
  520. allResults = append(allResults, hit)
  521. }
  522. }
  523. if len(allResults) >= maxResults {
  524. break
  525. }
  526. }
  527. if len(allResults) >= maxResults {
  528. break
  529. }
  530. }
  531. var results []map[string]interface{}
  532. seenProjectNames := make(map[string]bool)
  533. seenProjectCodes := make(map[string]bool)
  534. bidamountMap := make(map[float64]bool)
  535. //subtypeMap := make(map[string]bool)
  536. for _, hit := range allResults {
  537. var doc map[string]interface{}
  538. if err := json.Unmarshal(hit.Source, &doc); err != nil {
  539. log.Info("解析文档失败", zap.Error(err))
  540. continue
  541. }
  542. projectNameValue := util.ObjToString(doc["projectname"])
  543. if projectNameValue == "" {
  544. continue
  545. }
  546. projectCode := util.ObjToString(doc["projectcode"])
  547. if projectCode != "" {
  548. if seenProjectCodes[projectCode] {
  549. continue
  550. }
  551. seenProjectCodes[projectCode] = true
  552. }
  553. bidamount := util.Float64All(doc["bidamount"])
  554. if bidamount != 0 {
  555. if bidamountMap[bidamount] {
  556. continue
  557. }
  558. bidamountMap[bidamount] = true
  559. }
  560. // 相似度筛选
  561. score := *hit.Score
  562. doc["score"] = score //相似度
  563. if score < scoreThreshold {
  564. continue
  565. }
  566. doc["score"] = score
  567. detail := util.ObjToString(doc["detail"])
  568. // 字段中必须包含 projectName
  569. if buyer2 != "" {
  570. if !strings.Contains(detail, projectName) && !strings.Contains(detail, buyer2) {
  571. continue
  572. }
  573. }
  574. if seenProjectNames[projectNameValue] {
  575. continue
  576. }
  577. seenProjectNames[projectNameValue] = true
  578. results = append(results, doc)
  579. if len(results) >= maxResults {
  580. break
  581. }
  582. }
  583. // 排序:按 score 降序
  584. sort.Slice(results, func(i, j int) bool {
  585. si := util.Float64All(results[i]["score"])
  586. sj := util.Float64All(results[j]["score"])
  587. return si > sj
  588. })
  589. return results, nil
  590. }
  591. func searchES22(client *elastic.Client, projectName, buyer2 string, scoreThreshold float64, maxResults int) ([]map[string]interface{}, error) {
  592. fieldsToTry := []string{"projectname.pname", "title", "detail"}
  593. filtersToTry := [][]elastic.Query{
  594. {elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一")},
  595. {elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向")},
  596. {elastic.NewTermsQuery("toptype", "拟建")},
  597. }
  598. var allResults []*elastic.SearchHit
  599. seenIDs := make(map[string]bool)
  600. for _, field := range fieldsToTry {
  601. for _, filter := range filtersToTry {
  602. // 构建查询
  603. query := elastic.NewBoolQuery().
  604. Must(elastic.NewMatchQuery(field, projectName)).
  605. Filter(filter...)
  606. // 执行查询
  607. searchResult, err := client.Search().
  608. Index("bidding").
  609. Query(query).
  610. Size(70). // 多取一些,后面做筛选和去重
  611. Do(context.Background())
  612. if err != nil {
  613. return nil, err
  614. }
  615. for _, hit := range searchResult.Hits.Hits {
  616. if !seenIDs[hit.Id] {
  617. allResults = append(allResults, hit)
  618. seenIDs[hit.Id] = true
  619. }
  620. }
  621. if len(allResults) >= maxResults {
  622. break
  623. }
  624. }
  625. if len(allResults) >= maxResults {
  626. break
  627. }
  628. }
  629. var results []map[string]interface{}
  630. seenProjectNames := make(map[string]bool)
  631. seenProjectCodes := make(map[string]bool)
  632. bidamountMap := make(map[float64]bool)
  633. for _, hit := range allResults {
  634. var doc map[string]interface{}
  635. if err := json.Unmarshal(hit.Source, &doc); err != nil {
  636. log.Info("解析文档失败", zap.Error(err))
  637. continue
  638. }
  639. projectNameValue := util.ObjToString(doc["projectname"])
  640. if projectNameValue == "" {
  641. continue
  642. }
  643. projectCode := util.ObjToString(doc["projectcode"])
  644. if seenProjectCodes[projectCode] {
  645. continue
  646. }
  647. seenProjectCodes[projectCode] = true
  648. bidamount := util.Float64All(doc["bidamount"])
  649. if bidamountMap[bidamount] {
  650. continue
  651. }
  652. bidamountMap[bidamount] = true
  653. // 相似度筛选
  654. score := *hit.Score
  655. doc["score"] = score //相似度
  656. if score < scoreThreshold {
  657. continue
  658. }
  659. //id := util.ObjToString(doc["id"])
  660. //doc["jyhref"] = GetJyURLByID(id)
  661. //if site := util.ObjToString(doc["site"]); site == "中华人民共和国自然资源部" {
  662. // doc["title"] = "土地出让" + "-" + util.ObjToString(doc["title"])
  663. //}
  664. // enrich: total_investment
  665. //if bidData, _ := MgoB.FindById("bidding", id, nil); bidData != nil {
  666. // if util.Float64All((*bidData)["total_investment"]) > 0 {
  667. // doc["total_investment"] = (*bidData)["total_investment"]
  668. // }
  669. //}
  670. doc["score"] = score
  671. detail := util.ObjToString(doc["detail"])
  672. // 字段中必须包含 projectName
  673. if buyer2 != "" {
  674. if !strings.Contains(detail, projectName) && !strings.Contains(detail, buyer2) {
  675. continue
  676. }
  677. }
  678. if seenProjectNames[projectNameValue] {
  679. continue
  680. }
  681. seenProjectNames[projectNameValue] = true
  682. results = append(results, doc)
  683. if len(results) >= maxResults {
  684. break
  685. }
  686. }
  687. return results, nil
  688. }
  689. func searchES(client *elastic.Client, projectName, buyer2 string) ([]map[string]interface{}, error) {
  690. query := elastic.NewBoolQuery().
  691. Must(
  692. //elastic.NewMatchQuery("projectname.pname", projectName), // 模糊匹配 projectname
  693. //elastic.NewMatchQuery("title", projectName), // 模糊匹配 projectname
  694. elastic.NewMatchQuery("detail", projectName), // 模糊匹配 projectname
  695. //elastic.NewTermQuery("area", "安徽"), // 过滤区域
  696. elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一"), // 过滤 subtype
  697. //elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向"), // 过滤 subtype
  698. //elastic.NewTermsQuery("toptype", "拟建"), // 过滤 subtype
  699. )
  700. searchResult, err := client.Search().
  701. Index("bidding").
  702. Query(query).
  703. Size(70). // 先取 12 条,确保足够数据
  704. Do(context.Background())
  705. if err != nil {
  706. return nil, err
  707. }
  708. // 结果集
  709. var results []map[string]interface{}
  710. seenProjectNames := make(map[string]bool) // 用于去重
  711. seenProjectCode := make(map[string]bool) // 用于去重
  712. bidamountMap := make(map[float64]bool)
  713. for _, hit := range searchResult.Hits.Hits {
  714. var doc map[string]interface{}
  715. err := json.Unmarshal(hit.Source, &doc)
  716. if err != nil {
  717. log.Info("解析文档失败", zap.Error(err))
  718. continue
  719. }
  720. // 获取 `projectname`,防止 key 不存在时的错误
  721. projectNameValue, ok := doc["projectname"].(string)
  722. bidamount := util.Float64All(doc["bidamount"])
  723. if !ok {
  724. log.Info("⚠️ 缺少 projectname 字段,跳过:", zap.Any("projectname", doc["projectname"]))
  725. continue
  726. }
  727. projectCodeValue := util.ObjToString(doc["projectcode"])
  728. if seenProjectCode[projectCodeValue] {
  729. continue
  730. }
  731. if projectCodeValue != "" {
  732. seenProjectCode[projectCodeValue] = true
  733. }
  734. // **处理额外字段**
  735. id := util.ObjToString(doc["id"])
  736. bidData, _ := MgoB.FindById("bidding", id, nil)
  737. if util.Float64All((*bidData)["total_investment"]) > 0 {
  738. doc["total_investment"] = (*bidData)["total_investment"]
  739. }
  740. doc["jyhref"] = GetJyURLByID(id)
  741. score := *hit.Score
  742. site := util.ObjToString(doc["site"])
  743. if site == "中华人民共和国自然资源部" {
  744. doc["title"] = "土地出让" + "-" + util.ObjToString(doc["title"])
  745. }
  746. doc["score"] = score //相似度
  747. detail := util.ObjToString(doc["detail"])
  748. if !strings.Contains(detail, projectName) {
  749. continue
  750. }
  751. // **去重逻辑**:如果 `projectname` 已经出现过,则跳过
  752. if seenProjectNames[projectNameValue] {
  753. continue
  754. }
  755. if bidamountMap[bidamount] {
  756. continue
  757. }
  758. // **记录该 `projectname`,避免重复**
  759. seenProjectNames[projectNameValue] = true
  760. bidamountMap[bidamount] = true
  761. // **加入结果集**
  762. results = append(results, doc)
  763. // **如果已经找到 6 条不同 `projectname`,就跳出循环**
  764. if len(results) >= 10 {
  765. break
  766. }
  767. }
  768. //2、判断正文包含采购单位
  769. for _, hit := range searchResult.Hits.Hits {
  770. var doc map[string]interface{}
  771. err := json.Unmarshal(hit.Source, &doc)
  772. if err != nil {
  773. log.Info("解析文档失败:", zap.Error(err))
  774. continue
  775. }
  776. // 获取 `projectname`,防止 key 不存在时的错误
  777. projectNameValue, ok := doc["projectname"].(string)
  778. bidamount := util.Float64All(doc["bidamount"])
  779. if !ok {
  780. log.Info("⚠️ 缺少 projectname 字段,跳过:", zap.Any("projectname", doc["projectname"]))
  781. continue
  782. }
  783. // **处理额外字段**
  784. id := util.ObjToString(doc["id"])
  785. doc["jyhref"] = GetJyURLByID(id)
  786. score := *hit.Score
  787. doc["score"] = score //相似度
  788. site := util.ObjToString(doc["site"])
  789. if site == "中华人民共和国自然资源部" {
  790. doc["title"] = "土地出让" + "-" + util.ObjToString(doc["title"])
  791. }
  792. //判断正文包含采购单位
  793. detail := util.ObjToString(doc["detail"])
  794. if !strings.Contains(detail, buyer2) {
  795. continue
  796. }
  797. // **去重逻辑**:如果 `projectname` 已经出现过,则跳过
  798. if seenProjectNames[projectNameValue] {
  799. continue
  800. }
  801. if bidamountMap[bidamount] {
  802. continue
  803. }
  804. // **记录该 `projectname`,避免重复**
  805. seenProjectNames[projectNameValue] = true
  806. bidamountMap[bidamount] = true
  807. // **加入结果集**
  808. results = append(results, doc)
  809. // **如果已经找到 6 条不同 `projectname`,就跳出循环**
  810. if len(results) >= 10 {
  811. break
  812. }
  813. }
  814. return results, nil
  815. }
  816. // GetJyURLByID 获取剑鱼地址
  817. func GetJyURLByID(id string) string {
  818. var Url = "https://www.jianyu360.com/article/content/%s.html"
  819. url := fmt.Sprintf(Url, util.CommonEncodeArticle("content", id))
  820. return url
  821. }
  822. // GetIdByURL 解密url,获取bidding ID
  823. func GetIdByURL(url string) string {
  824. if strings.Contains(url, "work-bench") {
  825. return ""
  826. }
  827. if strings.Contains(url, "/article/content") {
  828. urls := strings.Split(url, "content/")
  829. res := strings.Split(urls[1], ".html")
  830. ids := util.CommonDecodeArticle("content", res[0])
  831. return ids[0]
  832. }
  833. if strings.HasSuffix(url, "appid") {
  834. urls := strings.Split(url, "entservice/")
  835. res := strings.Split(urls[1], ".html")
  836. se := util.SimpleEncrypt{Key: "entservice"}
  837. id := se.DecodeString(res[0])
  838. return id
  839. }
  840. return ""
  841. }
  842. // isValidCodeFormat 判断 拟建项目编码
  843. func isValidCodeFormat(s string) bool {
  844. pattern := `^\d{4}-\d{6}-\d{2}-\d{2}-\d{6}$`
  845. matched, err := regexp.MatchString(pattern, s)
  846. if err != nil {
  847. return false
  848. }
  849. return matched
  850. }