elasticSim.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692
  1. package elastic
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. es "github.com/olivere/elastic/v7"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "log"
  11. "runtime"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  16. type Elastic struct {
  17. S_esurl string
  18. I_size int
  19. Addrs []string
  20. Pool chan *es.Client
  21. lastTime int64
  22. lastTimeLock sync.Mutex
  23. ntimeout int
  24. Username string
  25. Password string
  26. }
  27. func (e *Elastic) InitElasticSize() {
  28. e.Pool = make(chan *es.Client, e.I_size)
  29. for _, s := range strings.Split(e.S_esurl, ",") {
  30. e.Addrs = append(e.Addrs, s)
  31. }
  32. for i := 0; i < e.I_size; i++ {
  33. client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetMaxRetries(2), es.SetSniff(false))
  34. e.Pool <- client
  35. }
  36. }
  37. // 关闭连接
  38. func (e *Elastic) DestoryEsConn(client *es.Client) {
  39. select {
  40. case e.Pool <- client:
  41. break
  42. case <-time.After(time.Second * 1):
  43. if client != nil {
  44. client.Stop()
  45. }
  46. client = nil
  47. }
  48. }
  49. func (e *Elastic) GetEsConn() *es.Client {
  50. select {
  51. case c := <-e.Pool:
  52. if c == nil || !c.IsRunning() {
  53. log.Println("new esclient.", len(e.Pool))
  54. client, err := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password),
  55. es.SetMaxRetries(2), es.SetSniff(false))
  56. if err == nil && client.IsRunning() {
  57. return client
  58. }
  59. }
  60. return c
  61. case <-time.After(time.Second * 4):
  62. //超时
  63. e.ntimeout++
  64. e.lastTimeLock.Lock()
  65. defer e.lastTimeLock.Unlock()
  66. //12秒后允许创建链接
  67. c := time.Now().Unix() - e.lastTime
  68. if c > 12 {
  69. e.lastTime = time.Now().Unix()
  70. log.Println("add client..", len(e.Pool))
  71. c, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetMaxRetries(2), es.SetSniff(false))
  72. go func() {
  73. for i := 0; i < 2; i++ {
  74. client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetMaxRetries(2), es.SetSniff(false))
  75. e.Pool <- client
  76. }
  77. }()
  78. return c
  79. }
  80. return nil
  81. }
  82. }
  83. func (e *Elastic) Get(index string, query es.Query) *[]map[string]interface{} {
  84. client := e.GetEsConn()
  85. defer func() {
  86. go e.DestoryEsConn(client)
  87. }()
  88. var res []map[string]interface{}
  89. if client != nil {
  90. defer func() {
  91. if r := recover(); r != nil {
  92. log.Println("[E]", r)
  93. for skip := 1; ; skip++ {
  94. _, file, line, ok := runtime.Caller(skip)
  95. if !ok {
  96. break
  97. }
  98. go log.Printf("%v,%v\n", file, line)
  99. }
  100. }
  101. }()
  102. searchResult, err := client.Search().Index(index).Query(query).Do(context.Background())
  103. if err != nil {
  104. log.Println("从ES查询出错", err.Error())
  105. return nil
  106. }
  107. if searchResult.Hits != nil {
  108. resNum := len(searchResult.Hits.Hits)
  109. if resNum < 5000 {
  110. res = make([]map[string]interface{}, resNum)
  111. for i, hit := range searchResult.Hits.Hits {
  112. parseErr := json.Unmarshal(hit.Source, &res[i])
  113. if parseErr == nil && hit.Highlight != nil && res[i] != nil {
  114. res[i]["highlight"] = map[string][]string(hit.Highlight)
  115. }
  116. }
  117. } else {
  118. log.Println("查询结果太多,查询到:", resNum, "条")
  119. }
  120. }
  121. }
  122. return &res
  123. }
  124. // 关闭elastic
  125. func (e *Elastic) Close() {
  126. for i := 0; i < e.I_size; i++ {
  127. cli := <-e.Pool
  128. cli.Stop()
  129. cli = nil
  130. }
  131. e.Pool = nil
  132. e = nil
  133. }
  134. //获取连接
  135. //func (e *Elastic) GetEsConn() (c *es.Client) {
  136. // defer util.Catch()
  137. // select {
  138. // case c = <-e.Pool:
  139. // if c == nil || !c.IsRunning() {
  140. // client, err := es.NewClient(es.SetURL(addrs...),
  141. // es.SetMaxRetries(2), es.SetSniff(false))
  142. // if err == nil && client.IsRunning() {
  143. // return client
  144. // }
  145. // return nil
  146. // }
  147. // return
  148. // case <-time.After(time.Second * 7):
  149. // //超时
  150. // ntimeout++
  151. // log.Println("timeout times:", ntimeout)
  152. // return nil
  153. // }
  154. //}
  155. func (e *Elastic) BulkSave(index string, obj []map[string]interface{}) {
  156. client := e.GetEsConn()
  157. defer e.DestoryEsConn(client)
  158. if client != nil {
  159. req := client.Bulk()
  160. for _, v := range obj {
  161. //if isDelBefore {
  162. // req = req.Add(es.NewBulkDeleteRequest().Index(index).Id(fmt.Sprintf("%v", v["_id"])))
  163. //}
  164. id := util.ObjToString(v["_id"])
  165. doc := make(map[string]interface{}, 0)
  166. for k, va := range v {
  167. doc[k] = va
  168. }
  169. delete(doc, "_id")
  170. req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(doc))
  171. }
  172. _, err := req.Do(context.Background())
  173. if err != nil {
  174. log.Println("批量保存到ES出错", err.Error())
  175. }
  176. }
  177. }
  178. // BulkSaveReturnFails 批量保存 返回保存失败的doc
  179. func (e *Elastic) BulkSaveReturnFails(index string, obj []map[string]interface{}) []map[string]interface{} {
  180. client := e.GetEsConn()
  181. defer e.DestoryEsConn(client)
  182. if client == nil {
  183. return obj
  184. }
  185. // 存储 ID 与原始数据的映射关系
  186. idToData := make(map[string]map[string]interface{})
  187. // 收集失败的文档
  188. var failedDocs []map[string]interface{}
  189. req := client.Bulk()
  190. for _, v := range obj {
  191. id := util.ObjToString(v["_id"])
  192. idToData[id] = v // 建立映射
  193. doc := make(map[string]interface{})
  194. for k, va := range v {
  195. doc[k] = va
  196. }
  197. delete(doc, "_id")
  198. req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(doc))
  199. }
  200. res, err := req.Do(context.Background())
  201. if err != nil {
  202. log.Println("批量保存到ES出错", err.Error())
  203. return obj
  204. }
  205. if res.Errors {
  206. for _, item := range res.Items {
  207. for _, result := range item {
  208. if result.Error != nil {
  209. // 通过 ID 找到原始数据并添加到失败列表
  210. if originalData, exists := idToData[result.Id]; exists {
  211. // 可以在失败数据中添加错误信息,方便排查
  212. originalData["_error_reason"] = result.Error.Reason
  213. failedDocs = append(failedDocs, originalData)
  214. }
  215. }
  216. }
  217. }
  218. // 返回失败的文档列表,无错误(因为请求已被处理)
  219. return failedDocs
  220. }
  221. // 全部成功,返回空列表
  222. return nil
  223. }
  224. // 根据id删除索引对象
  225. func (e *Elastic) DelById(index, id string) bool {
  226. client := e.GetEsConn()
  227. defer e.DestoryEsConn(client)
  228. b := false
  229. if client != nil {
  230. var err error
  231. _, err = client.Delete().Index(index).Id(id).Do(context.Background())
  232. if err != nil {
  233. log.Println("更新检索出错:", err.Error())
  234. } else {
  235. b = true
  236. }
  237. }
  238. return b
  239. }
  240. func (e *Elastic) GetNoLimit(index, query string) *[]map[string]interface{} {
  241. client := e.GetEsConn()
  242. defer e.DestoryEsConn(client)
  243. var res []map[string]interface{}
  244. if client != nil {
  245. defer func() {
  246. if r := recover(); r != nil {
  247. log.Println("[E]", r)
  248. for skip := 1; ; skip++ {
  249. _, file, line, ok := runtime.Caller(skip)
  250. if !ok {
  251. break
  252. }
  253. go log.Printf("%v,%v\n", file, line)
  254. }
  255. }
  256. }()
  257. searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
  258. if err != nil {
  259. log.Println("从ES查询出错", err.Error())
  260. return nil
  261. }
  262. if searchResult.Hits != nil {
  263. resNum := len(searchResult.Hits.Hits)
  264. res = make([]map[string]interface{}, resNum)
  265. for i, hit := range searchResult.Hits.Hits {
  266. json.Unmarshal(hit.Source, &res[i])
  267. }
  268. }
  269. }
  270. return &res
  271. }
  272. //func (e *Elastic) GetByIdField(index, itype, id, fields string) *map[string]interface{} {
  273. // client := e.GetEsConn()
  274. // defer e.DestoryEsConn(client)
  275. // if client != nil {
  276. // defer func() {
  277. // if r := recover(); r != nil {
  278. // log.Println("[E]", r)
  279. // for skip := 1; ; skip++ {
  280. // _, file, line, ok := runtime.Caller(skip)
  281. // if !ok {
  282. // break
  283. // }
  284. // go log.Printf("%v,%v\n", file, line)
  285. // }
  286. // }
  287. // }()
  288. // query := `{"query":{"term":{"_id":"` + id + `"}}`
  289. // if len(fields) > 0 {
  290. // query = query + `,"_source":[` + fields + `]`
  291. // }
  292. // query = query + "}"
  293. // searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
  294. // if err != nil {
  295. // log.Println("从ES查询出错", err.Error())
  296. // return nil
  297. // }
  298. // var res map[string]interface{}
  299. // if searchResult.Hits != nil {
  300. // resNum := len(searchResult.Hits.Hits)
  301. // if resNum == 1 {
  302. // res = make(map[string]interface{})
  303. // for _, hit := range searchResult.Hits.Hits {
  304. // json.Unmarshal(*hit.Source., &res)
  305. // }
  306. // return &res
  307. // }
  308. // }
  309. // }
  310. // return nil
  311. //}
  312. func (e *Elastic) Count(index string, query interface{}) int64 {
  313. client := e.GetEsConn()
  314. defer e.DestoryEsConn(client)
  315. if client != nil {
  316. defer func() {
  317. if r := recover(); r != nil {
  318. log.Println("[E]", r)
  319. for skip := 1; ; skip++ {
  320. _, file, line, ok := runtime.Caller(skip)
  321. if !ok {
  322. break
  323. }
  324. go log.Printf("%v,%v\n", file, line)
  325. }
  326. }
  327. }()
  328. var qq es.Query
  329. if qi, ok2 := query.(es.Query); ok2 {
  330. qq = qi
  331. }
  332. n, err := client.Count(index).Query(qq).Do(context.Background())
  333. if err != nil {
  334. log.Println("统计出错", err.Error())
  335. }
  336. return n
  337. }
  338. return 0
  339. }
  340. //更新一个字段
  341. //func (e *Elastic) BulkUpdateArr(index, itype string, update []map[string]string) {
  342. // client := e.GetEsConn()
  343. // defer e.DestoryEsConn(client)
  344. // if client != nil {
  345. // defer func() {
  346. // if r := recover(); r != nil {
  347. // log.Println("[E]", r)
  348. // for skip := 1; ; skip++ {
  349. // _, file, line, ok := runtime.Caller(skip)
  350. // if !ok {
  351. // break
  352. // }
  353. // go log.Printf("%v,%v\n", file, line)
  354. // }
  355. // }
  356. // }()
  357. // for _, data := range update {
  358. // id := data["id"]
  359. // updateStr := data["updateStr"]
  360. // if id != "" && updateStr != "" {
  361. // _, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
  362. // if err != nil {
  363. // log.Println("更新检索出错:", err.Error())
  364. // }
  365. // } else {
  366. // log.Println("数据错误")
  367. // }
  368. // }
  369. // }
  370. //}
  371. //更新多个字段
  372. //func (e *Elastic) BulkUpdateMultipleFields(index, itype string, arrs [][]map[string]interface{}) {
  373. // client := e.GetEsConn()
  374. // defer e.DestoryEsConn(client)
  375. // if client != nil {
  376. // defer func() {
  377. // if r := recover(); r != nil {
  378. // log.Println("[E]", r)
  379. // for skip := 1; ; skip++ {
  380. // _, file, line, ok := runtime.Caller(skip)
  381. // if !ok {
  382. // break
  383. // }
  384. // go log.Printf("%v,%v\n", file, line)
  385. // }
  386. // }
  387. // }()
  388. // for _, arr := range arrs {
  389. // id := arr[0]["id"].(string)
  390. // update := arr[1]["update"].([]string)
  391. // for _, str := range update {
  392. // _, err := client.Update().Index(index).Type(itype).Id(id).Script(str).ScriptLang("groovy").Do()
  393. // if err != nil {
  394. // log.Println("更新检索出错:", err.Error())
  395. // }
  396. // }
  397. // }
  398. // }
  399. //}
  400. // UpdateBulk 批量修改文档
  401. func (e *Elastic) UpdateBulk(index string, docs ...[]map[string]interface{}) {
  402. client := e.GetEsConn()
  403. defer e.DestoryEsConn(client)
  404. bulkService := client.Bulk().Index(index).Refresh("true")
  405. //bulkService.Type(itype)
  406. for _, d := range docs {
  407. id := d[0]["_id"].(string)
  408. doc := es.NewBulkUpdateRequest().Id(id).Doc(d[1])
  409. bulkService.Add(doc)
  410. }
  411. _, err := bulkService.Do(context.Background())
  412. if err != nil {
  413. fmt.Printf("UpdateBulk all success err is %v\n", err)
  414. }
  415. //if len(res.Failed()) > 0 {
  416. // fmt.Printf("UpdateBulk all success failed is %v\n", (res.Items[0]))
  417. //}
  418. }
  419. // UpsertBulk 批量修改文档(不存在则插入)
  420. func (e *Elastic) UpsertBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
  421. client := e.GetEsConn()
  422. defer e.DestoryEsConn(client)
  423. bulkService := client.Bulk().Index(index).Refresh("true")
  424. //bulkService.Type("bidding")
  425. for i := range ids {
  426. doc := es.NewBulkUpdateRequest().Id(ids[i]).Doc(docs[i]).Upsert(docs[i])
  427. bulkService.Add(doc)
  428. }
  429. res, err := bulkService.Do(context.Background())
  430. if err != nil {
  431. return err
  432. }
  433. if len(res.Failed()) > 0 {
  434. return errors.New(res.Failed()[0].Error.Reason)
  435. }
  436. return nil
  437. }
  438. // 批量删除
  439. func (e *Elastic) DeleteBulk(index string, ids []string) {
  440. client := e.GetEsConn()
  441. defer e.DestoryEsConn(client)
  442. bulkService := client.Bulk().Index(index).Refresh("true")
  443. //bulkService.Type("bidding")
  444. for i := range ids {
  445. req := es.NewBulkDeleteRequest().Id(ids[i])
  446. bulkService.Add(req)
  447. }
  448. res, err := bulkService.Do(context.Background())
  449. if err != nil {
  450. fmt.Printf("DeleteBulk success is %v\n", len(res.Succeeded()))
  451. }
  452. }
  453. // InsertOrUpdate 插入或更新
  454. func (e *Elastic) InsertOrUpdate(index string, docs []map[string]interface{}) error {
  455. client := e.GetEsConn()
  456. defer e.DestoryEsConn(client)
  457. for _, item := range docs {
  458. // 获取唯一标识符
  459. id := item["id"].(string)
  460. if id == "" {
  461. id = item["_id"].(string)
  462. }
  463. // 根据id判断记录是否存在
  464. exists, err := client.Exists().
  465. Index(index).
  466. Id(id).
  467. Do(context.Background())
  468. if err != nil {
  469. return err
  470. }
  471. // 存在则更新,不存在则插入
  472. if exists {
  473. _, err := client.Update().
  474. Index(index).
  475. Id(id).
  476. Doc(item).
  477. Do(context.Background())
  478. if err != nil {
  479. return err
  480. }
  481. } else {
  482. _, err := client.Index().
  483. Index(index).
  484. Id(id).
  485. BodyJson(item).
  486. Do(context.Background())
  487. if err != nil {
  488. return err
  489. }
  490. }
  491. }
  492. return nil
  493. }
  494. // ExistsIndex 判断索引是否存在
  495. func (e *Elastic) ExistsIndex(index string) (exists bool, err error) {
  496. client := e.GetEsConn()
  497. defer e.DestoryEsConn(client)
  498. exists, err = client.IndexExists(index).Do(context.Background())
  499. return
  500. }
  501. // CreateIndex 创建索引
  502. func (e *Elastic) CreateIndex(index string, mapping string) (err error) {
  503. client := e.GetEsConn()
  504. defer e.DestoryEsConn(client)
  505. _, err = client.CreateIndex(index).BodyString(mapping).Do(context.Background())
  506. return
  507. }
  508. // DeleteIndex 删除索引
  509. func (e *Elastic) DeleteIndex(index string) (err error) {
  510. client := e.GetEsConn()
  511. defer e.DestoryEsConn(client)
  512. _, err = client.DeleteIndex(index).Do(context.Background())
  513. return
  514. }
  515. // RemoveAlias 移除别名
  516. func (e *Elastic) RemoveAlias(index, aliasName string) (err error) {
  517. client := e.GetEsConn()
  518. defer e.DestoryEsConn(client)
  519. _, err = client.Alias().Remove(index, aliasName).Do(context.Background())
  520. return
  521. }
  522. // SetAlias 添加别名
  523. func (e *Elastic) SetAlias(index, aliasName string) (err error) {
  524. client := e.GetEsConn()
  525. defer e.DestoryEsConn(client)
  526. _, err = client.Alias().Add(index, aliasName).Do(context.Background())
  527. return
  528. }
  529. // DeleteByID 根据ID 删除索引数据;id 或者索引名称不存在,都不会报错
  530. func (e *Elastic) DeleteByID(index, id string) error {
  531. client := e.GetEsConn()
  532. defer e.DestoryEsConn(client)
  533. _, err := client.Delete().Index(index).Id(id).Do(context.Background())
  534. if err != nil && es.IsNotFound(err) {
  535. return nil
  536. }
  537. return err
  538. }
  539. // UpdateDocument 更新指定ID的文档
  540. func (e *Elastic) UpdateDocument(indexName string, documentID string, updateData map[string]interface{}) error {
  541. client := e.GetEsConn()
  542. defer e.DestoryEsConn(client)
  543. updateResult, err := client.Update().
  544. Index(indexName).
  545. Id(documentID).
  546. Doc(updateData).
  547. Do(context.Background())
  548. if err != nil {
  549. return err
  550. }
  551. if updateResult.Result != "updated" {
  552. return fmt.Errorf("Document not updated: %v", updateResult.Result)
  553. }
  554. return nil
  555. }
  556. // Save 保存对象
  557. func (e *Elastic) Save(index string, obj interface{}) bool {
  558. client := e.GetEsConn()
  559. defer e.DestoryEsConn(client)
  560. defer func() {
  561. if r := recover(); r != nil {
  562. log.Println("[E]", r)
  563. for skip := 1; ; skip++ {
  564. _, file, line, ok := runtime.Caller(skip)
  565. if !ok {
  566. break
  567. }
  568. go log.Printf("%v,%v\n", file, line)
  569. }
  570. }
  571. }()
  572. data := util.ObjToMap(obj)
  573. _id := mongodb.BsonIdToSId((*data)["_id"])
  574. (*data)["id"] = _id
  575. delete((*data), "_id")
  576. _, err := client.Index().Index(index).Id(_id).BodyJson(data).Do(context.TODO())
  577. if err != nil {
  578. log.Println("保存到ES出错", err.Error(), obj)
  579. return false
  580. } else {
  581. return true
  582. }
  583. }
  584. // SaveDocument 保存单个索引
  585. func (e *Elastic) SaveDocument(index string, data map[string]interface{}) error {
  586. client := e.GetEsConn()
  587. defer e.DestoryEsConn(client)
  588. // 从数据中获取 ID 字段
  589. id, ok := data["id"].(string)
  590. if !ok {
  591. return errors.New("ID field not found in data")
  592. }
  593. // 将数据转换为 JSON 格式
  594. docJSON, err := json.Marshal(data)
  595. if err != nil {
  596. return err
  597. }
  598. // 创建一个新的索引请求
  599. indexRequest := client.Index().
  600. Index(index).
  601. Id(id).
  602. BodyJson(string(docJSON))
  603. // 执行索引请求
  604. _, err = indexRequest.Do(context.Background())
  605. if err != nil {
  606. return err
  607. }
  608. return nil
  609. }
  610. // GetById 获取单个索引文档
  611. func (e *Elastic) GetById(index string, id string) (err error, data map[string]interface{}) {
  612. client := e.GetEsConn()
  613. defer e.DestoryEsConn(client)
  614. ctx := context.Background()
  615. // 使用Get方法获取文档
  616. getResponse, err := client.Get().
  617. Index(index).
  618. Id(id).
  619. Do(ctx)
  620. if err != nil {
  621. // 处理错误
  622. return err, nil
  623. }
  624. if !getResponse.Found {
  625. // 文档存在,返回文档数据
  626. return errors.New("Document not found"), nil // 文档不存在,返回错误
  627. }
  628. if err := json.Unmarshal(getResponse.Source, &data); err != nil {
  629. // 处理解码错误
  630. return err, nil
  631. }
  632. return nil, data
  633. }