project.go 32 KB


  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "go.uber.org/zap"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  11. "regexp"
  12. "sort"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. func dealXlsxTest() {
  18. // 1. 初始化 ES 客户端
  19. client, err := elastic.NewClient(
  20. elastic.SetURL(GF.Es.URL),
  21. elastic.SetBasicAuth(GF.Es.Username, GF.Es.Password),
  22. elastic.SetSniff(false),
  23. )
  24. if err != nil {
  25. log.Fatal("创建 Elasticsearch 客户端失败", zap.Error(err))
  26. }
  27. // 2. 初始化 MongoDB 连接
  28. sess := MgoP.GetMgoConn()
  29. defer MgoP.DestoryMongoConn(sess)
  30. coll := sess.DB("qfw").C("wcc_dealXlsxData_0524")
  31. iter := coll.Find(nil).Select(nil).Iter()
  32. // 3. 并发控制
  33. const maxWorkers = 2
  34. taskCh := make(chan map[string]interface{}, 2000)
  35. var wg sync.WaitGroup
  36. // 4. 启动 worker 处理任务
  37. for i := 0; i < maxWorkers; i++ {
  38. wg.Add(1)
  39. go func() {
  40. defer wg.Done()
  41. for doc := range taskCh {
  42. if len(doc) == 0 {
  43. log.Info("aaa", zap.Any("client", client))
  44. }
  45. processOneProposedTest(doc, client)
  46. }
  47. }()
  48. }
  49. // 5. 逐条读取数据并派发任务
  50. log.Info("111111", zap.String("222222", "开始处理数据"))
  51. count := 0
  52. for doc := make(map[string]interface{}); iter.Next(doc); {
  53. count++
  54. if count%1000 == 0 {
  55. log.Info("dealProposed", zap.Int("current", count), zap.Any("projectname", doc["p1_project_name"]))
  56. }
  57. taskCh <- cloneMap(doc) // 防止 map 重用
  58. }
  59. close(taskCh)
  60. wg.Wait()
  61. }
  62. func processOneProposedTest(tmp map[string]interface{}, client *elastic.Client) {
  63. defer func() {
  64. if r := recover(); r != nil {
  65. log.Warn("panic in processOneProposed", zap.Any("recover", r))
  66. }
  67. }()
  68. id := mongodb.BsonIdToSId(tmp["_id"])
  69. //proposedID := mongodb.BsonIdToSId(tmp["_id"])
  70. projectName := util.ObjToString(tmp["p1_project_name"])
  71. buyer := util.ObjToString(tmp["p1_project_owner"])
  72. //proposed_number := util.ObjToString(tmp["proposed_number"])
  73. //log.Info("processOneProposed", zap.String("开始查询es", projectName))
  74. results, err := searchES23(client, projectName, buyer, 20, 50)
  75. if err != nil {
  76. log.Warn("searchES22 error", zap.Error(err))
  77. return
  78. }
  79. //log.Info("processOneProposed", zap.String("结束查询es", projectName))
  80. biddings := []map[string]interface{}{}
  81. update := map[string]interface{}{}
  82. // 标讯信息
  83. for _, re := range results {
  84. biddingID := util.ObjToString(re["id"])
  85. da := map[string]interface{}{
  86. "id": re["id"],
  87. "title": re["title"],
  88. "area": re["area"],
  89. "city": re["city"],
  90. "projectname": re["projectname"],
  91. "score": re["score"],
  92. "toptype": re["toptype"],
  93. "subtype": re["subtype"],
  94. "buyer": re["buyer"],
  95. "budget": re["budget"],
  96. "buyerperson": re["buyerperson"],
  97. "buyertel": re["buyertel"],
  98. "s_winner": re["s_winner"],
  99. "bidamount": re["bidamount"],
  100. "winnertel": re["winnertel"],
  101. "agency": re["agency"],
  102. "publishtime": re["publishtime"],
  103. }
  104. //项目信息
  105. where2 := map[string]interface{}{"ids": biddingID}
  106. if util.ObjToString(re["toptype"]) == "拟建" {
  107. projectset, _ := MgoP.FindOne("projectset_proposed", where2)
  108. if projectset != nil && len((*projectset)) > 0 {
  109. v3 := map[string]interface{}{
  110. "project_id": mongodb.BsonIdToSId((*projectset)["_id"]),
  111. "projectname": (*projectset)["projectname"],
  112. "bidamount": (*projectset)["bidamount"],
  113. "area": (*projectset)["area"],
  114. "city": (*projectset)["city"],
  115. "district": (*projectset)["district"],
  116. "owner": (*projectset)["owner"],
  117. "approvecode": (*projectset)["approvecode"],
  118. }
  119. if (*projectset)["owner"] != "" {
  120. where11 := map[string]interface{}{
  121. "company_name": (*projectset)["owner"],
  122. }
  123. std, _ := MgoQY.FindOne("qyxy_std", where11)
  124. v3["credit_no"] = (*std)["credit_no"]
  125. }
  126. da["project"] = v3
  127. }
  128. biddings = append(biddings, da)
  129. } else {
  130. projectset, _ := MgoP.FindOne("projectset_20230904", where2)
  131. if projectset != nil && len((*projectset)) > 0 {
  132. v3 := map[string]interface{}{
  133. "project_id": mongodb.BsonIdToSId((*projectset)["_id"]),
  134. "projectname": (*projectset)["projectname"],
  135. "bidamount": (*projectset)["bidamount"],
  136. "area": (*projectset)["area"],
  137. "city": (*projectset)["city"],
  138. "district": (*projectset)["district"],
  139. "firsttime": (*projectset)["firsttime"],
  140. "bidtype": (*projectset)["bidtype"],
  141. "bidstatus": (*projectset)["bidstatus"],
  142. "sortprice": (*projectset)["sortprice"],
  143. "buyer": (*projectset)["buyer"],
  144. }
  145. if (*projectset)["buyer"] != "" {
  146. where11 := map[string]interface{}{
  147. "company_name": (*projectset)["buyer"],
  148. }
  149. std, _ := MgoQY.FindOne("qyxy_std", where11)
  150. v3["credit_no"] = (*std)["credit_no"]
  151. }
  152. da["project"] = v3
  153. }
  154. biddings = append(biddings, da)
  155. }
  156. }
  157. if len(biddings) > 0 {
  158. update["bidding"] = biddings
  159. MgoP.UpdateById("wcc_dealXlsxData_0524", id, map[string]interface{}{"$set": update})
  160. }
  161. }
  162. // dealProposed22Concurrent 多协程处理,拟建存量数据
  163. func dealProposed22Concurrent() {
  164. // 1. 初始化 ES 客户端
  165. client, err := elastic.NewClient(
  166. elastic.SetURL(GF.Es.URL),
  167. elastic.SetBasicAuth(GF.Es.Username, GF.Es.Password),
  168. elastic.SetSniff(false),
  169. )
  170. if err != nil {
  171. log.Fatal("创建 Elasticsearch 客户端失败", zap.Error(err))
  172. }
  173. // 2. 初始化 MongoDB 连接
  174. sess := MgoP.GetMgoConn()
  175. defer MgoP.DestoryMongoConn(sess)
  176. coll := sess.DB("qfw").C("projectset_proposed")
  177. query := map[string]interface{}{
  178. //"firsttime": map[string]interface{}{
  179. // "$gte": 1735660800,
  180. // "$lte": 1748102400,
  181. //},
  182. //"firsttime": map[string]interface{}{
  183. // "$lte": 1735660800,
  184. //},
  185. "_id": map[string]interface{}{
  186. "$lte": mongodb.StringTOBsonId("62b6fbc9fa39106bd5e599fc"),
  187. },
  188. }
  189. iter := coll.Find(query).Select(nil).Sort("-_id").Iter()
  190. // 3. 并发控制
  191. const maxWorkers = 1
  192. taskCh := make(chan map[string]interface{}, 2000)
  193. var wg sync.WaitGroup
  194. // 4. 启动 worker 处理任务
  195. for i := 0; i < maxWorkers; i++ {
  196. wg.Add(1)
  197. go func() {
  198. defer wg.Done()
  199. for doc := range taskCh {
  200. if len(doc) == 0 {
  201. log.Info("aaa", zap.Any("client", client))
  202. }
  203. processOneProposed(doc, client)
  204. }
  205. }()
  206. }
  207. // 5. 逐条读取数据并派发任务
  208. log.Info("111111", zap.String("222222", "开始处理数据"))
  209. count := 0
  210. for doc := make(map[string]interface{}); iter.Next(doc); {
  211. count++
  212. if count%1000 == 0 {
  213. log.Info("dealProposed", zap.Int("current", count), zap.Any("projectname", doc["projectname"]), zap.Any("_id", doc["_id"]))
  214. }
  215. //if util.ObjToString(doc["area"]) == "甘肃" {
  216. // continue
  217. //}
  218. taskCh <- cloneMap(doc) // 防止 map 重用
  219. }
  220. close(taskCh)
  221. wg.Wait()
  222. }
  223. // processOneProposed 处理存量数据
  224. func processOneProposed(tmp map[string]interface{}, client *elastic.Client) {
  225. defer func() {
  226. if r := recover(); r != nil {
  227. log.Warn("panic in processOneProposed", zap.Any("recover", r))
  228. }
  229. }()
  230. proposedID := mongodb.BsonIdToSId(tmp["_id"])
  231. projectName := util.ObjToString(tmp["projectname"])
  232. buyer := util.ObjToString(tmp["owner"])
  233. proposed_number := util.ObjToString(tmp["proposed_number"])
  234. //log.Info("processOneProposed", zap.String("开始查询es", projectName))
  235. results, err := searchES23(client, projectName, buyer, 20, 50)
  236. if err != nil {
  237. log.Warn("searchES22 error", zap.Error(err))
  238. return
  239. }
  240. //log.Info("processOneProposed", zap.String("结束查询es", projectName))
  241. biddings := []map[string]interface{}{}
  242. // 标讯信息
  243. for _, re := range results {
  244. biddingID := util.ObjToString(re["id"])
  245. da := map[string]interface{}{
  246. "id": re["id"],
  247. "title": re["title"],
  248. "area": re["area"],
  249. "city": re["city"],
  250. "projectname": re["projectname"],
  251. "score": re["score"],
  252. "toptype": re["toptype"],
  253. "subtype": re["subtype"],
  254. "buyer": re["buyer"],
  255. "budget": re["budget"],
  256. "buyerperson": re["buyerperson"],
  257. "buyertel": re["buyertel"],
  258. "s_winner": re["s_winner"],
  259. "bidamount": re["bidamount"],
  260. "winnertel": re["winnertel"],
  261. "agency": re["agency"],
  262. "publishtime": re["publishtime"],
  263. }
  264. //项目信息
  265. where2 := map[string]interface{}{"ids": biddingID}
  266. if util.ObjToString(re["toptype"]) == "拟建" {
  267. projectset, _ := MgoP.FindOne("projectset_proposed", where2)
  268. if projectset != nil && len((*projectset)) > 0 {
  269. v3 := map[string]interface{}{
  270. "project_id": mongodb.BsonIdToSId((*projectset)["_id"]),
  271. "projectname": (*projectset)["projectname"],
  272. "bidamount": (*projectset)["bidamount"],
  273. "area": (*projectset)["area"],
  274. "city": (*projectset)["city"],
  275. "district": (*projectset)["district"],
  276. "owner": (*projectset)["owner"],
  277. "approvecode": (*projectset)["approvecode"],
  278. "approvestatus": (*projectset)["approvestatus"],
  279. "sourceinfourl": (*projectset)["sourceinfourl"],
  280. }
  281. if (*projectset)["owner"] != "" {
  282. where11 := map[string]interface{}{
  283. "company_name": (*projectset)["owner"],
  284. }
  285. std, _ := MgoQY.FindOne("qyxy_std", where11)
  286. v3["credit_no"] = (*std)["credit_no"]
  287. }
  288. da["project"] = v3
  289. }
  290. biddings = append(biddings, da)
  291. } else {
  292. projectset, _ := MgoP.FindOne("projectset_20230904", where2)
  293. if projectset != nil && len((*projectset)) > 0 {
  294. v3 := map[string]interface{}{
  295. "project_id": mongodb.BsonIdToSId((*projectset)["_id"]),
  296. "projectname": (*projectset)["projectname"],
  297. "bidamount": (*projectset)["bidamount"],
  298. "area": (*projectset)["area"],
  299. "city": (*projectset)["city"],
  300. "firsttime": (*projectset)["firsttime"],
  301. "bidtype": (*projectset)["bidtype"],
  302. "bidstatus": (*projectset)["bidstatus"],
  303. "sortprice": (*projectset)["sortprice"],
  304. "buyer": (*projectset)["buyer"],
  305. }
  306. da["project"] = v3
  307. }
  308. biddings = append(biddings, da)
  309. }
  310. }
  311. insert := map[string]interface{}{
  312. "proposed_id": proposedID,
  313. "stype": 1, //代表从拟建数据-> 匹配在建数据
  314. "proposed_number": proposed_number,
  315. "buyer": buyer,
  316. "projectname": tmp["projectname"],
  317. "area": tmp["area"],
  318. "city": tmp["city"],
  319. "district": tmp["district"],
  320. "bidding": biddings,
  321. "updatetime": time.Now().Unix(),
  322. }
  323. if isValidCodeFormat(util.ObjToString(tmp["approvecode"])) {
  324. insert["approvecode"] = tmp["approvecode"]
  325. }
  326. if buyer != "" {
  327. where11 := map[string]interface{}{
  328. "company_name": buyer,
  329. }
  330. std, _ := MgoQY.FindOne("qyxy_std", where11)
  331. insert["credit_no"] = (*std)["credit_no"]
  332. }
  333. whereExist := map[string]interface{}{
  334. "proposed_id": proposedID,
  335. }
  336. if GF.Env.Savecoll == "" {
  337. GF.Env.Savecoll = "wcc_nj_zj_bidding"
  338. }
  339. exist, _ := MgoP.FindOne(GF.Env.Savecoll, whereExist)
  340. // 存在就更新
  341. if exist != nil && len(*exist) > 0 {
  342. exitsid := mongodb.BsonIdToSId((*exist)["_id"])
  343. MgoP.UpdateById(GF.Env.Savecoll, exitsid, map[string]interface{}{"$set": insert})
  344. } else {
  345. insert["comeintime"] = time.Now().Unix()
  346. MgoP.InsertOrUpdate("qfw", GF.Env.Savecoll, insert)
  347. }
  348. }
  349. func cloneMap(src map[string]interface{}) map[string]interface{} {
  350. dst := make(map[string]interface{}, len(src))
  351. for k, v := range src {
  352. dst[k] = v
  353. }
  354. return dst
  355. }
  356. func dealProposed22() {
  357. url := GF.Es.URL
  358. //url := "http://127.0.0.1:19908"
  359. username := GF.Es.Username
  360. password := GF.Es.Password
  361. //index := "bidding" //索引名称
  362. // 创建 Elasticsearch 客户端
  363. client, err := elastic.NewClient(
  364. elastic.SetURL(url),
  365. elastic.SetBasicAuth(username, password),
  366. elastic.SetSniff(false),
  367. )
  368. if err != nil {
  369. log.Info("创建 Elasticsearch 客户端失败", zap.Error(err))
  370. }
  371. //
  372. sess := MgoP.GetMgoConn()
  373. defer MgoP.DestoryMongoConn(sess)
  374. log.Info("dealProposed", zap.Any("开始处理:拟建数据表", "projectset_proposed"))
  375. where := map[string]interface{}{
  376. "firsttime": map[string]interface{}{
  377. "$gte": 1735660800,
  378. },
  379. }
  380. queryMgo := sess.DB("qfw").C("projectset_proposed").Find(&where).Select(nil).Iter()
  381. count := 0
  382. for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
  383. if count%1000 == 0 {
  384. log.Info("dealProposed", zap.Any("current", count), zap.Any("projectname", tmp["projectname"]))
  385. }
  386. proposed_id := mongodb.BsonIdToSId(tmp["_id"])
  387. project_name := util.ObjToString(tmp["projectname"])
  388. buyer := util.ObjToString(tmp["owner"])
  389. results, err := searchES22(client, project_name, buyer, 60, 10)
  390. if err != nil {
  391. log.Info("searchES22", zap.Error(err))
  392. }
  393. projectIds := make([]string, 0) //拟建对应的在建项目ID
  394. biddingIds := make([]string, 0) //拟建项目对在建项目中的标讯ids
  395. biddings := make([]map[string]interface{}, 0)
  396. for _, re := range results {
  397. bidding_id := util.ObjToString(re["id"])
  398. biddingIds = append(biddingIds, bidding_id)
  399. bidding := map[string]interface{}{
  400. "id": re["id"],
  401. "title": re["title"],
  402. "projectname": re["projectname"],
  403. "score": re["score"],
  404. "toptype": re["toptype"],
  405. "subtype": re["subtype"],
  406. }
  407. biddings = append(biddings, bidding)
  408. }
  409. for _, bid := range biddingIds {
  410. where2 := map[string]interface{}{
  411. "ids": bid,
  412. }
  413. projectset, _ := MgoP.FindOne("projectset_20230904", where2)
  414. if projectset != nil && len((*projectset)) > 0 {
  415. projectIds = append(projectIds, mongodb.BsonIdToSId((*projectset)["_id"]))
  416. }
  417. }
  418. insert := map[string]interface{}{
  419. "proposed_id": proposed_id,
  420. "bidding_ids": removeDuplicates(biddingIds),
  421. "project_ids": removeDuplicates(projectIds),
  422. "biddings": biddings,
  423. "project_name": tmp["projectname"],
  424. }
  425. MgoP.InsertOrUpdate("qfw", "wcc_dealProposed22", insert)
  426. }
  427. }
  428. // dealProposed 处理拟建数据表
  429. func dealProposed() {
  430. sess := MgoP.GetMgoConn()
  431. defer MgoP.DestoryMongoConn(sess)
  432. log.Info("dealProposed", zap.Any("开始处理:拟建数据表", "projectset_proposed"))
  433. where := map[string]interface{}{
  434. "firsttime": map[string]interface{}{
  435. "$gte": 1735660800,
  436. },
  437. }
  438. queryMgo := sess.DB("qfw").C("projectset_proposed").Find(&where).Select(nil).Iter()
  439. count := 0
  440. for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
  441. if count%1000 == 0 {
  442. log.Info("dealProposed", zap.Any("current", count), zap.Any("projectname", tmp["projectname"]))
  443. }
  444. proposed_id := mongodb.BsonIdToSId(tmp["_id"])
  445. insert := make(map[string]interface{})
  446. insert["proposed_id"] = proposed_id
  447. var nzj_follw_records = make([]DwdFnzjFollowRecord, 0)
  448. err := JianyuSubjectDB.Where("proposed_id = ? ", proposed_id).Find(&nzj_follw_records).Error
  449. if err != nil {
  450. log.Info("dealProposed", zap.Error(err))
  451. }
  452. //拟建标讯,没有找到对应的在建项目数据
  453. if len(nzj_follw_records) == 0 {
  454. insert["has_bidding"] = false
  455. MgoP.InsertOrUpdate("qfw", "wcc_ok_project_proposed", insert)
  456. continue
  457. }
  458. projectIds := make([]string, 0) //拟建对应的在建项目ID
  459. biddingIds := make([]string, 0) //拟建项目对在建项目中的标讯ids
  460. for _, v := range nzj_follw_records {
  461. biddingIds = append(biddingIds, v.InfoID)
  462. }
  463. biddingIds = removeDuplicates(biddingIds)
  464. insert["bidding_ids"] = biddingIds
  465. for _, bid := range biddingIds {
  466. where2 := map[string]interface{}{
  467. "ids": bid,
  468. }
  469. projectset, _ := MgoP.FindOne("projectset_20230904", where2)
  470. if projectset != nil && len((*projectset)) > 0 {
  471. projectIds = append(projectIds, mongodb.BsonIdToSId((*projectset)["_id"]))
  472. }
  473. }
  474. if len(projectIds) > 0 {
  475. insert["project_ids"] = projectIds
  476. } else {
  477. insert["has_project"] = false
  478. }
  479. MgoP.InsertOrUpdate("qfw", "wcc_ok_project_proposed", insert)
  480. }
  481. log.Info("dealProposed", zap.Any("数据处理完毕:拟建数据表", "projectset_proposed"))
  482. }
  483. // removeDuplicates 去除重复字符串
  484. func removeDuplicates(arr []string) []string {
  485. uniqueMap := make(map[string]bool)
  486. var result []string
  487. for _, str := range arr {
  488. if !uniqueMap[str] {
  489. uniqueMap[str] = true
  490. result = append(result, str)
  491. }
  492. }
  493. return result
  494. }
  495. // searchES24 添加分词查询
  496. func searchES24(client *elastic.Client, projectName, buyer2 string, scoreThreshold float64, maxResults int) ([]map[string]interface{}, error) {
  497. fieldsToTry := []string{"projectname.pname", "title", "detail"}
  498. filtersToTry := [][]elastic.Query{
  499. {elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一")},
  500. {elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向")},
  501. {elastic.NewTermsQuery("toptype", "拟建")},
  502. }
  503. var allResults []*elastic.SearchHit
  504. seenIDs := make(map[string]bool)
  505. // ✅ Step 1: 精准查询
  506. for _, field := range fieldsToTry {
  507. if field == "detail" && len(allResults) > 0 {
  508. break
  509. }
  510. for _, filter := range filtersToTry {
  511. query := elastic.NewBoolQuery().
  512. Must(elastic.NewMultiMatchQuery(projectName, field).Type("phrase")).
  513. Filter(filter...)
  514. fetchFields := elastic.NewFetchSourceContext(true).Include(
  515. "id", "title", "projectname", "projectcode", "bidamount", "score",
  516. "area", "city", "toptype", "subtype", "buyer", "budget", "buyerperson",
  517. "buyertel", "s_winner", "winnertel", "agency", "publishtime")
  518. searchResult, err := client.Search().
  519. Index("bidding").
  520. Query(query).
  521. Size(70).
  522. FetchSourceContext(fetchFields).
  523. Do(context.Background())
  524. if err != nil {
  525. return nil, err
  526. }
  527. for _, hit := range searchResult.Hits.Hits {
  528. if !seenIDs[hit.Id] {
  529. seenIDs[hit.Id] = true
  530. allResults = append(allResults, hit)
  531. }
  532. }
  533. if len(allResults) >= maxResults {
  534. break
  535. }
  536. }
  537. if len(allResults) >= maxResults {
  538. break
  539. }
  540. }
  541. // ✅ Step 2: 如果没结果,用分词兜底查询
  542. if len(allResults) == 0 {
  543. // 分词
  544. analyzeResp, err := client.IndexAnalyze().
  545. Index("bidding").
  546. Analyzer("ik_smart").
  547. Text(projectName).
  548. Do(context.Background())
  549. if err != nil {
  550. return nil, fmt.Errorf("ik_smart analyze failed: %v", err)
  551. }
  552. var tokens []string
  553. for _, token := range analyzeResp.Tokens {
  554. tokens = append(tokens, token.Token)
  555. }
  556. if len(tokens) == 0 {
  557. return nil, fmt.Errorf("no tokens found from ik_smart")
  558. }
  559. // 用所有分词一次性查询
  560. queryText := strings.Join(tokens, " ")
  561. for _, filter := range filtersToTry {
  562. query := elastic.NewBoolQuery().
  563. Must(
  564. elastic.NewMultiMatchQuery(queryText, fieldsToTry...).
  565. MinimumShouldMatch("100%"), // 必须包含所有分词,可根据需求改成 80%、50%
  566. ).
  567. Filter(filter...)
  568. searchResult, err := client.Search().
  569. Index("bidding").
  570. Query(query).
  571. Size(10). // 根据需要调整
  572. Do(context.Background())
  573. if err != nil {
  574. log.Warn("multi token query failed", zap.Error(err))
  575. continue
  576. }
  577. for _, hit := range searchResult.Hits.Hits {
  578. if !seenIDs[hit.Id] {
  579. seenIDs[hit.Id] = true
  580. allResults = append(allResults, hit)
  581. }
  582. }
  583. if len(allResults) >= maxResults {
  584. break
  585. }
  586. }
  587. }
  588. // ✅ Step 3: 后处理
  589. var results []map[string]interface{}
  590. seenProjectNames := make(map[string]bool)
  591. seenProjectCodes := make(map[string]bool)
  592. bidamountMap := make(map[float64]bool)
  593. for _, hit := range allResults {
  594. var doc map[string]interface{}
  595. if err := json.Unmarshal(hit.Source, &doc); err != nil {
  596. log.Info("解析文档失败", zap.Error(err))
  597. continue
  598. }
  599. projectNameValue := util.ObjToString(doc["projectname"])
  600. if projectNameValue == "" {
  601. continue
  602. }
  603. projectCode := util.ObjToString(doc["projectcode"])
  604. if projectCode != "" {
  605. if seenProjectCodes[projectCode] {
  606. continue
  607. }
  608. seenProjectCodes[projectCode] = true
  609. }
  610. bidamount := util.Float64All(doc["bidamount"])
  611. if bidamount != 0 {
  612. if bidamountMap[bidamount] {
  613. continue
  614. }
  615. bidamountMap[bidamount] = true
  616. }
  617. score := *hit.Score
  618. if score < scoreThreshold {
  619. continue
  620. }
  621. id := util.ObjToString(doc["id"])
  622. bidd, _ := MgoB.FindById("bidding", id, nil)
  623. detail := util.ObjToString((*bidd)["detail"])
  624. if detail != "" && !strings.Contains(detail, projectName) {
  625. continue
  626. }
  627. if buyer2 != "" && !strings.Contains(detail, buyer2) {
  628. continue
  629. }
  630. if seenProjectNames[projectNameValue] {
  631. continue
  632. }
  633. seenProjectNames[projectNameValue] = true
  634. doc["detail"] = detail
  635. doc["score"] = score
  636. results = append(results, doc)
  637. if len(results) >= maxResults {
  638. break
  639. }
  640. }
  641. // ✅ Step 4: 排序
  642. sort.Slice(results, func(i, j int) bool {
  643. return util.Float64All(results[i]["score"]) > util.Float64All(results[j]["score"])
  644. })
  645. return results, nil
  646. }
  647. func searchES23(client *elastic.Client, projectName, buyer2 string, scoreThreshold float64, maxResults int) ([]map[string]interface{}, error) {
  648. fieldsToTry := []string{"projectname.pname", "title", "detail"}
  649. filtersToTry := [][]elastic.Query{
  650. {elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一")},
  651. {elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向")},
  652. {elastic.NewTermsQuery("toptype", "拟建")},
  653. }
  654. var allResults []*elastic.SearchHit
  655. seenIDs := make(map[string]bool)
  656. for _, field := range fieldsToTry {
  657. if field == "detail" && len(allResults) > 0 {
  658. break
  659. }
  660. for _, filter := range filtersToTry {
  661. // 构建查询:使用 MultiMatchQuery + phrase
  662. query := elastic.NewBoolQuery().
  663. Must(elastic.NewMultiMatchQuery(projectName, field).Type("phrase")).
  664. Filter(filter...)
  665. fetchFields := elastic.NewFetchSourceContext(true).Include("id",
  666. "title", "projectname", "projectcode", "bidamount", "score", "area",
  667. "city", "toptype", "subtype", "buyer", "budget", "buyerperson", "buyertel",
  668. "s_winner", "winnertel", "agency", "publishtime")
  669. // 执行查询
  670. searchResult, err := client.Search().
  671. Index("bidding").
  672. Query(query).
  673. Size(70).
  674. FetchSourceContext(fetchFields). // 添加这一行,查询部分字段
  675. Do(context.Background())
  676. if err != nil {
  677. return nil, err
  678. }
  679. // 去重处理
  680. for _, hit := range searchResult.Hits.Hits {
  681. if !seenIDs[hit.Id] {
  682. seenIDs[hit.Id] = true
  683. allResults = append(allResults, hit)
  684. }
  685. }
  686. if len(allResults) >= maxResults {
  687. break
  688. }
  689. }
  690. if len(allResults) >= maxResults {
  691. break
  692. }
  693. }
  694. var results []map[string]interface{}
  695. seenProjectNames := make(map[string]bool)
  696. seenProjectCodes := make(map[string]bool)
  697. bidamountMap := make(map[float64]bool)
  698. //subtypeMap := make(map[string]bool)
  699. for _, hit := range allResults {
  700. var doc map[string]interface{}
  701. if err := json.Unmarshal(hit.Source, &doc); err != nil {
  702. log.Info("解析文档失败", zap.Error(err))
  703. continue
  704. }
  705. projectNameValue := util.ObjToString(doc["projectname"])
  706. if projectNameValue == "" {
  707. continue
  708. }
  709. projectCode := util.ObjToString(doc["projectcode"])
  710. if projectCode != "" {
  711. if seenProjectCodes[projectCode] {
  712. continue
  713. }
  714. seenProjectCodes[projectCode] = true
  715. }
  716. bidamount := util.Float64All(doc["bidamount"])
  717. if bidamount != 0 {
  718. if bidamountMap[bidamount] {
  719. continue
  720. }
  721. bidamountMap[bidamount] = true
  722. }
  723. // 相似度筛选
  724. score := *hit.Score
  725. doc["score"] = score //相似度
  726. if score < scoreThreshold {
  727. continue
  728. }
  729. doc["score"] = score
  730. //detail := util.ObjToString(doc["detail"])
  731. id := util.ObjToString(doc["id"])
  732. bidd, _ := MgoB.FindById("bidding", id, nil)
  733. detail := util.ObjToString((*bidd)["detail"])
  734. // 字段中必须包含 projectName
  735. if detail != "" {
  736. if !strings.Contains(detail, projectName) {
  737. continue
  738. }
  739. }
  740. if buyer2 != "" {
  741. if !strings.Contains(detail, buyer2) {
  742. continue
  743. }
  744. }
  745. if seenProjectNames[projectNameValue] {
  746. continue
  747. }
  748. seenProjectNames[projectNameValue] = true
  749. doc["detail"] = detail
  750. results = append(results, doc)
  751. if len(results) >= maxResults {
  752. break
  753. }
  754. }
  755. // 排序:按 score 降序
  756. sort.Slice(results, func(i, j int) bool {
  757. si := util.Float64All(results[i]["score"])
  758. sj := util.Float64All(results[j]["score"])
  759. return si > sj
  760. })
  761. return results, nil
  762. }
  763. func searchES22(client *elastic.Client, projectName, buyer2 string, scoreThreshold float64, maxResults int) ([]map[string]interface{}, error) {
  764. fieldsToTry := []string{"projectname.pname", "title", "detail"}
  765. filtersToTry := [][]elastic.Query{
  766. {elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一")},
  767. {elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向")},
  768. {elastic.NewTermsQuery("toptype", "拟建")},
  769. }
  770. var allResults []*elastic.SearchHit
  771. seenIDs := make(map[string]bool)
  772. for _, field := range fieldsToTry {
  773. for _, filter := range filtersToTry {
  774. // 构建查询
  775. query := elastic.NewBoolQuery().
  776. Must(elastic.NewMatchQuery(field, projectName)).
  777. Filter(filter...)
  778. // 执行查询
  779. searchResult, err := client.Search().
  780. Index("bidding").
  781. Query(query).
  782. Size(70). // 多取一些,后面做筛选和去重
  783. Do(context.Background())
  784. if err != nil {
  785. return nil, err
  786. }
  787. for _, hit := range searchResult.Hits.Hits {
  788. if !seenIDs[hit.Id] {
  789. allResults = append(allResults, hit)
  790. seenIDs[hit.Id] = true
  791. }
  792. }
  793. if len(allResults) >= maxResults {
  794. break
  795. }
  796. }
  797. if len(allResults) >= maxResults {
  798. break
  799. }
  800. }
  801. var results []map[string]interface{}
  802. seenProjectNames := make(map[string]bool)
  803. seenProjectCodes := make(map[string]bool)
  804. bidamountMap := make(map[float64]bool)
  805. for _, hit := range allResults {
  806. var doc map[string]interface{}
  807. if err := json.Unmarshal(hit.Source, &doc); err != nil {
  808. log.Info("解析文档失败", zap.Error(err))
  809. continue
  810. }
  811. projectNameValue := util.ObjToString(doc["projectname"])
  812. if projectNameValue == "" {
  813. continue
  814. }
  815. projectCode := util.ObjToString(doc["projectcode"])
  816. if seenProjectCodes[projectCode] {
  817. continue
  818. }
  819. seenProjectCodes[projectCode] = true
  820. bidamount := util.Float64All(doc["bidamount"])
  821. if bidamountMap[bidamount] {
  822. continue
  823. }
  824. bidamountMap[bidamount] = true
  825. // 相似度筛选
  826. score := *hit.Score
  827. doc["score"] = score //相似度
  828. if score < scoreThreshold {
  829. continue
  830. }
  831. //id := util.ObjToString(doc["id"])
  832. //doc["jyhref"] = GetJyURLByID(id)
  833. //if site := util.ObjToString(doc["site"]); site == "中华人民共和国自然资源部" {
  834. // doc["title"] = "土地出让" + "-" + util.ObjToString(doc["title"])
  835. //}
  836. // enrich: total_investment
  837. //if bidData, _ := MgoB.FindById("bidding", id, nil); bidData != nil {
  838. // if util.Float64All((*bidData)["total_investment"]) > 0 {
  839. // doc["total_investment"] = (*bidData)["total_investment"]
  840. // }
  841. //}
  842. doc["score"] = score
  843. detail := util.ObjToString(doc["detail"])
  844. // 字段中必须包含 projectName
  845. if buyer2 != "" {
  846. if !strings.Contains(detail, projectName) && !strings.Contains(detail, buyer2) {
  847. continue
  848. }
  849. }
  850. if seenProjectNames[projectNameValue] {
  851. continue
  852. }
  853. seenProjectNames[projectNameValue] = true
  854. results = append(results, doc)
  855. if len(results) >= maxResults {
  856. break
  857. }
  858. }
  859. return results, nil
  860. }
  861. func searchES(client *elastic.Client, projectName, buyer2 string) ([]map[string]interface{}, error) {
  862. query := elastic.NewBoolQuery().
  863. Must(
  864. //elastic.NewMatchQuery("projectname.pname", projectName), // 模糊匹配 projectname
  865. //elastic.NewMatchQuery("title", projectName), // 模糊匹配 projectname
  866. elastic.NewMatchQuery("detail", projectName), // 模糊匹配 projectname
  867. //elastic.NewTermQuery("area", "安徽"), // 过滤区域
  868. elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一"), // 过滤 subtype
  869. //elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向"), // 过滤 subtype
  870. //elastic.NewTermsQuery("toptype", "拟建"), // 过滤 subtype
  871. )
  872. searchResult, err := client.Search().
  873. Index("bidding").
  874. Query(query).
  875. Size(70). // 先取 12 条,确保足够数据
  876. Do(context.Background())
  877. if err != nil {
  878. return nil, err
  879. }
  880. // 结果集
  881. var results []map[string]interface{}
  882. seenProjectNames := make(map[string]bool) // 用于去重
  883. seenProjectCode := make(map[string]bool) // 用于去重
  884. bidamountMap := make(map[float64]bool)
  885. for _, hit := range searchResult.Hits.Hits {
  886. var doc map[string]interface{}
  887. err := json.Unmarshal(hit.Source, &doc)
  888. if err != nil {
  889. log.Info("解析文档失败", zap.Error(err))
  890. continue
  891. }
  892. // 获取 `projectname`,防止 key 不存在时的错误
  893. projectNameValue, ok := doc["projectname"].(string)
  894. bidamount := util.Float64All(doc["bidamount"])
  895. if !ok {
  896. log.Info("⚠️ 缺少 projectname 字段,跳过:", zap.Any("projectname", doc["projectname"]))
  897. continue
  898. }
  899. projectCodeValue := util.ObjToString(doc["projectcode"])
  900. if seenProjectCode[projectCodeValue] {
  901. continue
  902. }
  903. if projectCodeValue != "" {
  904. seenProjectCode[projectCodeValue] = true
  905. }
  906. // **处理额外字段**
  907. id := util.ObjToString(doc["id"])
  908. bidData, _ := MgoB.FindById("bidding", id, nil)
  909. if util.Float64All((*bidData)["total_investment"]) > 0 {
  910. doc["total_investment"] = (*bidData)["total_investment"]
  911. }
  912. doc["jyhref"] = GetJyURLByID(id)
  913. score := *hit.Score
  914. site := util.ObjToString(doc["site"])
  915. if site == "中华人民共和国自然资源部" {
  916. doc["title"] = "土地出让" + "-" + util.ObjToString(doc["title"])
  917. }
  918. doc["score"] = score //相似度
  919. detail := util.ObjToString(doc["detail"])
  920. if !strings.Contains(detail, projectName) {
  921. continue
  922. }
  923. // **去重逻辑**:如果 `projectname` 已经出现过,则跳过
  924. if seenProjectNames[projectNameValue] {
  925. continue
  926. }
  927. if bidamountMap[bidamount] {
  928. continue
  929. }
  930. // **记录该 `projectname`,避免重复**
  931. seenProjectNames[projectNameValue] = true
  932. bidamountMap[bidamount] = true
  933. // **加入结果集**
  934. results = append(results, doc)
  935. // **如果已经找到 6 条不同 `projectname`,就跳出循环**
  936. if len(results) >= 10 {
  937. break
  938. }
  939. }
  940. //2、判断正文包含采购单位
  941. for _, hit := range searchResult.Hits.Hits {
  942. var doc map[string]interface{}
  943. err := json.Unmarshal(hit.Source, &doc)
  944. if err != nil {
  945. log.Info("解析文档失败:", zap.Error(err))
  946. continue
  947. }
  948. // 获取 `projectname`,防止 key 不存在时的错误
  949. projectNameValue, ok := doc["projectname"].(string)
  950. bidamount := util.Float64All(doc["bidamount"])
  951. if !ok {
  952. log.Info("⚠️ 缺少 projectname 字段,跳过:", zap.Any("projectname", doc["projectname"]))
  953. continue
  954. }
  955. // **处理额外字段**
  956. id := util.ObjToString(doc["id"])
  957. doc["jyhref"] = GetJyURLByID(id)
  958. score := *hit.Score
  959. doc["score"] = score //相似度
  960. site := util.ObjToString(doc["site"])
  961. if site == "中华人民共和国自然资源部" {
  962. doc["title"] = "土地出让" + "-" + util.ObjToString(doc["title"])
  963. }
  964. //判断正文包含采购单位
  965. detail := util.ObjToString(doc["detail"])
  966. if !strings.Contains(detail, buyer2) {
  967. continue
  968. }
  969. // **去重逻辑**:如果 `projectname` 已经出现过,则跳过
  970. if seenProjectNames[projectNameValue] {
  971. continue
  972. }
  973. if bidamountMap[bidamount] {
  974. continue
  975. }
  976. // **记录该 `projectname`,避免重复**
  977. seenProjectNames[projectNameValue] = true
  978. bidamountMap[bidamount] = true
  979. // **加入结果集**
  980. results = append(results, doc)
  981. // **如果已经找到 6 条不同 `projectname`,就跳出循环**
  982. if len(results) >= 10 {
  983. break
  984. }
  985. }
  986. return results, nil
  987. }
  988. // GetJyURLByID 获取剑鱼地址
  989. func GetJyURLByID(id string) string {
  990. var Url = "https://www.jianyu360.com/article/content/%s.html"
  991. url := fmt.Sprintf(Url, util.CommonEncodeArticle("content", id))
  992. return url
  993. }
  994. // GetIdByURL 解密url,获取bidding ID
  995. func GetIdByURL(url string) string {
  996. if strings.Contains(url, "work-bench") {
  997. return ""
  998. }
  999. if strings.Contains(url, "/article/content") {
  1000. urls := strings.Split(url, "content/")
  1001. res := strings.Split(urls[1], ".html")
  1002. ids := util.CommonDecodeArticle("content", res[0])
  1003. return ids[0]
  1004. }
  1005. if strings.HasSuffix(url, "appid") {
  1006. urls := strings.Split(url, "entservice/")
  1007. res := strings.Split(urls[1], ".html")
  1008. se := util.SimpleEncrypt{Key: "entservice"}
  1009. id := se.DecodeString(res[0])
  1010. return id
  1011. }
  1012. return ""
  1013. }
  1014. // isValidCodeFormat 判断 拟建项目编码
  1015. func isValidCodeFormat(s string) bool {
  1016. pattern := `^\d{4}-\d{6}-\d{2}-\d{2}-\d{6}$`
  1017. matched, err := regexp.MatchString(pattern, s)
  1018. if err != nil {
  1019. return false
  1020. }
  1021. return matched
  1022. }