project.go 36 KB


  1. package main
  2. import (
  3. "context"
  4. "encoding/base64"
  5. "encoding/json"
  6. "fmt"
  7. "github.com/olivere/elastic/v7"
  8. "go.uber.org/zap"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/log"
  11. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  12. "regexp"
  13. "sort"
  14. "strings"
  15. "sync"
  16. "time"
  17. )
  18. func dealXlsxTest() {
  19. // 1. 初始化 ES 客户端
  20. client, err := elastic.NewClient(
  21. elastic.SetURL(GF.Es.URL),
  22. elastic.SetBasicAuth(GF.Es.Username, GF.Es.Password),
  23. elastic.SetSniff(false),
  24. )
  25. if err != nil {
  26. log.Fatal("创建 Elasticsearch 客户端失败", zap.Error(err))
  27. }
  28. // 2. 初始化 MongoDB 连接
  29. sess := MgoP.GetMgoConn()
  30. defer MgoP.DestoryMongoConn(sess)
  31. coll := sess.DB("qfw").C("wcc_dealXlsxData_0524")
  32. iter := coll.Find(nil).Select(nil).Iter()
  33. // 3. 并发控制
  34. const maxWorkers = 2
  35. taskCh := make(chan map[string]interface{}, 2000)
  36. var wg sync.WaitGroup
  37. // 4. 启动 worker 处理任务
  38. for i := 0; i < maxWorkers; i++ {
  39. wg.Add(1)
  40. go func() {
  41. defer wg.Done()
  42. for doc := range taskCh {
  43. if len(doc) == 0 {
  44. log.Info("aaa", zap.Any("client", client))
  45. }
  46. processOneProposedTest(doc, client)
  47. }
  48. }()
  49. }
  50. // 5. 逐条读取数据并派发任务
  51. log.Info("111111", zap.String("222222", "开始处理数据"))
  52. count := 0
  53. for doc := make(map[string]interface{}); iter.Next(doc); {
  54. count++
  55. if count%1000 == 0 {
  56. log.Info("dealProposed", zap.Int("current", count), zap.Any("projectname", doc["p1_project_name"]))
  57. }
  58. taskCh <- cloneMap(doc) // 防止 map 重用
  59. }
  60. close(taskCh)
  61. wg.Wait()
  62. }
  63. func processOneProposedTest(tmp map[string]interface{}, client *elastic.Client) {
  64. defer func() {
  65. if r := recover(); r != nil {
  66. log.Warn("panic in processOneProposed", zap.Any("recover", r))
  67. }
  68. }()
  69. id := mongodb.BsonIdToSId(tmp["_id"])
  70. //proposedID := mongodb.BsonIdToSId(tmp["_id"])
  71. projectName := util.ObjToString(tmp["p1_project_name"])
  72. buyer := util.ObjToString(tmp["p1_project_owner"])
  73. //proposed_number := util.ObjToString(tmp["proposed_number"])
  74. //log.Info("processOneProposed", zap.String("开始查询es", projectName))
  75. results, err := searchES23(client, projectName, buyer, 20, 50)
  76. if err != nil {
  77. log.Warn("searchES22 error", zap.Error(err))
  78. return
  79. }
  80. //log.Info("processOneProposed", zap.String("结束查询es", projectName))
  81. biddings := []map[string]interface{}{}
  82. update := map[string]interface{}{}
  83. // 标讯信息
  84. for _, re := range results {
  85. biddingID := util.ObjToString(re["id"])
  86. da := map[string]interface{}{
  87. "id": re["id"],
  88. "title": re["title"],
  89. "area": re["area"],
  90. "city": re["city"],
  91. "projectname": re["projectname"],
  92. "score": re["score"],
  93. "toptype": re["toptype"],
  94. "subtype": re["subtype"],
  95. "buyer": re["buyer"],
  96. "budget": re["budget"],
  97. "buyerperson": re["buyerperson"],
  98. "buyertel": re["buyertel"],
  99. "s_winner": re["s_winner"],
  100. "bidamount": re["bidamount"],
  101. "winnertel": re["winnertel"],
  102. "agency": re["agency"],
  103. "publishtime": re["publishtime"],
  104. }
  105. //项目信息
  106. where2 := map[string]interface{}{"ids": biddingID}
  107. if util.ObjToString(re["toptype"]) == "拟建" {
  108. projectset, _ := MgoP.FindOne("projectset_proposed", where2)
  109. if projectset != nil && len((*projectset)) > 0 {
  110. v3 := map[string]interface{}{
  111. "project_id": mongodb.BsonIdToSId((*projectset)["_id"]),
  112. "projectname": (*projectset)["projectname"],
  113. "bidamount": (*projectset)["bidamount"],
  114. "area": (*projectset)["area"],
  115. "city": (*projectset)["city"],
  116. "district": (*projectset)["district"],
  117. "owner": (*projectset)["owner"],
  118. "approvecode": (*projectset)["approvecode"],
  119. }
  120. if (*projectset)["owner"] != "" {
  121. where11 := map[string]interface{}{
  122. "company_name": (*projectset)["owner"],
  123. }
  124. std, _ := MgoQY.FindOne("qyxy_std", where11)
  125. v3["credit_no"] = (*std)["credit_no"]
  126. }
  127. da["project"] = v3
  128. }
  129. biddings = append(biddings, da)
  130. } else {
  131. projectset, _ := MgoP.FindOne("projectset_20230904", where2)
  132. if projectset != nil && len((*projectset)) > 0 {
  133. v3 := map[string]interface{}{
  134. "project_id": mongodb.BsonIdToSId((*projectset)["_id"]),
  135. "projectname": (*projectset)["projectname"],
  136. "bidamount": (*projectset)["bidamount"],
  137. "area": (*projectset)["area"],
  138. "city": (*projectset)["city"],
  139. "district": (*projectset)["district"],
  140. "firsttime": (*projectset)["firsttime"],
  141. "bidtype": (*projectset)["bidtype"],
  142. "bidstatus": (*projectset)["bidstatus"],
  143. "sortprice": (*projectset)["sortprice"],
  144. "buyer": (*projectset)["buyer"],
  145. }
  146. if (*projectset)["buyer"] != "" {
  147. where11 := map[string]interface{}{
  148. "company_name": (*projectset)["buyer"],
  149. }
  150. std, _ := MgoQY.FindOne("qyxy_std", where11)
  151. v3["credit_no"] = (*std)["credit_no"]
  152. }
  153. da["project"] = v3
  154. }
  155. biddings = append(biddings, da)
  156. }
  157. }
  158. if len(biddings) > 0 {
  159. update["bidding"] = biddings
  160. MgoP.UpdateById("wcc_dealXlsxData_0524", id, map[string]interface{}{"$set": update})
  161. }
  162. }
  163. // dealProposed22Concurrent 多协程处理,拟建存量数据
  164. func dealProposed22Concurrent() {
  165. // 1. 初始化 ES 客户端
  166. client, err := elastic.NewClient(
  167. elastic.SetURL(GF.Es.URL),
  168. elastic.SetBasicAuth(GF.Es.Username, GF.Es.Password),
  169. elastic.SetSniff(false),
  170. )
  171. if err != nil {
  172. log.Fatal("创建 Elasticsearch 客户端失败", zap.Error(err))
  173. }
  174. // 2. 初始化 MongoDB 连接
  175. sess := MgoP.GetMgoConn()
  176. defer MgoP.DestoryMongoConn(sess)
  177. coll := sess.DB("qfw").C("projectset_proposed")
  178. query := map[string]interface{}{
  179. //"firsttime": map[string]interface{}{
  180. // "$gte": 1735660800,
  181. // "$lte": 1748102400,
  182. //},
  183. //"firsttime": map[string]interface{}{
  184. // "$lte": 1735660800,
  185. //},
  186. "_id": map[string]interface{}{
  187. "$lte": mongodb.StringTOBsonId("62b6fbc9fa39106bd5e599fc"),
  188. },
  189. }
  190. iter := coll.Find(query).Select(nil).Sort("-_id").Iter()
  191. // 3. 并发控制
  192. const maxWorkers = 1
  193. taskCh := make(chan map[string]interface{}, 2000)
  194. var wg sync.WaitGroup
  195. // 4. 启动 worker 处理任务
  196. for i := 0; i < maxWorkers; i++ {
  197. wg.Add(1)
  198. go func() {
  199. defer wg.Done()
  200. for doc := range taskCh {
  201. if len(doc) == 0 {
  202. log.Info("aaa", zap.Any("client", client))
  203. }
  204. processOneProposed(doc, client)
  205. }
  206. }()
  207. }
  208. // 5. 逐条读取数据并派发任务
  209. log.Info("111111", zap.String("222222", "开始处理数据"))
  210. count := 0
  211. for doc := make(map[string]interface{}); iter.Next(doc); {
  212. count++
  213. if count%1000 == 0 {
  214. log.Info("dealProposed", zap.Int("current", count), zap.Any("projectname", doc["projectname"]), zap.Any("_id", doc["_id"]))
  215. }
  216. //if util.ObjToString(doc["area"]) == "甘肃" {
  217. // continue
  218. //}
  219. taskCh <- cloneMap(doc) // 防止 map 重用
  220. }
  221. close(taskCh)
  222. wg.Wait()
  223. }
  224. // processOneProposed 处理存量数据
  225. func processOneProposed(tmp map[string]interface{}, client *elastic.Client) {
  226. defer func() {
  227. if r := recover(); r != nil {
  228. log.Warn("panic in processOneProposed", zap.Any("recover", r))
  229. }
  230. }()
  231. proposedID := mongodb.BsonIdToSId(tmp["_id"])
  232. projectName := util.ObjToString(tmp["projectname"])
  233. buyer := util.ObjToString(tmp["owner"])
  234. proposed_number := util.ObjToString(tmp["proposed_number"])
  235. //log.Info("processOneProposed", zap.String("开始查询es", projectName))
  236. results, err := searchES23(client, projectName, buyer, 20, 50)
  237. if err != nil {
  238. log.Warn("searchES22 error", zap.Error(err))
  239. return
  240. }
  241. //log.Info("processOneProposed", zap.String("结束查询es", projectName))
  242. biddings := []map[string]interface{}{}
  243. // 标讯信息
  244. for _, re := range results {
  245. biddingID := util.ObjToString(re["id"])
  246. da := map[string]interface{}{
  247. "id": re["id"],
  248. "title": re["title"],
  249. "area": re["area"],
  250. "city": re["city"],
  251. "projectname": re["projectname"],
  252. "score": re["score"],
  253. "toptype": re["toptype"],
  254. "subtype": re["subtype"],
  255. "buyer": re["buyer"],
  256. "budget": re["budget"],
  257. "buyerperson": re["buyerperson"],
  258. "buyertel": re["buyertel"],
  259. "s_winner": re["s_winner"],
  260. "bidamount": re["bidamount"],
  261. "winnertel": re["winnertel"],
  262. "agency": re["agency"],
  263. "publishtime": re["publishtime"],
  264. }
  265. //项目信息
  266. where2 := map[string]interface{}{"ids": biddingID}
  267. if util.ObjToString(re["toptype"]) == "拟建" {
  268. projectset, _ := MgoP.FindOne("projectset_proposed", where2)
  269. if projectset != nil && len((*projectset)) > 0 {
  270. v3 := map[string]interface{}{
  271. "project_id": mongodb.BsonIdToSId((*projectset)["_id"]),
  272. "projectname": (*projectset)["projectname"],
  273. "bidamount": (*projectset)["bidamount"],
  274. "area": (*projectset)["area"],
  275. "city": (*projectset)["city"],
  276. "district": (*projectset)["district"],
  277. "owner": (*projectset)["owner"],
  278. "approvecode": (*projectset)["approvecode"],
  279. "approvestatus": (*projectset)["approvestatus"],
  280. "sourceinfourl": (*projectset)["sourceinfourl"],
  281. }
  282. if (*projectset)["owner"] != "" {
  283. where11 := map[string]interface{}{
  284. "company_name": (*projectset)["owner"],
  285. }
  286. std, _ := MgoQY.FindOne("qyxy_std", where11)
  287. v3["credit_no"] = (*std)["credit_no"]
  288. }
  289. da["project"] = v3
  290. }
  291. biddings = append(biddings, da)
  292. } else {
  293. projectset, _ := MgoP.FindOne("projectset_20230904", where2)
  294. if projectset != nil && len((*projectset)) > 0 {
  295. v3 := map[string]interface{}{
  296. "project_id": mongodb.BsonIdToSId((*projectset)["_id"]),
  297. "projectname": (*projectset)["projectname"],
  298. "bidamount": (*projectset)["bidamount"],
  299. "area": (*projectset)["area"],
  300. "city": (*projectset)["city"],
  301. "firsttime": (*projectset)["firsttime"],
  302. "bidtype": (*projectset)["bidtype"],
  303. "bidstatus": (*projectset)["bidstatus"],
  304. "sortprice": (*projectset)["sortprice"],
  305. "buyer": (*projectset)["buyer"],
  306. }
  307. da["project"] = v3
  308. }
  309. biddings = append(biddings, da)
  310. }
  311. }
  312. insert := map[string]interface{}{
  313. "proposed_id": proposedID,
  314. "stype": 1, //代表从拟建数据-> 匹配在建数据
  315. "proposed_number": proposed_number,
  316. "buyer": buyer,
  317. "projectname": tmp["projectname"],
  318. "area": tmp["area"],
  319. "city": tmp["city"],
  320. "district": tmp["district"],
  321. "bidding": biddings,
  322. "updatetime": time.Now().Unix(),
  323. }
  324. if isValidCodeFormat(util.ObjToString(tmp["approvecode"])) {
  325. insert["approvecode"] = tmp["approvecode"]
  326. }
  327. if buyer != "" {
  328. where11 := map[string]interface{}{
  329. "company_name": buyer,
  330. }
  331. std, _ := MgoQY.FindOne("qyxy_std", where11)
  332. insert["credit_no"] = (*std)["credit_no"]
  333. }
  334. whereExist := map[string]interface{}{
  335. "proposed_id": proposedID,
  336. }
  337. if GF.Env.Savecoll == "" {
  338. GF.Env.Savecoll = "wcc_nj_zj_bidding"
  339. }
  340. exist, _ := MgoP.FindOne(GF.Env.Savecoll, whereExist)
  341. // 存在就更新
  342. if exist != nil && len(*exist) > 0 {
  343. exitsid := mongodb.BsonIdToSId((*exist)["_id"])
  344. MgoP.UpdateById(GF.Env.Savecoll, exitsid, map[string]interface{}{"$set": insert})
  345. } else {
  346. insert["comeintime"] = time.Now().Unix()
  347. MgoP.InsertOrUpdate("qfw", GF.Env.Savecoll, insert)
  348. }
  349. }
  350. func cloneMap(src map[string]interface{}) map[string]interface{} {
  351. dst := make(map[string]interface{}, len(src))
  352. for k, v := range src {
  353. dst[k] = v
  354. }
  355. return dst
  356. }
  357. func dealProposed22() {
  358. url := GF.Es.URL
  359. //url := "http://127.0.0.1:19908"
  360. username := GF.Es.Username
  361. password := GF.Es.Password
  362. //index := "bidding" //索引名称
  363. // 创建 Elasticsearch 客户端
  364. client, err := elastic.NewClient(
  365. elastic.SetURL(url),
  366. elastic.SetBasicAuth(username, password),
  367. elastic.SetSniff(false),
  368. )
  369. if err != nil {
  370. log.Info("创建 Elasticsearch 客户端失败", zap.Error(err))
  371. }
  372. //
  373. sess := MgoP.GetMgoConn()
  374. defer MgoP.DestoryMongoConn(sess)
  375. log.Info("dealProposed", zap.Any("开始处理:拟建数据表", "projectset_proposed"))
  376. where := map[string]interface{}{
  377. "firsttime": map[string]interface{}{
  378. "$gte": 1735660800,
  379. },
  380. }
  381. queryMgo := sess.DB("qfw").C("projectset_proposed").Find(&where).Select(nil).Iter()
  382. count := 0
  383. for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
  384. if count%1000 == 0 {
  385. log.Info("dealProposed", zap.Any("current", count), zap.Any("projectname", tmp["projectname"]))
  386. }
  387. proposed_id := mongodb.BsonIdToSId(tmp["_id"])
  388. project_name := util.ObjToString(tmp["projectname"])
  389. buyer := util.ObjToString(tmp["owner"])
  390. results, err := searchES22(client, project_name, buyer, 60, 10)
  391. if err != nil {
  392. log.Info("searchES22", zap.Error(err))
  393. }
  394. projectIds := make([]string, 0) //拟建对应的在建项目ID
  395. biddingIds := make([]string, 0) //拟建项目对在建项目中的标讯ids
  396. biddings := make([]map[string]interface{}, 0)
  397. for _, re := range results {
  398. bidding_id := util.ObjToString(re["id"])
  399. biddingIds = append(biddingIds, bidding_id)
  400. bidding := map[string]interface{}{
  401. "id": re["id"],
  402. "title": re["title"],
  403. "projectname": re["projectname"],
  404. "score": re["score"],
  405. "toptype": re["toptype"],
  406. "subtype": re["subtype"],
  407. }
  408. biddings = append(biddings, bidding)
  409. }
  410. for _, bid := range biddingIds {
  411. where2 := map[string]interface{}{
  412. "ids": bid,
  413. }
  414. projectset, _ := MgoP.FindOne("projectset_20230904", where2)
  415. if projectset != nil && len((*projectset)) > 0 {
  416. projectIds = append(projectIds, mongodb.BsonIdToSId((*projectset)["_id"]))
  417. }
  418. }
  419. insert := map[string]interface{}{
  420. "proposed_id": proposed_id,
  421. "bidding_ids": removeDuplicates(biddingIds),
  422. "project_ids": removeDuplicates(projectIds),
  423. "biddings": biddings,
  424. "project_name": tmp["projectname"],
  425. }
  426. MgoP.InsertOrUpdate("qfw", "wcc_dealProposed22", insert)
  427. }
  428. }
  429. // dealProposed 处理拟建数据表
  430. func dealProposed() {
  431. sess := MgoP.GetMgoConn()
  432. defer MgoP.DestoryMongoConn(sess)
  433. log.Info("dealProposed", zap.Any("开始处理:拟建数据表", "projectset_proposed"))
  434. where := map[string]interface{}{
  435. "firsttime": map[string]interface{}{
  436. "$gte": 1735660800,
  437. },
  438. }
  439. queryMgo := sess.DB("qfw").C("projectset_proposed").Find(&where).Select(nil).Iter()
  440. count := 0
  441. for tmp := make(map[string]interface{}); queryMgo.Next(tmp); count++ {
  442. if count%1000 == 0 {
  443. log.Info("dealProposed", zap.Any("current", count), zap.Any("projectname", tmp["projectname"]))
  444. }
  445. proposed_id := mongodb.BsonIdToSId(tmp["_id"])
  446. insert := make(map[string]interface{})
  447. insert["proposed_id"] = proposed_id
  448. var nzj_follw_records = make([]DwdFnzjFollowRecord, 0)
  449. err := JianyuSubjectDB.Where("proposed_id = ? ", proposed_id).Find(&nzj_follw_records).Error
  450. if err != nil {
  451. log.Info("dealProposed", zap.Error(err))
  452. }
  453. //拟建标讯,没有找到对应的在建项目数据
  454. if len(nzj_follw_records) == 0 {
  455. insert["has_bidding"] = false
  456. MgoP.InsertOrUpdate("qfw", "wcc_ok_project_proposed", insert)
  457. continue
  458. }
  459. projectIds := make([]string, 0) //拟建对应的在建项目ID
  460. biddingIds := make([]string, 0) //拟建项目对在建项目中的标讯ids
  461. for _, v := range nzj_follw_records {
  462. biddingIds = append(biddingIds, v.InfoID)
  463. }
  464. biddingIds = removeDuplicates(biddingIds)
  465. insert["bidding_ids"] = biddingIds
  466. for _, bid := range biddingIds {
  467. where2 := map[string]interface{}{
  468. "ids": bid,
  469. }
  470. projectset, _ := MgoP.FindOne("projectset_20230904", where2)
  471. if projectset != nil && len((*projectset)) > 0 {
  472. projectIds = append(projectIds, mongodb.BsonIdToSId((*projectset)["_id"]))
  473. }
  474. }
  475. if len(projectIds) > 0 {
  476. insert["project_ids"] = projectIds
  477. } else {
  478. insert["has_project"] = false
  479. }
  480. MgoP.InsertOrUpdate("qfw", "wcc_ok_project_proposed", insert)
  481. }
  482. log.Info("dealProposed", zap.Any("数据处理完毕:拟建数据表", "projectset_proposed"))
  483. }
  484. // removeDuplicates 去除重复字符串
  485. func removeDuplicates(arr []string) []string {
  486. uniqueMap := make(map[string]bool)
  487. var result []string
  488. for _, str := range arr {
  489. if !uniqueMap[str] {
  490. uniqueMap[str] = true
  491. result = append(result, str)
  492. }
  493. }
  494. return result
  495. }
  496. func printInterfaceAsJSON(v interface{}) string {
  497. data, err := json.MarshalIndent(v, "", " ")
  498. if err != nil {
  499. fmt.Println("JSON 序列化失败:", err)
  500. return ""
  501. }
  502. return string(data)
  503. //fmt.Println("JSON 格式输出:\n", string(data))
  504. }
  505. // searchESCommonQuery common 方式查询语句
  506. func searchESCommonQuery(client *elastic.Client, projectName, buyer2 string, scoreThreshold float64, maxResults int) ([]map[string]interface{}, error) {
  507. // 设置多个字段的 common 查询
  508. queryMap := map[string]interface{}{
  509. "bool": map[string]interface{}{
  510. "should": []interface{}{
  511. map[string]interface{}{
  512. "common": map[string]interface{}{
  513. "projectname.pname": map[string]interface{}{
  514. "query": projectName,
  515. "cutoff_frequency": 0.01,
  516. "low_freq_operator": "and",
  517. "boost": 0.2,
  518. },
  519. },
  520. },
  521. map[string]interface{}{
  522. "common": map[string]interface{}{
  523. "title": map[string]interface{}{
  524. "query": projectName,
  525. "cutoff_frequency": 0.01,
  526. "low_freq_operator": "and",
  527. "boost": 0.2,
  528. },
  529. },
  530. },
  531. map[string]interface{}{
  532. "common": map[string]interface{}{
  533. "detail": map[string]interface{}{
  534. "query": projectName,
  535. "cutoff_frequency": 0.01,
  536. "low_freq_operator": "and",
  537. "boost": 0.1,
  538. },
  539. },
  540. },
  541. },
  542. "minimum_should_match": 1,
  543. },
  544. }
  545. queryBytes, err := json.Marshal(queryMap)
  546. if err != nil {
  547. log.Error("marshal failed", zap.Error(err))
  548. return nil, err
  549. }
  550. // ✅ 正确用法:Base64 编码
  551. queryBase64 := base64.StdEncoding.EncodeToString(queryBytes)
  552. // ✅ 构建 WrapperQuery
  553. query := elastic.NewWrapperQuery(queryBase64)
  554. // 指定需要返回的字段
  555. fetchFields := elastic.NewFetchSourceContext(true).Include(
  556. "id", "title", "projectname", "projectcode", "bidamount", "score",
  557. "area", "city", "toptype", "subtype", "buyer", "budget", "buyerperson",
  558. "buyertel", "s_winner", "winnertel", "agency", "publishtime",
  559. )
  560. // 执行查询
  561. searchResult, err := client.Search().
  562. Index("bidding").
  563. Query(query).
  564. Size(maxResults).
  565. FetchSourceContext(fetchFields).
  566. Do(context.Background())
  567. if err != nil {
  568. return nil, fmt.Errorf("search failed: %v", err)
  569. }
  570. // 处理结果
  571. var results []map[string]interface{}
  572. seenProjectNames := make(map[string]bool)
  573. seenProjectCodes := make(map[string]bool)
  574. bidamountMap := make(map[float64]bool)
  575. for _, hit := range searchResult.Hits.Hits {
  576. var doc map[string]interface{}
  577. if err := json.Unmarshal(hit.Source, &doc); err != nil {
  578. log.Info("解析文档失败", zap.Error(err))
  579. continue
  580. }
  581. projectNameValue := util.ObjToString(doc["projectname"])
  582. if projectNameValue == "" || seenProjectNames[projectNameValue] {
  583. continue
  584. }
  585. seenProjectNames[projectNameValue] = true
  586. projectCode := util.ObjToString(doc["projectcode"])
  587. if projectCode != "" {
  588. if seenProjectCodes[projectCode] {
  589. continue
  590. }
  591. seenProjectCodes[projectCode] = true
  592. }
  593. bidamount := util.Float64All(doc["bidamount"])
  594. if bidamount != 0 {
  595. if bidamountMap[bidamount] {
  596. continue
  597. }
  598. bidamountMap[bidamount] = true
  599. }
  600. id := util.ObjToString(doc["id"])
  601. bidd, _ := MgoB.FindById("bidding", id, nil)
  602. detail := util.ObjToString((*bidd)["detail"])
  603. if buyer2 != "" && !strings.Contains(detail, buyer2) {
  604. continue
  605. }
  606. doc["detail"] = detail
  607. if hit.Score != nil {
  608. doc["score"] = *hit.Score
  609. }
  610. results = append(results, doc)
  611. if len(results) >= maxResults {
  612. break
  613. }
  614. }
  615. return results, nil
  616. }
  617. // searchES24 添加分词查询
  618. func searchES24(client *elastic.Client, projectName, buyer2 string, scoreThreshold float64, maxResults int) ([]map[string]interface{}, error) {
  619. fieldsToTry := []string{"projectname.pname", "title", "detail"}
  620. filtersToTry := [][]elastic.Query{
  621. {elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一")},
  622. {elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向")},
  623. {elastic.NewTermsQuery("toptype", "拟建")},
  624. }
  625. var allResults []*elastic.SearchHit
  626. seenIDs := make(map[string]bool)
  627. // ✅ Step 1: 精准查询
  628. for _, field := range fieldsToTry {
  629. if field == "detail" && len(allResults) > 0 {
  630. break
  631. }
  632. for _, filter := range filtersToTry {
  633. query := elastic.NewBoolQuery().
  634. Must(elastic.NewMultiMatchQuery(projectName, field).Type("phrase")).
  635. Filter(filter...)
  636. fetchFields := elastic.NewFetchSourceContext(true).Include(
  637. "id", "title", "projectname", "projectcode", "bidamount", "score",
  638. "area", "city", "toptype", "subtype", "buyer", "budget", "buyerperson",
  639. "buyertel", "s_winner", "winnertel", "agency", "publishtime")
  640. searchResult, err := client.Search().
  641. Index("bidding").
  642. Query(query).
  643. Size(70).
  644. FetchSourceContext(fetchFields).
  645. Do(context.Background())
  646. if err != nil {
  647. return nil, err
  648. }
  649. // 打印 query JSON(调试用)
  650. if len(searchResult.Hits.Hits) > 0 {
  651. if sourceQ, err := query.Source(); err == nil {
  652. fmt.Println(printInterfaceAsJSON(sourceQ))
  653. }
  654. }
  655. for _, hit := range searchResult.Hits.Hits {
  656. if !seenIDs[hit.Id] {
  657. seenIDs[hit.Id] = true
  658. allResults = append(allResults, hit)
  659. }
  660. }
  661. if len(allResults) >= maxResults {
  662. break
  663. }
  664. }
  665. if len(allResults) >= maxResults {
  666. break
  667. }
  668. }
  669. // ✅ Step 2: 如果没结果,用分词兜底查询
  670. if len(allResults) == 0 {
  671. // 分词
  672. analyzeResp, err := client.IndexAnalyze().
  673. Index("bidding").
  674. Analyzer("ik_smart").
  675. Text(projectName).
  676. Do(context.Background())
  677. if err != nil {
  678. return nil, fmt.Errorf("ik_smart analyze failed: %v", err)
  679. }
  680. var tokens []string
  681. for _, token := range analyzeResp.Tokens {
  682. tokens = append(tokens, token.Token)
  683. }
  684. if len(tokens) == 0 {
  685. return nil, fmt.Errorf("no tokens found from ik_smart")
  686. }
  687. // 用所有分词一次性查询
  688. queryText := strings.Join(tokens, " ")
  689. for _, filter := range filtersToTry {
  690. query := elastic.NewBoolQuery().
  691. Must(
  692. elastic.NewMultiMatchQuery(queryText, fieldsToTry...).
  693. MinimumShouldMatch("100%"), // 必须包含所有分词,可根据需求改成 80%、50%
  694. ).
  695. Filter(filter...)
  696. searchResult, err := client.Search().
  697. Index("bidding").
  698. Query(query).
  699. Size(10). // 根据需要调整
  700. Do(context.Background())
  701. if err != nil {
  702. log.Warn("multi token query failed", zap.Error(err))
  703. continue
  704. }
  705. for _, hit := range searchResult.Hits.Hits {
  706. if !seenIDs[hit.Id] {
  707. seenIDs[hit.Id] = true
  708. allResults = append(allResults, hit)
  709. }
  710. }
  711. if len(allResults) >= maxResults {
  712. break
  713. }
  714. }
  715. }
  716. // ✅ Step 3: 后处理
  717. var results []map[string]interface{}
  718. seenProjectNames := make(map[string]bool)
  719. seenProjectCodes := make(map[string]bool)
  720. bidamountMap := make(map[float64]bool)
  721. for _, hit := range allResults {
  722. var doc map[string]interface{}
  723. if err := json.Unmarshal(hit.Source, &doc); err != nil {
  724. log.Info("解析文档失败", zap.Error(err))
  725. continue
  726. }
  727. projectNameValue := util.ObjToString(doc["projectname"])
  728. if projectNameValue == "" {
  729. continue
  730. }
  731. projectCode := util.ObjToString(doc["projectcode"])
  732. if projectCode != "" {
  733. if seenProjectCodes[projectCode] {
  734. continue
  735. }
  736. seenProjectCodes[projectCode] = true
  737. }
  738. bidamount := util.Float64All(doc["bidamount"])
  739. if bidamount != 0 {
  740. if bidamountMap[bidamount] {
  741. continue
  742. }
  743. bidamountMap[bidamount] = true
  744. }
  745. score := *hit.Score
  746. if score < scoreThreshold {
  747. continue
  748. }
  749. id := util.ObjToString(doc["id"])
  750. bidd, _ := MgoB.FindById("bidding", id, nil)
  751. detail := util.ObjToString((*bidd)["detail"])
  752. //if detail != "" && !strings.Contains(detail, projectName) {
  753. // continue
  754. //}
  755. if buyer2 != "" && !strings.Contains(detail, buyer2) {
  756. continue
  757. }
  758. if seenProjectNames[projectNameValue] {
  759. continue
  760. }
  761. seenProjectNames[projectNameValue] = true
  762. doc["detail"] = detail
  763. doc["score"] = score
  764. results = append(results, doc)
  765. if len(results) >= maxResults {
  766. break
  767. }
  768. }
  769. // ✅ Step 4: 排序
  770. sort.Slice(results, func(i, j int) bool {
  771. return util.Float64All(results[i]["score"]) > util.Float64All(results[j]["score"])
  772. })
  773. return results, nil
  774. }
  775. func searchES23(client *elastic.Client, projectName, buyer2 string, scoreThreshold float64, maxResults int) ([]map[string]interface{}, error) {
  776. fieldsToTry := []string{"projectname.pname", "title", "detail"}
  777. filtersToTry := [][]elastic.Query{
  778. {elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一")},
  779. {elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向")},
  780. {elastic.NewTermsQuery("toptype", "拟建")},
  781. }
  782. var allResults []*elastic.SearchHit
  783. seenIDs := make(map[string]bool)
  784. for _, field := range fieldsToTry {
  785. if field == "detail" && len(allResults) > 0 {
  786. break
  787. }
  788. for _, filter := range filtersToTry {
  789. // 构建查询:使用 MultiMatchQuery + phrase
  790. query := elastic.NewBoolQuery().
  791. Must(elastic.NewMultiMatchQuery(projectName, field).Type("phrase")).
  792. Filter(filter...)
  793. fetchFields := elastic.NewFetchSourceContext(true).Include("id",
  794. "title", "projectname", "projectcode", "bidamount", "score", "area",
  795. "city", "toptype", "subtype", "buyer", "budget", "buyerperson", "buyertel",
  796. "s_winner", "winnertel", "agency", "publishtime")
  797. // 执行查询
  798. searchResult, err := client.Search().
  799. Index("bidding").
  800. Query(query).
  801. Size(70).
  802. FetchSourceContext(fetchFields). // 添加这一行,查询部分字段
  803. Do(context.Background())
  804. if err != nil {
  805. return nil, err
  806. }
  807. // 去重处理
  808. for _, hit := range searchResult.Hits.Hits {
  809. if !seenIDs[hit.Id] {
  810. seenIDs[hit.Id] = true
  811. allResults = append(allResults, hit)
  812. }
  813. }
  814. if len(allResults) >= maxResults {
  815. break
  816. }
  817. }
  818. if len(allResults) >= maxResults {
  819. break
  820. }
  821. }
  822. var results []map[string]interface{}
  823. seenProjectNames := make(map[string]bool)
  824. seenProjectCodes := make(map[string]bool)
  825. bidamountMap := make(map[float64]bool)
  826. //subtypeMap := make(map[string]bool)
  827. for _, hit := range allResults {
  828. var doc map[string]interface{}
  829. if err := json.Unmarshal(hit.Source, &doc); err != nil {
  830. log.Info("解析文档失败", zap.Error(err))
  831. continue
  832. }
  833. projectNameValue := util.ObjToString(doc["projectname"])
  834. if projectNameValue == "" {
  835. continue
  836. }
  837. projectCode := util.ObjToString(doc["projectcode"])
  838. if projectCode != "" {
  839. if seenProjectCodes[projectCode] {
  840. continue
  841. }
  842. seenProjectCodes[projectCode] = true
  843. }
  844. bidamount := util.Float64All(doc["bidamount"])
  845. if bidamount != 0 {
  846. if bidamountMap[bidamount] {
  847. continue
  848. }
  849. bidamountMap[bidamount] = true
  850. }
  851. // 相似度筛选
  852. score := *hit.Score
  853. doc["score"] = score //相似度
  854. if score < scoreThreshold {
  855. continue
  856. }
  857. doc["score"] = score
  858. //detail := util.ObjToString(doc["detail"])
  859. id := util.ObjToString(doc["id"])
  860. bidd, _ := MgoB.FindById("bidding", id, nil)
  861. detail := util.ObjToString((*bidd)["detail"])
  862. // 字段中必须包含 projectName
  863. if detail != "" {
  864. if !strings.Contains(detail, projectName) {
  865. continue
  866. }
  867. }
  868. if buyer2 != "" {
  869. if !strings.Contains(detail, buyer2) {
  870. continue
  871. }
  872. }
  873. if seenProjectNames[projectNameValue] {
  874. continue
  875. }
  876. seenProjectNames[projectNameValue] = true
  877. doc["detail"] = detail
  878. results = append(results, doc)
  879. if len(results) >= maxResults {
  880. break
  881. }
  882. }
  883. // 排序:按 score 降序
  884. sort.Slice(results, func(i, j int) bool {
  885. si := util.Float64All(results[i]["score"])
  886. sj := util.Float64All(results[j]["score"])
  887. return si > sj
  888. })
  889. return results, nil
  890. }
  891. func searchES22(client *elastic.Client, projectName, buyer2 string, scoreThreshold float64, maxResults int) ([]map[string]interface{}, error) {
  892. fieldsToTry := []string{"projectname.pname", "title", "detail"}
  893. filtersToTry := [][]elastic.Query{
  894. {elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一")},
  895. {elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向")},
  896. {elastic.NewTermsQuery("toptype", "拟建")},
  897. }
  898. var allResults []*elastic.SearchHit
  899. seenIDs := make(map[string]bool)
  900. for _, field := range fieldsToTry {
  901. for _, filter := range filtersToTry {
  902. // 构建查询
  903. query := elastic.NewBoolQuery().
  904. Must(elastic.NewMatchQuery(field, projectName)).
  905. Filter(filter...)
  906. // 执行查询
  907. searchResult, err := client.Search().
  908. Index("bidding").
  909. Query(query).
  910. Size(70). // 多取一些,后面做筛选和去重
  911. Do(context.Background())
  912. if err != nil {
  913. return nil, err
  914. }
  915. for _, hit := range searchResult.Hits.Hits {
  916. if !seenIDs[hit.Id] {
  917. allResults = append(allResults, hit)
  918. seenIDs[hit.Id] = true
  919. }
  920. }
  921. if len(allResults) >= maxResults {
  922. break
  923. }
  924. }
  925. if len(allResults) >= maxResults {
  926. break
  927. }
  928. }
  929. var results []map[string]interface{}
  930. seenProjectNames := make(map[string]bool)
  931. seenProjectCodes := make(map[string]bool)
  932. bidamountMap := make(map[float64]bool)
  933. for _, hit := range allResults {
  934. var doc map[string]interface{}
  935. if err := json.Unmarshal(hit.Source, &doc); err != nil {
  936. log.Info("解析文档失败", zap.Error(err))
  937. continue
  938. }
  939. projectNameValue := util.ObjToString(doc["projectname"])
  940. if projectNameValue == "" {
  941. continue
  942. }
  943. projectCode := util.ObjToString(doc["projectcode"])
  944. if seenProjectCodes[projectCode] {
  945. continue
  946. }
  947. seenProjectCodes[projectCode] = true
  948. bidamount := util.Float64All(doc["bidamount"])
  949. if bidamountMap[bidamount] {
  950. continue
  951. }
  952. bidamountMap[bidamount] = true
  953. // 相似度筛选
  954. score := *hit.Score
  955. doc["score"] = score //相似度
  956. if score < scoreThreshold {
  957. continue
  958. }
  959. //id := util.ObjToString(doc["id"])
  960. //doc["jyhref"] = GetJyURLByID(id)
  961. //if site := util.ObjToString(doc["site"]); site == "中华人民共和国自然资源部" {
  962. // doc["title"] = "土地出让" + "-" + util.ObjToString(doc["title"])
  963. //}
  964. // enrich: total_investment
  965. //if bidData, _ := MgoB.FindById("bidding", id, nil); bidData != nil {
  966. // if util.Float64All((*bidData)["total_investment"]) > 0 {
  967. // doc["total_investment"] = (*bidData)["total_investment"]
  968. // }
  969. //}
  970. doc["score"] = score
  971. detail := util.ObjToString(doc["detail"])
  972. // 字段中必须包含 projectName
  973. if buyer2 != "" {
  974. if !strings.Contains(detail, projectName) && !strings.Contains(detail, buyer2) {
  975. continue
  976. }
  977. }
  978. if seenProjectNames[projectNameValue] {
  979. continue
  980. }
  981. seenProjectNames[projectNameValue] = true
  982. results = append(results, doc)
  983. if len(results) >= maxResults {
  984. break
  985. }
  986. }
  987. return results, nil
  988. }
  989. func searchES(client *elastic.Client, projectName, buyer2 string) ([]map[string]interface{}, error) {
  990. query := elastic.NewBoolQuery().
  991. Must(
  992. //elastic.NewMatchQuery("projectname.pname", projectName), // 模糊匹配 projectname
  993. //elastic.NewMatchQuery("title", projectName), // 模糊匹配 projectname
  994. elastic.NewMatchQuery("detail", projectName), // 模糊匹配 projectname
  995. //elastic.NewTermQuery("area", "安徽"), // 过滤区域
  996. elastic.NewTermsQuery("subtype", "中标", "成交", "合同", "单一"), // 过滤 subtype
  997. //elastic.NewTermsQuery("toptype", "招标", "预告", "采购意向"), // 过滤 subtype
  998. //elastic.NewTermsQuery("toptype", "拟建"), // 过滤 subtype
  999. )
  1000. searchResult, err := client.Search().
  1001. Index("bidding").
  1002. Query(query).
  1003. Size(70). // 先取 12 条,确保足够数据
  1004. Do(context.Background())
  1005. if err != nil {
  1006. return nil, err
  1007. }
  1008. // 结果集
  1009. var results []map[string]interface{}
  1010. seenProjectNames := make(map[string]bool) // 用于去重
  1011. seenProjectCode := make(map[string]bool) // 用于去重
  1012. bidamountMap := make(map[float64]bool)
  1013. for _, hit := range searchResult.Hits.Hits {
  1014. var doc map[string]interface{}
  1015. err := json.Unmarshal(hit.Source, &doc)
  1016. if err != nil {
  1017. log.Info("解析文档失败", zap.Error(err))
  1018. continue
  1019. }
  1020. // 获取 `projectname`,防止 key 不存在时的错误
  1021. projectNameValue, ok := doc["projectname"].(string)
  1022. bidamount := util.Float64All(doc["bidamount"])
  1023. if !ok {
  1024. log.Info("⚠️ 缺少 projectname 字段,跳过:", zap.Any("projectname", doc["projectname"]))
  1025. continue
  1026. }
  1027. projectCodeValue := util.ObjToString(doc["projectcode"])
  1028. if seenProjectCode[projectCodeValue] {
  1029. continue
  1030. }
  1031. if projectCodeValue != "" {
  1032. seenProjectCode[projectCodeValue] = true
  1033. }
  1034. // **处理额外字段**
  1035. id := util.ObjToString(doc["id"])
  1036. bidData, _ := MgoB.FindById("bidding", id, nil)
  1037. if util.Float64All((*bidData)["total_investment"]) > 0 {
  1038. doc["total_investment"] = (*bidData)["total_investment"]
  1039. }
  1040. doc["jyhref"] = GetJyURLByID(id)
  1041. score := *hit.Score
  1042. site := util.ObjToString(doc["site"])
  1043. if site == "中华人民共和国自然资源部" {
  1044. doc["title"] = "土地出让" + "-" + util.ObjToString(doc["title"])
  1045. }
  1046. doc["score"] = score //相似度
  1047. detail := util.ObjToString(doc["detail"])
  1048. if !strings.Contains(detail, projectName) {
  1049. continue
  1050. }
  1051. // **去重逻辑**:如果 `projectname` 已经出现过,则跳过
  1052. if seenProjectNames[projectNameValue] {
  1053. continue
  1054. }
  1055. if bidamountMap[bidamount] {
  1056. continue
  1057. }
  1058. // **记录该 `projectname`,避免重复**
  1059. seenProjectNames[projectNameValue] = true
  1060. bidamountMap[bidamount] = true
  1061. // **加入结果集**
  1062. results = append(results, doc)
  1063. // **如果已经找到 6 条不同 `projectname`,就跳出循环**
  1064. if len(results) >= 10 {
  1065. break
  1066. }
  1067. }
  1068. //2、判断正文包含采购单位
  1069. for _, hit := range searchResult.Hits.Hits {
  1070. var doc map[string]interface{}
  1071. err := json.Unmarshal(hit.Source, &doc)
  1072. if err != nil {
  1073. log.Info("解析文档失败:", zap.Error(err))
  1074. continue
  1075. }
  1076. // 获取 `projectname`,防止 key 不存在时的错误
  1077. projectNameValue, ok := doc["projectname"].(string)
  1078. bidamount := util.Float64All(doc["bidamount"])
  1079. if !ok {
  1080. log.Info("⚠️ 缺少 projectname 字段,跳过:", zap.Any("projectname", doc["projectname"]))
  1081. continue
  1082. }
  1083. // **处理额外字段**
  1084. id := util.ObjToString(doc["id"])
  1085. doc["jyhref"] = GetJyURLByID(id)
  1086. score := *hit.Score
  1087. doc["score"] = score //相似度
  1088. site := util.ObjToString(doc["site"])
  1089. if site == "中华人民共和国自然资源部" {
  1090. doc["title"] = "土地出让" + "-" + util.ObjToString(doc["title"])
  1091. }
  1092. //判断正文包含采购单位
  1093. detail := util.ObjToString(doc["detail"])
  1094. if !strings.Contains(detail, buyer2) {
  1095. continue
  1096. }
  1097. // **去重逻辑**:如果 `projectname` 已经出现过,则跳过
  1098. if seenProjectNames[projectNameValue] {
  1099. continue
  1100. }
  1101. if bidamountMap[bidamount] {
  1102. continue
  1103. }
  1104. // **记录该 `projectname`,避免重复**
  1105. seenProjectNames[projectNameValue] = true
  1106. bidamountMap[bidamount] = true
  1107. // **加入结果集**
  1108. results = append(results, doc)
  1109. // **如果已经找到 6 条不同 `projectname`,就跳出循环**
  1110. if len(results) >= 10 {
  1111. break
  1112. }
  1113. }
  1114. return results, nil
  1115. }
  1116. // GetJyURLByID 获取剑鱼地址
  1117. func GetJyURLByID(id string) string {
  1118. var Url = "https://www.jianyu360.com/article/content/%s.html"
  1119. url := fmt.Sprintf(Url, util.CommonEncodeArticle("content", id))
  1120. return url
  1121. }
  1122. // GetIdByURL 解密url,获取bidding ID
  1123. func GetIdByURL(url string) string {
  1124. if strings.Contains(url, "work-bench") {
  1125. return ""
  1126. }
  1127. if strings.Contains(url, "/article/content") {
  1128. urls := strings.Split(url, "content/")
  1129. res := strings.Split(urls[1], ".html")
  1130. ids := util.CommonDecodeArticle("content", res[0])
  1131. return ids[0]
  1132. }
  1133. if strings.HasSuffix(url, "appid") {
  1134. urls := strings.Split(url, "entservice/")
  1135. res := strings.Split(urls[1], ".html")
  1136. se := util.SimpleEncrypt{Key: "entservice"}
  1137. id := se.DecodeString(res[0])
  1138. return id
  1139. }
  1140. return ""
  1141. }
  1142. // isValidCodeFormat 判断 拟建项目编码
  1143. func isValidCodeFormat(s string) bool {
  1144. pattern := `^\d{4}-\d{6}-\d{2}-\d{2}-\d{6}$`
  1145. matched, err := regexp.MatchString(pattern, s)
  1146. if err != nil {
  1147. return false
  1148. }
  1149. return matched
  1150. }