es_test.go 16 KB


  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "github.com/xuri/excelize/v2"
  8. "io"
  9. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  10. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  11. "log"
  12. "regexp"
  13. "sort"
  14. "strconv"
  15. "strings"
  16. "testing"
  17. "time"
  18. )
  19. func TestEs(T *testing.T) {
  20. s := "aa"
  21. fmt.Println(strings.Contains(s, ","))
  22. //re := regexp.MustCompile(`包\d+`) // 标题只有一个包2
  23. //re := regexp.MustCompile(`\d+批包(\d+(?:、\d+)*)`)
  24. // 定义正则表达式
  25. //re3 := regexp.MustCompile(`(?:包)?(\d+(?:、\d+)*)包?`) //标题含有多个包;包10、12、14、17、18、19
  26. //re3 := regexp.MustCompile(`包(\d+(?:、\d+)*)`) //标题含有多个包;包10、12、14、17、18、19
  27. ////re3 := regexp.MustCompile(`\d+包`)//冀中股份2023年12月标准件计划框架协议采购2包12.7
  28. ////re2 := regexp.MustCompile(`标包\d+`)//2024一季度水火电一般工序类中小金工件(标包1)-招标公告
  29. //text := "中国绿发投资集团有限公司直属项目公司2023年第20批集中采购非招标项目(包10、12、14、17、18、19"
  30. //matches := re.FindAllString(util.ObjToString(text), -1)
  31. //log.Println(matches)
  32. //title := "中电科青岛科技产业园一期设计施工总承包1标段"
  33. //rea := regexp.MustCompile(`总承包\d标段`) //总承包1标段
  34. //maches := rea.FindAllString(title, -1)
  35. //log.Println("rea", maches)
  36. //reb := regexp.MustCompile(`包\d[-~]\d`) //包1-6
  37. //title = "2023年度分布式光伏发电项目主要设备(包1~6)设备采购合格供应商"
  38. //matches := reb.FindAllString(title, -1)
  39. //log.Println("reb", matches)
  40. //re := regexp.MustCompile(`\d+批包(\d+(?:、\d+)*)`)
  41. re := regexp.MustCompile(`包?\d{1,2}[-~、和](包)?\d{1,2}包?`) //1-6包;01-06包;01、02包;包1、包2
  42. title := "中国绿发投资集团有限公司直属项目公司2023年第20批集中标段1采购非招标项目(包10、12、14、17、18、19"
  43. matches := re.FindAllString(title, -1)
  44. log.Println("rec", matches)
  45. //
  46. //re2 := regexp.MustCompile(`包?\d{1,2}`) // 标题只有一个包2
  47. //re3 := regexp.MustCompile(`\d{1,2}包`) // 标题只有一个包2
  48. re3 := regexp.MustCompile(`(标段[1-9一二三四五六七八九]|包[1-9一二三四五六七八九]?[0-9]|[1-9一二三四五六七八九]?[0-9]包|[a-kA-K]包)`) // 标题只有一个包2
  49. title = "济南市历下标段3区东关中心幼儿园超市包21、22、23、24"
  50. matches = re3.FindAllString(title, -1)
  51. log.Println("aaa", matches)
  52. //ree := regexp.MustCompile(`第\d{1,2}[批]|\d{1,2}标段|标段\d{1,2}`)
  53. //ree2 := regexp.MustCompile(`(第)?\d{1,2}[标段\d{1,2}]`)
  54. //log.Println(ree2.FindAllString(title, -1))
  55. // 原始数据
  56. data := []string{"包1", "包1-2", "包4", "包4-5", "包3", "包5", "包9", "包11", "包12", "11包", "标段1", "包13"}
  57. // 调用去重函数
  58. uniquePackages := removeDuplicates(data)
  59. // 输出去重后的结果
  60. fmt.Println("去重后的数据:", uniquePackages)
  61. }
  62. func removeDuplicates(data []string) []string {
  63. // 存储已存在的包号
  64. existingPackages := make(map[int]bool)
  65. // 存储包含包号信息的字符串
  66. packages := make(map[int]string)
  67. // 匹配包号的正则表达式
  68. re := regexp.MustCompile(`(包)(\d+)(?:-(\d+))?`)
  69. noexists := make([]string, 0)
  70. // 遍历数据
  71. for _, item := range data {
  72. // 提取包号信息
  73. matches := re.FindStringSubmatch(item)
  74. if len(matches) < 3 {
  75. noexists = append(noexists, item)
  76. continue
  77. }
  78. // 解析包号
  79. start, _ := strconv.Atoi(matches[2])
  80. end := start
  81. if len(matches[3]) > 0 {
  82. end, _ = strconv.Atoi(matches[3])
  83. }
  84. // 添加到已存在的包号中
  85. for i := start; i <= end; i++ {
  86. existingPackages[i] = true
  87. }
  88. // 将包含包号信息的字符串存储到 packages 中
  89. packages[start] = matches[0]
  90. }
  91. // 从 map 中提取去重后的包号并排序
  92. var uniquePackages []int
  93. for packageNum := range existingPackages {
  94. uniquePackages = append(uniquePackages, packageNum)
  95. }
  96. sort.Ints(uniquePackages)
  97. // 将连续的包号转换为包含范围的字符串
  98. var result []string
  99. var start, end int
  100. for i, num := range uniquePackages {
  101. if i == 0 {
  102. start = num
  103. end = num
  104. } else if num == end+1 {
  105. end = num
  106. } else {
  107. if start == end {
  108. result = append(result, packages[start])
  109. } else {
  110. result = append(result, fmt.Sprintf("包%d-%d", start, end))
  111. }
  112. start = num
  113. end = num
  114. }
  115. }
  116. if start == end {
  117. result = append(result, packages[start])
  118. } else {
  119. result = append(result, fmt.Sprintf("包%d-%d", start, end))
  120. }
  121. result = append(result, noexists...)
  122. return result
  123. }
  124. // syncEs 同步es 数据道信集群
  125. func TestSyncEs(T *testing.T) {
  126. //url := "http://172.17.4.184:19805"
  127. url := "http://127.0.0.1:19905"
  128. username := "jybid"
  129. password := "Top2023_JEB01i@31"
  130. index := "bidding" //索引名称
  131. // 创建 Elasticsearch 客户端
  132. client, err := elastic.NewClient(
  133. elastic.SetURL(url),
  134. elastic.SetBasicAuth(username, password),
  135. elastic.SetSniff(false),
  136. )
  137. if err != nil {
  138. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  139. }
  140. url2 := "http://192.168.3.149:9201"
  141. username2 := ""
  142. password2 := ""
  143. // 创建 Elasticsearch 客户端
  144. client2, err := elastic.NewClient(
  145. elastic.SetURL(url2),
  146. elastic.SetBasicAuth(username2, password2),
  147. elastic.SetSniff(false),
  148. )
  149. if err != nil {
  150. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  151. }
  152. rangeQuery := elastic.NewRangeQuery("id").Gte("65869b436977356f55a01b0b").Lt("6586a4196977356f55a02c79")
  153. query := elastic.NewBoolQuery().Must(rangeQuery)
  154. ctx := context.Background()
  155. //开始滚动搜索
  156. scrollID := ""
  157. scroll := "1m"
  158. searchSource := elastic.NewSearchSource().
  159. Query(query).
  160. Size(10000).
  161. Sort("_doc", true) //升序排序
  162. //Sort("_doc", false) //降序排序
  163. searchService := client.Scroll(index).
  164. Size(10000).
  165. Scroll(scroll).
  166. SearchSource(searchSource)
  167. res, err := searchService.Do(ctx)
  168. if err != nil {
  169. if err == io.EOF {
  170. fmt.Println("没有数据")
  171. } else {
  172. panic(err)
  173. }
  174. }
  175. defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  176. fmt.Println("总数是:", res.TotalHits())
  177. total := 0
  178. for len(res.Hits.Hits) > 0 {
  179. for _, hit := range res.Hits.Hits {
  180. var doc map[string]interface{}
  181. err := json.Unmarshal(hit.Source, &doc)
  182. if err != nil {
  183. log.Printf("解析文档失败:%s", err)
  184. continue
  185. }
  186. id := util.ObjToString(doc["id"])
  187. client2.Index().Index(index).Id(id).BodyJson(doc).Do(ctx)
  188. }
  189. total = total + len(res.Hits.Hits)
  190. scrollID = res.ScrollId
  191. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  192. log.Println("current count:", total)
  193. if err != nil {
  194. if err == io.EOF {
  195. // 滚动到最后一批数据,退出循环
  196. break
  197. }
  198. log.Printf("滚动搜索失败:%s", err)
  199. break // 处理错误时退出循环
  200. }
  201. }
  202. fmt.Println("结束~~~~~~~~~~~~~~~")
  203. }
  204. func TestGetP(T *testing.T) {
  205. MgoB := &mongodb.MongodbSim{
  206. //MongodbAddr: "172.17.189.140:27080",
  207. MongodbAddr: "127.0.0.1:27083",
  208. Size: 10,
  209. DbName: "qfw",
  210. UserName: "SJZY_RWbid_ES",
  211. Password: "SJZY@B4i4D5e6S",
  212. Direct: true,
  213. }
  214. MgoB.InitPool()
  215. //url := "http://172.17.4.184:19805"
  216. url := "http://127.0.0.1:19805"
  217. username := "es_all"
  218. password := "TopJkO2E_d1x"
  219. index := "projectset" //索引名称
  220. // 创建 Elasticsearch 客户端
  221. client, err := elastic.NewClient(
  222. elastic.SetURL(url),
  223. elastic.SetBasicAuth(username, password),
  224. elastic.SetSniff(false),
  225. )
  226. if err != nil {
  227. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  228. }
  229. rangeQuery := elastic.NewRangeQuery("pici").Gt("1672502400").Lte("1704038400")
  230. //termQ := elastic.NewTermQuery("multipackage", 0)
  231. //rangeQuery := elastic.NewRangeQuery("id").Gt("657b08556977356f5578cb25").Lte("657b08556977356f5578cb26")
  232. query := elastic.NewBoolQuery().Must(rangeQuery)
  233. ctx := context.Background()
  234. //开始滚动搜索
  235. scrollID := ""
  236. scroll := "1m"
  237. searchSource := elastic.NewSearchSource().
  238. Query(query).
  239. Size(1000).
  240. Sort("_doc", true) //升序排序
  241. //Sort("_doc", false) //降序排序
  242. searchService := client.Scroll(index).
  243. Size(1000).
  244. Scroll(scroll).
  245. SearchSource(searchSource)
  246. res, err := searchService.Do(ctx)
  247. if err != nil {
  248. if err == io.EOF {
  249. fmt.Println("没有数据")
  250. } else {
  251. panic(err)
  252. }
  253. }
  254. defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  255. fmt.Println("总数是:", res.TotalHits())
  256. total := 0
  257. for len(res.Hits.Hits) > 0 {
  258. for _, hit := range res.Hits.Hits {
  259. var doc map[string]interface{}
  260. err := json.Unmarshal(hit.Source, &doc)
  261. if err != nil {
  262. log.Printf("解析文档失败:%s", err)
  263. continue
  264. }
  265. //id := util.ObjToString(doc["id"])
  266. //log.Println(id)
  267. var matchWords = make([]string, 0)
  268. var matchList = make([]interface{}, 0)
  269. if list, ok := doc["list"].([]interface{}); ok {
  270. for _, v := range list {
  271. if da, ok := v.(map[string]interface{}); ok {
  272. if util.ObjToString(da["toptype"]) != "招标" {
  273. continue
  274. }
  275. title := util.ObjToString(da["title"])
  276. // 使用正则表达式进行匹配
  277. matches := GetMatches(title)
  278. if len(matches) > 0 {
  279. matchList = append(matchList, da)
  280. }
  281. matchWords = append(matchWords, matches...)
  282. }
  283. }
  284. }
  285. insert := make(map[string]interface{})
  286. insert["project_id"] = doc["id"]
  287. insert["_id"] = doc["id"]
  288. insert["multipackage"] = doc["multipackage"]
  289. insert["list"] = doc["list"]
  290. insert["projectname"] = doc["projectname"]
  291. insert["sourceinfourl"] = doc["sourceinfourl"]
  292. if len(matchWords) > 0 {
  293. insert["matchList"] = matchList
  294. insert["package_name"] = util.ObjToString(doc["projectname"]) + "-" + strings.Join(matchWords, "、")
  295. insert["type"] = 1
  296. }
  297. MgoB.SaveByOriID("wcc_project_20240304", insert)
  298. }
  299. total = total + len(res.Hits.Hits)
  300. scrollID = res.ScrollId
  301. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  302. log.Println("current count:", total)
  303. if err != nil {
  304. if err == io.EOF {
  305. // 滚动到最后一批数据,退出循环
  306. break
  307. }
  308. log.Printf("滚动搜索失败:%s", err)
  309. break // 处理错误时退出循环
  310. }
  311. }
  312. fmt.Println("结束~~~~~~~~~~~~~~~")
  313. }
  314. // TestSyncEsDataLimit 同步固定数量es 数据
  315. func TestSyncEsDataLimit(T *testing.T) {
  316. url := "http://192.168.3.149:9201"
  317. username := ""
  318. //username := "jybid"
  319. password := ""
  320. //password := "Top2023_JEB01i@31"
  321. index := "bidding" //索引名称
  322. // 创建 Elasticsearch 客户端
  323. client, err := elastic.NewClient(
  324. elastic.SetURL(url),
  325. elastic.SetBasicAuth(username, password),
  326. elastic.SetSniff(false),
  327. )
  328. if err != nil {
  329. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  330. }
  331. //url2 := "http://192.168.3.149:9201"
  332. //username2 := ""
  333. //password2 := ""
  334. // 创建 Elasticsearch 客户端
  335. //client2, err := elastic.NewClient(
  336. // elastic.SetURL(url2),
  337. // elastic.SetBasicAuth(username2, password2),
  338. // elastic.SetSniff(false),
  339. //)
  340. //if err != nil {
  341. // log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  342. //}
  343. // 构建查询
  344. query := elastic.NewBoolQuery().
  345. Must(elastic.NewTermQuery("subtype", "采购意向")).
  346. Must(elastic.NewMatchQuery("tag_topinformation", "情报_环境采购"))
  347. //MustNot(elastic.NewMatchQuery("company_type", "个体工商户"))
  348. // 执行查询
  349. searchResult, err := client.Search().Size(500).
  350. Index(index).
  351. Query(query).
  352. Do(context.Background())
  353. if err != nil {
  354. log.Fatalf("Error executing search: %s", err)
  355. }
  356. MgoB := &mongodb.MongodbSim{
  357. //MongodbAddr: "172.17.189.140:27080",
  358. MongodbAddr: "127.0.0.1:27083",
  359. Size: 10,
  360. DbName: "qfw",
  361. UserName: "SJZY_RWbid_ES",
  362. Password: "SJZY@B4i4D5e6S",
  363. Direct: true,
  364. }
  365. MgoB.InitPool()
  366. ////测试环境MongoDB
  367. MgoT := &mongodb.MongodbSim{
  368. MongodbAddr: "192.168.3.206:27002",
  369. DbName: "qfw_data",
  370. Size: 10,
  371. UserName: "root",
  372. Password: "root",
  373. }
  374. MgoT.InitPool()
  375. for _, hit := range searchResult.Hits.Hits {
  376. var doc map[string]interface{}
  377. err := json.Unmarshal(hit.Source, &doc)
  378. if err != nil {
  379. log.Printf("解析文档失败:%s", err)
  380. continue
  381. }
  382. id := util.ObjToString(doc["id"])
  383. data, _ := MgoB.FindById("bidding", id, nil)
  384. //client2.Index().Index(index).Id(id).BodyJson(doc).Do(context.Background())
  385. MgoT.SaveByOriID("bidding", *data)
  386. }
  387. fmt.Println("结束~~~~~~~~~~~~~~~")
  388. }
  389. // TestGetQyDataLimit 导出企业数据 qyxy
  390. func TestGetQyDataLimit(T *testing.T) {
  391. url := "http://127.0.0.1:19908"
  392. username := "jybid"
  393. password := "Top2023_JEB01i@31"
  394. index := "qyxy" //索引名称
  395. // 创建 Elasticsearch 客户端
  396. client, err := elastic.NewClient(
  397. elastic.SetURL(url),
  398. elastic.SetBasicAuth(username, password),
  399. elastic.SetSniff(false),
  400. )
  401. if err != nil {
  402. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  403. }
  404. // 构建查询
  405. rangeQuery := elastic.NewRangeQuery("establish_date").Gte("1704038400").Lt("1719763200")
  406. query := elastic.NewBoolQuery().
  407. Must(elastic.NewTermQuery("company_city", "长春市")).
  408. Must(elastic.NewMatchQuery("company_type", "个体工商户")).
  409. Must(elastic.NewMatchQuery("company_status", "存续")).
  410. Must(rangeQuery)
  411. MgoB := &mongodb.MongodbSim{
  412. //MongodbAddr: "172.17.189.140:27080",
  413. MongodbAddr: "127.0.0.1:27083",
  414. Size: 10,
  415. DbName: "qfw",
  416. UserName: "SJZY_RWbid_ES",
  417. Password: "SJZY@B4i4D5e6S",
  418. Direct: true,
  419. }
  420. MgoB.InitPool()
  421. // 执行查询
  422. searchResult, err := client.Search().Size(200).
  423. Index(index).
  424. Query(query).
  425. Do(context.Background())
  426. if err != nil {
  427. log.Fatalf("Error executing search: %s", err)
  428. }
  429. for _, hit := range searchResult.Hits.Hits {
  430. var doc map[string]interface{}
  431. err := json.Unmarshal(hit.Source, &doc)
  432. if err != nil {
  433. log.Printf("解析文档失败:%s", err)
  434. continue
  435. }
  436. timeObj := time.Unix(util.Int64All(doc["establish_date"]), 0)
  437. insert := map[string]interface{}{
  438. "company_name": doc["company_name"],
  439. "company_type": doc["company_type"],
  440. "company_status": doc["company_status"],
  441. "authority": doc["authority"],
  442. "company_address": doc["company_address"],
  443. "company_code": doc["company_code"],
  444. "credit_no": doc["credit_no"],
  445. "legal_person": doc["legal_person"],
  446. "operation_startdate": doc["operation_startdate"],
  447. "operation_enddate": doc["operation_enddate"],
  448. "business_scope": doc["business_scope"], //经营范围
  449. "establish_date": timeObj.Format("2006-01-02 15:04:05"), // 注册时间,
  450. }
  451. MgoB.Save("wcc_qyxy_changchun_200", insert)
  452. //id := util.ObjToString(doc["id"])
  453. //data, _ := MgoB.FindById("bidding", id, nil)
  454. //client2.Index().Index(index).Id(id).BodyJson(doc).Do(context.Background())
  455. //MgoT.SaveByOriID("bidding", *data)
  456. }
  457. fmt.Println("结束~~~~~~~~~~~~~~~")
  458. }
  459. func TestAA(T *testing.T) {
  460. MgoB := &mongodb.MongodbSim{
  461. //MongodbAddr: "172.17.189.140:27080",
  462. MongodbAddr: "127.0.0.1:27083",
  463. Size: 10,
  464. DbName: "qfw",
  465. UserName: "SJZY_RWbid_ES",
  466. Password: "SJZY@B4i4D5e6S",
  467. Direct: true,
  468. }
  469. MgoB.InitPool()
  470. f, err := excelize.OpenFile("./test_data.xlsx")
  471. if err != nil {
  472. fmt.Println(err)
  473. return
  474. }
  475. defer func() {
  476. f.Save()
  477. if err := f.Close(); err != nil {
  478. fmt.Println(err)
  479. }
  480. }()
  481. //2.专项债详情
  482. rows, err := f.GetRows("Sheet1")
  483. if err != nil {
  484. fmt.Println(err)
  485. return
  486. }
  487. for i := 1; i < len(rows); i++ {
  488. insert := map[string]interface{}{
  489. "name": rows[i][0],
  490. "national_top": rows[i][1],
  491. "national_sub": rows[i][2],
  492. "national_subsub": rows[i][3],
  493. "result": rows[i][4],
  494. }
  495. where := map[string]interface{}{
  496. "name": rows[i][0],
  497. }
  498. data, _ := MgoB.FindOne("ent_info_buyer", where)
  499. if data != nil && len(*data) > 0 {
  500. insert["company_type"] = (*data)["company_type"]
  501. insert["label1"] = (*data)["label1"]
  502. insert["label2"] = (*data)["label2"]
  503. insert["label3"] = (*data)["label3"]
  504. }
  505. MgoB.Save("ent_info_buyer_test", insert)
  506. }
  507. }