es_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. package main
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/olivere/elastic/v7"
  7. "io"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "log"
  11. "regexp"
  12. "sort"
  13. "strconv"
  14. "strings"
  15. "testing"
  16. )
  17. func TestEs(T *testing.T) {
  18. s := "aa"
  19. fmt.Println(strings.Contains(s, ","))
  20. //re := regexp.MustCompile(`包\d+`) // 标题只有一个包2
  21. //re := regexp.MustCompile(`\d+批包(\d+(?:、\d+)*)`)
  22. // 定义正则表达式
  23. //re3 := regexp.MustCompile(`(?:包)?(\d+(?:、\d+)*)包?`) //标题含有多个包;包10、12、14、17、18、19
  24. //re3 := regexp.MustCompile(`包(\d+(?:、\d+)*)`) //标题含有多个包;包10、12、14、17、18、19
  25. ////re3 := regexp.MustCompile(`\d+包`)//冀中股份2023年12月标准件计划框架协议采购2包12.7
  26. ////re2 := regexp.MustCompile(`标包\d+`)//2024一季度水火电一般工序类中小金工件(标包1)-招标公告
  27. //text := "中国绿发投资集团有限公司直属项目公司2023年第20批集中采购非招标项目(包10、12、14、17、18、19"
  28. //matches := re.FindAllString(util.ObjToString(text), -1)
  29. //log.Println(matches)
  30. //title := "中电科青岛科技产业园一期设计施工总承包1标段"
  31. //rea := regexp.MustCompile(`总承包\d标段`) //总承包1标段
  32. //maches := rea.FindAllString(title, -1)
  33. //log.Println("rea", maches)
  34. //reb := regexp.MustCompile(`包\d[-~]\d`) //包1-6
  35. //title = "2023年度分布式光伏发电项目主要设备(包1~6)设备采购合格供应商"
  36. //matches := reb.FindAllString(title, -1)
  37. //log.Println("reb", matches)
  38. //re := regexp.MustCompile(`\d+批包(\d+(?:、\d+)*)`)
  39. re := regexp.MustCompile(`包?\d{1,2}[-~、和](包)?\d{1,2}包?`) //1-6包;01-06包;01、02包;包1、包2
  40. title := "中国绿发投资集团有限公司直属项目公司2023年第20批集中标段1采购非招标项目(包10、12、14、17、18、19"
  41. matches := re.FindAllString(title, -1)
  42. log.Println("rec", matches)
  43. //
  44. //re2 := regexp.MustCompile(`包?\d{1,2}`) // 标题只有一个包2
  45. //re3 := regexp.MustCompile(`\d{1,2}包`) // 标题只有一个包2
  46. re3 := regexp.MustCompile(`(标段[1-9一二三四五六七八九]|包[1-9一二三四五六七八九]?[0-9]|[1-9一二三四五六七八九]?[0-9]包|[a-kA-K]包)`) // 标题只有一个包2
  47. title = "济南市历下标段3区东关中心幼儿园超市包21、22、23、24"
  48. matches = re3.FindAllString(title, -1)
  49. log.Println("aaa", matches)
  50. //ree := regexp.MustCompile(`第\d{1,2}[批]|\d{1,2}标段|标段\d{1,2}`)
  51. //ree2 := regexp.MustCompile(`(第)?\d{1,2}[标段\d{1,2}]`)
  52. //log.Println(ree2.FindAllString(title, -1))
  53. // 原始数据
  54. data := []string{"包1", "包1-2", "包4", "包4-5", "包3", "包5", "包9", "包11", "包12", "11包", "标段1", "包13"}
  55. // 调用去重函数
  56. uniquePackages := removeDuplicates(data)
  57. // 输出去重后的结果
  58. fmt.Println("去重后的数据:", uniquePackages)
  59. }
  60. func removeDuplicates(data []string) []string {
  61. // 存储已存在的包号
  62. existingPackages := make(map[int]bool)
  63. // 存储包含包号信息的字符串
  64. packages := make(map[int]string)
  65. // 匹配包号的正则表达式
  66. re := regexp.MustCompile(`(包)(\d+)(?:-(\d+))?`)
  67. noexists := make([]string, 0)
  68. // 遍历数据
  69. for _, item := range data {
  70. // 提取包号信息
  71. matches := re.FindStringSubmatch(item)
  72. if len(matches) < 3 {
  73. noexists = append(noexists, item)
  74. continue
  75. }
  76. // 解析包号
  77. start, _ := strconv.Atoi(matches[2])
  78. end := start
  79. if len(matches[3]) > 0 {
  80. end, _ = strconv.Atoi(matches[3])
  81. }
  82. // 添加到已存在的包号中
  83. for i := start; i <= end; i++ {
  84. existingPackages[i] = true
  85. }
  86. // 将包含包号信息的字符串存储到 packages 中
  87. packages[start] = matches[0]
  88. }
  89. // 从 map 中提取去重后的包号并排序
  90. var uniquePackages []int
  91. for packageNum := range existingPackages {
  92. uniquePackages = append(uniquePackages, packageNum)
  93. }
  94. sort.Ints(uniquePackages)
  95. // 将连续的包号转换为包含范围的字符串
  96. var result []string
  97. var start, end int
  98. for i, num := range uniquePackages {
  99. if i == 0 {
  100. start = num
  101. end = num
  102. } else if num == end+1 {
  103. end = num
  104. } else {
  105. if start == end {
  106. result = append(result, packages[start])
  107. } else {
  108. result = append(result, fmt.Sprintf("包%d-%d", start, end))
  109. }
  110. start = num
  111. end = num
  112. }
  113. }
  114. if start == end {
  115. result = append(result, packages[start])
  116. } else {
  117. result = append(result, fmt.Sprintf("包%d-%d", start, end))
  118. }
  119. result = append(result, noexists...)
  120. return result
  121. }
  122. // syncEs 同步es 数据道信集群
  123. func TestSyncEs(T *testing.T) {
  124. //url := "http://172.17.4.184:19805"
  125. url := "http://127.0.0.1:19805"
  126. username := "es_all"
  127. password := "TopJkO2E_d1x"
  128. index := "bidding" //索引名称
  129. // 创建 Elasticsearch 客户端
  130. client, err := elastic.NewClient(
  131. elastic.SetURL(url),
  132. elastic.SetBasicAuth(username, password),
  133. elastic.SetSniff(false),
  134. )
  135. if err != nil {
  136. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  137. }
  138. url2 := "http://127.0.0.1:19905"
  139. username2 := "jybid"
  140. password2 := "Top2023_JEB01i@31"
  141. // 创建 Elasticsearch 客户端
  142. client2, err := elastic.NewClient(
  143. elastic.SetURL(url2),
  144. elastic.SetBasicAuth(username2, password2),
  145. elastic.SetSniff(false),
  146. )
  147. if err != nil {
  148. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  149. }
  150. rangeQuery := elastic.NewRangeQuery("id").Gte("65869b436977356f55a01b0b").Lt("6586a4196977356f55a02c79")
  151. query := elastic.NewBoolQuery().Must(rangeQuery)
  152. ctx := context.Background()
  153. //开始滚动搜索
  154. scrollID := ""
  155. scroll := "1m"
  156. searchSource := elastic.NewSearchSource().
  157. Query(query).
  158. Size(10000).
  159. Sort("_doc", true) //升序排序
  160. //Sort("_doc", false) //降序排序
  161. searchService := client.Scroll(index).
  162. Size(10000).
  163. Scroll(scroll).
  164. SearchSource(searchSource)
  165. res, err := searchService.Do(ctx)
  166. if err != nil {
  167. if err == io.EOF {
  168. fmt.Println("没有数据")
  169. } else {
  170. panic(err)
  171. }
  172. }
  173. defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  174. fmt.Println("总数是:", res.TotalHits())
  175. total := 0
  176. for len(res.Hits.Hits) > 0 {
  177. for _, hit := range res.Hits.Hits {
  178. var doc map[string]interface{}
  179. err := json.Unmarshal(hit.Source, &doc)
  180. if err != nil {
  181. log.Printf("解析文档失败:%s", err)
  182. continue
  183. }
  184. id := util.ObjToString(doc["id"])
  185. client2.Index().Index(index).Id(id).BodyJson(doc).Do(ctx)
  186. }
  187. total = total + len(res.Hits.Hits)
  188. scrollID = res.ScrollId
  189. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  190. log.Println("current count:", total)
  191. if err != nil {
  192. if err == io.EOF {
  193. // 滚动到最后一批数据,退出循环
  194. break
  195. }
  196. log.Printf("滚动搜索失败:%s", err)
  197. break // 处理错误时退出循环
  198. }
  199. }
  200. fmt.Println("结束~~~~~~~~~~~~~~~")
  201. }
  202. func TestGetP(T *testing.T) {
  203. MgoB := &mongodb.MongodbSim{
  204. //MongodbAddr: "172.17.189.140:27080",
  205. MongodbAddr: "127.0.0.1:27083",
  206. Size: 10,
  207. DbName: "qfw",
  208. UserName: "SJZY_RWbid_ES",
  209. Password: "SJZY@B4i4D5e6S",
  210. Direct: true,
  211. }
  212. MgoB.InitPool()
  213. //url := "http://172.17.4.184:19805"
  214. url := "http://127.0.0.1:19805"
  215. username := "es_all"
  216. password := "TopJkO2E_d1x"
  217. index := "projectset" //索引名称
  218. // 创建 Elasticsearch 客户端
  219. client, err := elastic.NewClient(
  220. elastic.SetURL(url),
  221. elastic.SetBasicAuth(username, password),
  222. elastic.SetSniff(false),
  223. )
  224. if err != nil {
  225. log.Fatalf("创建 Elasticsearch 客户端失败:%s", err)
  226. }
  227. rangeQuery := elastic.NewRangeQuery("pici").Gt("1672502400").Lte("1704038400")
  228. //termQ := elastic.NewTermQuery("multipackage", 0)
  229. //rangeQuery := elastic.NewRangeQuery("id").Gt("657b08556977356f5578cb25").Lte("657b08556977356f5578cb26")
  230. query := elastic.NewBoolQuery().Must(rangeQuery)
  231. ctx := context.Background()
  232. //开始滚动搜索
  233. scrollID := ""
  234. scroll := "1m"
  235. searchSource := elastic.NewSearchSource().
  236. Query(query).
  237. Size(1000).
  238. Sort("_doc", true) //升序排序
  239. //Sort("_doc", false) //降序排序
  240. searchService := client.Scroll(index).
  241. Size(1000).
  242. Scroll(scroll).
  243. SearchSource(searchSource)
  244. res, err := searchService.Do(ctx)
  245. if err != nil {
  246. if err == io.EOF {
  247. fmt.Println("没有数据")
  248. } else {
  249. panic(err)
  250. }
  251. }
  252. defer client.ClearScroll().ScrollId(scrollID).Do(ctx) // 在退出时清理资源
  253. fmt.Println("总数是:", res.TotalHits())
  254. total := 0
  255. for len(res.Hits.Hits) > 0 {
  256. for _, hit := range res.Hits.Hits {
  257. var doc map[string]interface{}
  258. err := json.Unmarshal(hit.Source, &doc)
  259. if err != nil {
  260. log.Printf("解析文档失败:%s", err)
  261. continue
  262. }
  263. //id := util.ObjToString(doc["id"])
  264. //log.Println(id)
  265. var matchWords = make([]string, 0)
  266. var matchList = make([]interface{}, 0)
  267. if list, ok := doc["list"].([]interface{}); ok {
  268. for _, v := range list {
  269. if da, ok := v.(map[string]interface{}); ok {
  270. if util.ObjToString(da["toptype"]) != "招标" {
  271. continue
  272. }
  273. title := util.ObjToString(da["title"])
  274. // 使用正则表达式进行匹配
  275. matches := GetMatches(title)
  276. if len(matches) > 0 {
  277. matchList = append(matchList, da)
  278. }
  279. matchWords = append(matchWords, matches...)
  280. }
  281. }
  282. }
  283. insert := make(map[string]interface{})
  284. insert["project_id"] = doc["id"]
  285. insert["_id"] = doc["id"]
  286. insert["multipackage"] = doc["multipackage"]
  287. insert["list"] = doc["list"]
  288. insert["projectname"] = doc["projectname"]
  289. insert["sourceinfourl"] = doc["sourceinfourl"]
  290. if len(matchWords) > 0 {
  291. insert["matchList"] = matchList
  292. insert["package_name"] = util.ObjToString(doc["projectname"]) + "-" + strings.Join(matchWords, "、")
  293. insert["type"] = 1
  294. }
  295. MgoB.SaveByOriID("wcc_project_20240304", insert)
  296. }
  297. total = total + len(res.Hits.Hits)
  298. scrollID = res.ScrollId
  299. res, err = client.Scroll().ScrollId(scrollID).Scroll(scroll).Do(ctx)
  300. log.Println("current count:", total)
  301. if err != nil {
  302. if err == io.EOF {
  303. // 滚动到最后一批数据,退出循环
  304. break
  305. }
  306. log.Printf("滚动搜索失败:%s", err)
  307. break // 处理错误时退出循环
  308. }
  309. }
  310. fmt.Println("结束~~~~~~~~~~~~~~~")
  311. }