elasticSim.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693
  1. package elastic
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. es "github.com/olivere/elastic/v7"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "log"
  11. "runtime"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
// Elastic is a small connection pool around olivere/elastic (v7) clients.
// Populate S_esurl, I_size and, if needed, Username/Password, then call
// InitElasticSize before using any other method.
type Elastic struct {
	S_esurl      string          // comma-separated node URLs; split into Addrs by InitElasticSize
	I_size       int             // capacity of Pool and number of clients InitElasticSize creates
	Addrs        []string        // node URLs passed to es.SetURL whenever a client is created
	Pool         chan *es.Client // buffered channel holding the idle clients
	lastTime     int64           // Unix seconds of the last time GetEsConn created extra clients after a timeout
	lastTimeLock sync.Mutex      // held by GetEsConn's timeout path while it reads/writes lastTime
	ntimeout     int             // incremented each time GetEsConn times out waiting on Pool
	Username     string          // passed to es.SetBasicAuth
	Password     string          // passed to es.SetBasicAuth
}
  27. func (e *Elastic) InitElasticSize() {
  28. e.Pool = make(chan *es.Client, e.I_size)
  29. for _, s := range strings.Split(e.S_esurl, ",") {
  30. e.Addrs = append(e.Addrs, s)
  31. }
  32. for i := 0; i < e.I_size; i++ {
  33. client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetMaxRetries(2), es.SetSniff(false))
  34. e.Pool <- client
  35. }
  36. }
  37. // 关闭连接
  38. func (e *Elastic) DestoryEsConn(client *es.Client) {
  39. select {
  40. case e.Pool <- client:
  41. break
  42. case <-time.After(time.Second * 1):
  43. if client != nil {
  44. client.Stop()
  45. }
  46. client = nil
  47. }
  48. }
  49. func (e *Elastic) GetEsConn() *es.Client {
  50. select {
  51. case c := <-e.Pool:
  52. if c == nil || !c.IsRunning() {
  53. log.Println("new esclient.", len(e.Pool))
  54. client, err := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password),
  55. es.SetMaxRetries(2), es.SetSniff(false))
  56. if err == nil && client.IsRunning() {
  57. return client
  58. }
  59. }
  60. return c
  61. case <-time.After(time.Second * 4):
  62. //超时
  63. e.ntimeout++
  64. e.lastTimeLock.Lock()
  65. defer e.lastTimeLock.Unlock()
  66. //12秒后允许创建链接
  67. c := time.Now().Unix() - e.lastTime
  68. if c > 12 {
  69. e.lastTime = time.Now().Unix()
  70. log.Println("add client..", len(e.Pool))
  71. c, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetMaxRetries(2), es.SetSniff(false))
  72. go func() {
  73. for i := 0; i < 2; i++ {
  74. client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetMaxRetries(2), es.SetSniff(false))
  75. e.Pool <- client
  76. }
  77. }()
  78. return c
  79. }
  80. return nil
  81. }
  82. }
  83. func (e *Elastic) Get(index string, query es.Query) *[]map[string]interface{} {
  84. client := e.GetEsConn()
  85. defer func() {
  86. go e.DestoryEsConn(client)
  87. }()
  88. var res []map[string]interface{}
  89. if client != nil {
  90. defer func() {
  91. if r := recover(); r != nil {
  92. log.Println("[E]", r)
  93. for skip := 1; ; skip++ {
  94. _, file, line, ok := runtime.Caller(skip)
  95. if !ok {
  96. break
  97. }
  98. go log.Printf("%v,%v\n", file, line)
  99. }
  100. }
  101. }()
  102. searchResult, err := client.Search().Index(index).Query(query).Do(context.Background())
  103. if err != nil {
  104. log.Println("从ES查询出错", err.Error())
  105. return nil
  106. }
  107. if searchResult.Hits != nil {
  108. resNum := len(searchResult.Hits.Hits)
  109. if resNum < 5000 {
  110. res = make([]map[string]interface{}, resNum)
  111. for i, hit := range searchResult.Hits.Hits {
  112. parseErr := json.Unmarshal(hit.Source, &res[i])
  113. if parseErr == nil && hit.Highlight != nil && res[i] != nil {
  114. res[i]["highlight"] = map[string][]string(hit.Highlight)
  115. }
  116. }
  117. } else {
  118. log.Println("查询结果太多,查询到:", resNum, "条")
  119. }
  120. }
  121. }
  122. return &res
  123. }
  124. // 关闭elastic
  125. func (e *Elastic) Close() {
  126. for i := 0; i < e.I_size; i++ {
  127. cli := <-e.Pool
  128. cli.Stop()
  129. cli = nil
  130. }
  131. e.Pool = nil
  132. e = nil
  133. }
  134. //获取连接
  135. //func (e *Elastic) GetEsConn() (c *es.Client) {
  136. // defer util.Catch()
  137. // select {
  138. // case c = <-e.Pool:
  139. // if c == nil || !c.IsRunning() {
  140. // client, err := es.NewClient(es.SetURL(addrs...),
  141. // es.SetMaxRetries(2), es.SetSniff(false))
  142. // if err == nil && client.IsRunning() {
  143. // return client
  144. // }
  145. // return nil
  146. // }
  147. // return
  148. // case <-time.After(time.Second * 7):
  149. // //超时
  150. // ntimeout++
  151. // log.Println("timeout times:", ntimeout)
  152. // return nil
  153. // }
  154. //}
  155. func (e *Elastic) BulkSave(index string, obj []map[string]interface{}) {
  156. client := e.GetEsConn()
  157. defer e.DestoryEsConn(client)
  158. if client != nil {
  159. req := client.Bulk()
  160. for _, v := range obj {
  161. //if isDelBefore {
  162. // req = req.Add(es.NewBulkDeleteRequest().Index(index).Id(fmt.Sprintf("%v", v["_id"])))
  163. //}
  164. id := util.ObjToString(v["_id"])
  165. doc := make(map[string]interface{}, 0)
  166. for k, va := range v {
  167. doc[k] = va
  168. }
  169. delete(doc, "_id")
  170. req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(doc))
  171. }
  172. _, err := req.Do(context.Background())
  173. if err != nil {
  174. log.Println("批量保存到ES出错", err.Error())
  175. }
  176. }
  177. }
  178. // BulkSaveReturnFails 批量保存 返回保存失败的doc
  179. func (e *Elastic) BulkSaveReturnFails(index string, obj []map[string]interface{}) []map[string]interface{} {
  180. client := e.GetEsConn()
  181. defer e.DestoryEsConn(client)
  182. if client == nil {
  183. return obj
  184. }
  185. // 存储 ID 与原始数据的映射关系
  186. idToData := make(map[string]map[string]interface{})
  187. // 收集失败的文档
  188. var failedDocs []map[string]interface{}
  189. req := client.Bulk()
  190. for _, v := range obj {
  191. id := util.ObjToString(v["_id"])
  192. idToData[id] = v // 建立映射
  193. doc := make(map[string]interface{})
  194. for k, va := range v {
  195. doc[k] = va
  196. }
  197. delete(doc, "_id")
  198. req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(doc))
  199. }
  200. res, err := req.Do(context.Background())
  201. if err != nil {
  202. log.Println("批量保存到ES出错", err.Error())
  203. return obj
  204. }
  205. if res.Errors {
  206. for _, item := range res.Items {
  207. for _, result := range item {
  208. if result.Error != nil {
  209. // 通过 ID 找到原始数据并添加到失败列表
  210. if originalData, exists := idToData[result.Id]; exists {
  211. // 可以在失败数据中添加错误信息,方便排查
  212. //originalData["_error_reason"] = result.Error.Reason
  213. log.Println("批量保存到ES出错", result.Error.Reason)
  214. failedDocs = append(failedDocs, originalData)
  215. }
  216. }
  217. }
  218. }
  219. // 返回失败的文档列表,无错误(因为请求已被处理)
  220. return failedDocs
  221. }
  222. // 全部成功,返回空列表
  223. return nil
  224. }
  225. // 根据id删除索引对象
  226. func (e *Elastic) DelById(index, id string) bool {
  227. client := e.GetEsConn()
  228. defer e.DestoryEsConn(client)
  229. b := false
  230. if client != nil {
  231. var err error
  232. _, err = client.Delete().Index(index).Id(id).Do(context.Background())
  233. if err != nil {
  234. log.Println("更新检索出错:", err.Error())
  235. } else {
  236. b = true
  237. }
  238. }
  239. return b
  240. }
  241. func (e *Elastic) GetNoLimit(index, query string) *[]map[string]interface{} {
  242. client := e.GetEsConn()
  243. defer e.DestoryEsConn(client)
  244. var res []map[string]interface{}
  245. if client != nil {
  246. defer func() {
  247. if r := recover(); r != nil {
  248. log.Println("[E]", r)
  249. for skip := 1; ; skip++ {
  250. _, file, line, ok := runtime.Caller(skip)
  251. if !ok {
  252. break
  253. }
  254. go log.Printf("%v,%v\n", file, line)
  255. }
  256. }
  257. }()
  258. searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
  259. if err != nil {
  260. log.Println("从ES查询出错", err.Error())
  261. return nil
  262. }
  263. if searchResult.Hits != nil {
  264. resNum := len(searchResult.Hits.Hits)
  265. res = make([]map[string]interface{}, resNum)
  266. for i, hit := range searchResult.Hits.Hits {
  267. json.Unmarshal(hit.Source, &res[i])
  268. }
  269. }
  270. }
  271. return &res
  272. }
  273. //func (e *Elastic) GetByIdField(index, itype, id, fields string) *map[string]interface{} {
  274. // client := e.GetEsConn()
  275. // defer e.DestoryEsConn(client)
  276. // if client != nil {
  277. // defer func() {
  278. // if r := recover(); r != nil {
  279. // log.Println("[E]", r)
  280. // for skip := 1; ; skip++ {
  281. // _, file, line, ok := runtime.Caller(skip)
  282. // if !ok {
  283. // break
  284. // }
  285. // go log.Printf("%v,%v\n", file, line)
  286. // }
  287. // }
  288. // }()
  289. // query := `{"query":{"term":{"_id":"` + id + `"}}`
  290. // if len(fields) > 0 {
  291. // query = query + `,"_source":[` + fields + `]`
  292. // }
  293. // query = query + "}"
  294. // searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
  295. // if err != nil {
  296. // log.Println("从ES查询出错", err.Error())
  297. // return nil
  298. // }
  299. // var res map[string]interface{}
  300. // if searchResult.Hits != nil {
  301. // resNum := len(searchResult.Hits.Hits)
  302. // if resNum == 1 {
  303. // res = make(map[string]interface{})
  304. // for _, hit := range searchResult.Hits.Hits {
  305. // json.Unmarshal(*hit.Source., &res)
  306. // }
  307. // return &res
  308. // }
  309. // }
  310. // }
  311. // return nil
  312. //}
  313. func (e *Elastic) Count(index string, query interface{}) int64 {
  314. client := e.GetEsConn()
  315. defer e.DestoryEsConn(client)
  316. if client != nil {
  317. defer func() {
  318. if r := recover(); r != nil {
  319. log.Println("[E]", r)
  320. for skip := 1; ; skip++ {
  321. _, file, line, ok := runtime.Caller(skip)
  322. if !ok {
  323. break
  324. }
  325. go log.Printf("%v,%v\n", file, line)
  326. }
  327. }
  328. }()
  329. var qq es.Query
  330. if qi, ok2 := query.(es.Query); ok2 {
  331. qq = qi
  332. }
  333. n, err := client.Count(index).Query(qq).Do(context.Background())
  334. if err != nil {
  335. log.Println("统计出错", err.Error())
  336. }
  337. return n
  338. }
  339. return 0
  340. }
  341. //更新一个字段
  342. //func (e *Elastic) BulkUpdateArr(index, itype string, update []map[string]string) {
  343. // client := e.GetEsConn()
  344. // defer e.DestoryEsConn(client)
  345. // if client != nil {
  346. // defer func() {
  347. // if r := recover(); r != nil {
  348. // log.Println("[E]", r)
  349. // for skip := 1; ; skip++ {
  350. // _, file, line, ok := runtime.Caller(skip)
  351. // if !ok {
  352. // break
  353. // }
  354. // go log.Printf("%v,%v\n", file, line)
  355. // }
  356. // }
  357. // }()
  358. // for _, data := range update {
  359. // id := data["id"]
  360. // updateStr := data["updateStr"]
  361. // if id != "" && updateStr != "" {
  362. // _, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
  363. // if err != nil {
  364. // log.Println("更新检索出错:", err.Error())
  365. // }
  366. // } else {
  367. // log.Println("数据错误")
  368. // }
  369. // }
  370. // }
  371. //}
  372. //更新多个字段
  373. //func (e *Elastic) BulkUpdateMultipleFields(index, itype string, arrs [][]map[string]interface{}) {
  374. // client := e.GetEsConn()
  375. // defer e.DestoryEsConn(client)
  376. // if client != nil {
  377. // defer func() {
  378. // if r := recover(); r != nil {
  379. // log.Println("[E]", r)
  380. // for skip := 1; ; skip++ {
  381. // _, file, line, ok := runtime.Caller(skip)
  382. // if !ok {
  383. // break
  384. // }
  385. // go log.Printf("%v,%v\n", file, line)
  386. // }
  387. // }
  388. // }()
  389. // for _, arr := range arrs {
  390. // id := arr[0]["id"].(string)
  391. // update := arr[1]["update"].([]string)
  392. // for _, str := range update {
  393. // _, err := client.Update().Index(index).Type(itype).Id(id).Script(str).ScriptLang("groovy").Do()
  394. // if err != nil {
  395. // log.Println("更新检索出错:", err.Error())
  396. // }
  397. // }
  398. // }
  399. // }
  400. //}
  401. // UpdateBulk 批量修改文档
  402. func (e *Elastic) UpdateBulk(index string, docs ...[]map[string]interface{}) {
  403. client := e.GetEsConn()
  404. defer e.DestoryEsConn(client)
  405. bulkService := client.Bulk().Index(index).Refresh("true")
  406. //bulkService.Type(itype)
  407. for _, d := range docs {
  408. id := d[0]["_id"].(string)
  409. doc := es.NewBulkUpdateRequest().Id(id).Doc(d[1])
  410. bulkService.Add(doc)
  411. }
  412. _, err := bulkService.Do(context.Background())
  413. if err != nil {
  414. fmt.Printf("UpdateBulk all success err is %v\n", err)
  415. }
  416. //if len(res.Failed()) > 0 {
  417. // fmt.Printf("UpdateBulk all success failed is %v\n", (res.Items[0]))
  418. //}
  419. }
  420. // UpsertBulk 批量修改文档(不存在则插入)
  421. func (e *Elastic) UpsertBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
  422. client := e.GetEsConn()
  423. defer e.DestoryEsConn(client)
  424. bulkService := client.Bulk().Index(index).Refresh("true")
  425. //bulkService.Type("bidding")
  426. for i := range ids {
  427. doc := es.NewBulkUpdateRequest().Id(ids[i]).Doc(docs[i]).Upsert(docs[i])
  428. bulkService.Add(doc)
  429. }
  430. res, err := bulkService.Do(context.Background())
  431. if err != nil {
  432. return err
  433. }
  434. if len(res.Failed()) > 0 {
  435. return errors.New(res.Failed()[0].Error.Reason)
  436. }
  437. return nil
  438. }
  439. // 批量删除
  440. func (e *Elastic) DeleteBulk(index string, ids []string) {
  441. client := e.GetEsConn()
  442. defer e.DestoryEsConn(client)
  443. bulkService := client.Bulk().Index(index).Refresh("true")
  444. //bulkService.Type("bidding")
  445. for i := range ids {
  446. req := es.NewBulkDeleteRequest().Id(ids[i])
  447. bulkService.Add(req)
  448. }
  449. res, err := bulkService.Do(context.Background())
  450. if err != nil {
  451. fmt.Printf("DeleteBulk success is %v\n", len(res.Succeeded()))
  452. }
  453. }
  454. // InsertOrUpdate 插入或更新
  455. func (e *Elastic) InsertOrUpdate(index string, docs []map[string]interface{}) error {
  456. client := e.GetEsConn()
  457. defer e.DestoryEsConn(client)
  458. for _, item := range docs {
  459. // 获取唯一标识符
  460. id := item["id"].(string)
  461. if id == "" {
  462. id = item["_id"].(string)
  463. }
  464. // 根据id判断记录是否存在
  465. exists, err := client.Exists().
  466. Index(index).
  467. Id(id).
  468. Do(context.Background())
  469. if err != nil {
  470. return err
  471. }
  472. // 存在则更新,不存在则插入
  473. if exists {
  474. _, err := client.Update().
  475. Index(index).
  476. Id(id).
  477. Doc(item).
  478. Do(context.Background())
  479. if err != nil {
  480. return err
  481. }
  482. } else {
  483. _, err := client.Index().
  484. Index(index).
  485. Id(id).
  486. BodyJson(item).
  487. Do(context.Background())
  488. if err != nil {
  489. return err
  490. }
  491. }
  492. }
  493. return nil
  494. }
  495. // ExistsIndex 判断索引是否存在
  496. func (e *Elastic) ExistsIndex(index string) (exists bool, err error) {
  497. client := e.GetEsConn()
  498. defer e.DestoryEsConn(client)
  499. exists, err = client.IndexExists(index).Do(context.Background())
  500. return
  501. }
  502. // CreateIndex 创建索引
  503. func (e *Elastic) CreateIndex(index string, mapping string) (err error) {
  504. client := e.GetEsConn()
  505. defer e.DestoryEsConn(client)
  506. _, err = client.CreateIndex(index).BodyString(mapping).Do(context.Background())
  507. return
  508. }
  509. // DeleteIndex 删除索引
  510. func (e *Elastic) DeleteIndex(index string) (err error) {
  511. client := e.GetEsConn()
  512. defer e.DestoryEsConn(client)
  513. _, err = client.DeleteIndex(index).Do(context.Background())
  514. return
  515. }
  516. // RemoveAlias 移除别名
  517. func (e *Elastic) RemoveAlias(index, aliasName string) (err error) {
  518. client := e.GetEsConn()
  519. defer e.DestoryEsConn(client)
  520. _, err = client.Alias().Remove(index, aliasName).Do(context.Background())
  521. return
  522. }
  523. // SetAlias 添加别名
  524. func (e *Elastic) SetAlias(index, aliasName string) (err error) {
  525. client := e.GetEsConn()
  526. defer e.DestoryEsConn(client)
  527. _, err = client.Alias().Add(index, aliasName).Do(context.Background())
  528. return
  529. }
  530. // DeleteByID 根据ID 删除索引数据;id 或者索引名称不存在,都不会报错
  531. func (e *Elastic) DeleteByID(index, id string) error {
  532. client := e.GetEsConn()
  533. defer e.DestoryEsConn(client)
  534. _, err := client.Delete().Index(index).Id(id).Do(context.Background())
  535. if err != nil && es.IsNotFound(err) {
  536. return nil
  537. }
  538. return err
  539. }
  540. // UpdateDocument 更新指定ID的文档
  541. func (e *Elastic) UpdateDocument(indexName string, documentID string, updateData map[string]interface{}) error {
  542. client := e.GetEsConn()
  543. defer e.DestoryEsConn(client)
  544. updateResult, err := client.Update().
  545. Index(indexName).
  546. Id(documentID).
  547. Doc(updateData).
  548. Do(context.Background())
  549. if err != nil {
  550. return err
  551. }
  552. if updateResult.Result != "updated" {
  553. return fmt.Errorf("Document not updated: %v", updateResult.Result)
  554. }
  555. return nil
  556. }
  557. // Save 保存对象
  558. func (e *Elastic) Save(index string, obj interface{}) bool {
  559. client := e.GetEsConn()
  560. defer e.DestoryEsConn(client)
  561. defer func() {
  562. if r := recover(); r != nil {
  563. log.Println("[E]", r)
  564. for skip := 1; ; skip++ {
  565. _, file, line, ok := runtime.Caller(skip)
  566. if !ok {
  567. break
  568. }
  569. go log.Printf("%v,%v\n", file, line)
  570. }
  571. }
  572. }()
  573. data := util.ObjToMap(obj)
  574. _id := mongodb.BsonIdToSId((*data)["_id"])
  575. (*data)["id"] = _id
  576. delete((*data), "_id")
  577. _, err := client.Index().Index(index).Id(_id).BodyJson(data).Do(context.TODO())
  578. if err != nil {
  579. log.Println("保存到ES出错", err.Error(), obj)
  580. return false
  581. } else {
  582. return true
  583. }
  584. }
  585. // SaveDocument 保存单个索引
  586. func (e *Elastic) SaveDocument(index string, data map[string]interface{}) error {
  587. client := e.GetEsConn()
  588. defer e.DestoryEsConn(client)
  589. // 从数据中获取 ID 字段
  590. id, ok := data["id"].(string)
  591. if !ok {
  592. return errors.New("ID field not found in data")
  593. }
  594. // 将数据转换为 JSON 格式
  595. docJSON, err := json.Marshal(data)
  596. if err != nil {
  597. return err
  598. }
  599. // 创建一个新的索引请求
  600. indexRequest := client.Index().
  601. Index(index).
  602. Id(id).
  603. BodyJson(string(docJSON))
  604. // 执行索引请求
  605. _, err = indexRequest.Do(context.Background())
  606. if err != nil {
  607. return err
  608. }
  609. return nil
  610. }
  611. // GetById 获取单个索引文档
  612. func (e *Elastic) GetById(index string, id string) (err error, data map[string]interface{}) {
  613. client := e.GetEsConn()
  614. defer e.DestoryEsConn(client)
  615. ctx := context.Background()
  616. // 使用Get方法获取文档
  617. getResponse, err := client.Get().
  618. Index(index).
  619. Id(id).
  620. Do(ctx)
  621. if err != nil {
  622. // 处理错误
  623. return err, nil
  624. }
  625. if !getResponse.Found {
  626. // 文档存在,返回文档数据
  627. return errors.New("Document not found"), nil // 文档不存在,返回错误
  628. }
  629. if err := json.Unmarshal(getResponse.Source, &data); err != nil {
  630. // 处理解码错误
  631. return err, nil
  632. }
  633. return nil, data
  634. }