elasticSim.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579
  1. package elastic
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. es "github.com/olivere/elastic/v7"
  8. util "jygit.jydev.jianyu360.cn/data_processing/common_utils"
  9. "jygit.jydev.jianyu360.cn/data_processing/common_utils/mongodb"
  10. "log"
  11. "runtime"
  12. "strings"
  13. "sync"
  14. "time"
  15. )
  16. type Elastic struct {
  17. S_esurl string
  18. I_size int
  19. Addrs []string
  20. Pool chan *es.Client
  21. lastTime int64
  22. lastTimeLock sync.Mutex
  23. ntimeout int
  24. Username string
  25. Password string
  26. }
  27. func (e *Elastic) InitElasticSize() {
  28. e.Pool = make(chan *es.Client, e.I_size)
  29. for _, s := range strings.Split(e.S_esurl, ",") {
  30. e.Addrs = append(e.Addrs, s)
  31. }
  32. for i := 0; i < e.I_size; i++ {
  33. client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetMaxRetries(2), es.SetSniff(false))
  34. e.Pool <- client
  35. }
  36. }
  37. // 关闭连接
  38. func (e *Elastic) DestoryEsConn(client *es.Client) {
  39. select {
  40. case e.Pool <- client:
  41. break
  42. case <-time.After(time.Second * 1):
  43. if client != nil {
  44. client.Stop()
  45. }
  46. client = nil
  47. }
  48. }
  49. func (e *Elastic) GetEsConn() *es.Client {
  50. select {
  51. case c := <-e.Pool:
  52. if c == nil || !c.IsRunning() {
  53. log.Println("new esclient.", len(e.Pool))
  54. client, err := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password),
  55. es.SetMaxRetries(2), es.SetSniff(false))
  56. if err == nil && client.IsRunning() {
  57. return client
  58. }
  59. }
  60. return c
  61. case <-time.After(time.Second * 4):
  62. //超时
  63. e.ntimeout++
  64. e.lastTimeLock.Lock()
  65. defer e.lastTimeLock.Unlock()
  66. //12秒后允许创建链接
  67. c := time.Now().Unix() - e.lastTime
  68. if c > 12 {
  69. e.lastTime = time.Now().Unix()
  70. log.Println("add client..", len(e.Pool))
  71. c, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetMaxRetries(2), es.SetSniff(false))
  72. go func() {
  73. for i := 0; i < 2; i++ {
  74. client, _ := es.NewClient(es.SetURL(e.Addrs...), es.SetBasicAuth(e.Username, e.Password), es.SetMaxRetries(2), es.SetSniff(false))
  75. e.Pool <- client
  76. }
  77. }()
  78. return c
  79. }
  80. return nil
  81. }
  82. }
  83. func (e *Elastic) Get(index string, query es.Query) *[]map[string]interface{} {
  84. client := e.GetEsConn()
  85. defer func() {
  86. go e.DestoryEsConn(client)
  87. }()
  88. var res []map[string]interface{}
  89. if client != nil {
  90. defer func() {
  91. if r := recover(); r != nil {
  92. log.Println("[E]", r)
  93. for skip := 1; ; skip++ {
  94. _, file, line, ok := runtime.Caller(skip)
  95. if !ok {
  96. break
  97. }
  98. go log.Printf("%v,%v\n", file, line)
  99. }
  100. }
  101. }()
  102. searchResult, err := client.Search().Index(index).Query(query).Do(context.Background())
  103. if err != nil {
  104. log.Println("从ES查询出错", err.Error())
  105. return nil
  106. }
  107. if searchResult.Hits != nil {
  108. resNum := len(searchResult.Hits.Hits)
  109. if resNum < 5000 {
  110. res = make([]map[string]interface{}, resNum)
  111. for i, hit := range searchResult.Hits.Hits {
  112. parseErr := json.Unmarshal(hit.Source, &res[i])
  113. if parseErr == nil && hit.Highlight != nil && res[i] != nil {
  114. res[i]["highlight"] = map[string][]string(hit.Highlight)
  115. }
  116. }
  117. } else {
  118. log.Println("查询结果太多,查询到:", resNum, "条")
  119. }
  120. }
  121. }
  122. return &res
  123. }
  124. // 关闭elastic
  125. func (e *Elastic) Close() {
  126. for i := 0; i < e.I_size; i++ {
  127. cli := <-e.Pool
  128. cli.Stop()
  129. cli = nil
  130. }
  131. e.Pool = nil
  132. e = nil
  133. }
  134. //获取连接
  135. //func (e *Elastic) GetEsConn() (c *es.Client) {
  136. // defer util.Catch()
  137. // select {
  138. // case c = <-e.Pool:
  139. // if c == nil || !c.IsRunning() {
  140. // client, err := es.NewClient(es.SetURL(addrs...),
  141. // es.SetMaxRetries(2), es.SetSniff(false))
  142. // if err == nil && client.IsRunning() {
  143. // return client
  144. // }
  145. // return nil
  146. // }
  147. // return
  148. // case <-time.After(time.Second * 7):
  149. // //超时
  150. // ntimeout++
  151. // log.Println("timeout times:", ntimeout)
  152. // return nil
  153. // }
  154. //}
  155. func (e *Elastic) BulkSave(index string, obj []map[string]interface{}) {
  156. client := e.GetEsConn()
  157. defer e.DestoryEsConn(client)
  158. if client != nil {
  159. req := client.Bulk()
  160. for _, v := range obj {
  161. //if isDelBefore {
  162. // req = req.Add(es.NewBulkDeleteRequest().Index(index).Id(fmt.Sprintf("%v", v["_id"])))
  163. //}
  164. id := util.ObjToString(v["_id"])
  165. doc := make(map[string]interface{}, 0)
  166. for k, va := range v {
  167. doc[k] = va
  168. }
  169. delete(doc, "_id")
  170. req = req.Add(es.NewBulkIndexRequest().Index(index).Id(id).Doc(doc))
  171. }
  172. _, err := req.Do(context.Background())
  173. if err != nil {
  174. log.Println("批量保存到ES出错", err.Error())
  175. }
  176. }
  177. }
  178. // 根据id删除索引对象
  179. func (e *Elastic) DelById(index, id string) bool {
  180. client := e.GetEsConn()
  181. defer e.DestoryEsConn(client)
  182. b := false
  183. if client != nil {
  184. var err error
  185. _, err = client.Delete().Index(index).Id(id).Do(context.Background())
  186. if err != nil {
  187. log.Println("更新检索出错:", err.Error())
  188. } else {
  189. b = true
  190. }
  191. }
  192. return b
  193. }
  194. func (e *Elastic) GetNoLimit(index, query string) *[]map[string]interface{} {
  195. client := e.GetEsConn()
  196. defer e.DestoryEsConn(client)
  197. var res []map[string]interface{}
  198. if client != nil {
  199. defer func() {
  200. if r := recover(); r != nil {
  201. log.Println("[E]", r)
  202. for skip := 1; ; skip++ {
  203. _, file, line, ok := runtime.Caller(skip)
  204. if !ok {
  205. break
  206. }
  207. go log.Printf("%v,%v\n", file, line)
  208. }
  209. }
  210. }()
  211. searchResult, err := client.Search().Index(index).Source(query).Do(context.Background())
  212. if err != nil {
  213. log.Println("从ES查询出错", err.Error())
  214. return nil
  215. }
  216. if searchResult.Hits != nil {
  217. resNum := len(searchResult.Hits.Hits)
  218. res = make([]map[string]interface{}, resNum)
  219. for i, hit := range searchResult.Hits.Hits {
  220. json.Unmarshal(hit.Source, &res[i])
  221. }
  222. }
  223. }
  224. return &res
  225. }
  226. //func (e *Elastic) GetByIdField(index, itype, id, fields string) *map[string]interface{} {
  227. // client := e.GetEsConn()
  228. // defer e.DestoryEsConn(client)
  229. // if client != nil {
  230. // defer func() {
  231. // if r := recover(); r != nil {
  232. // log.Println("[E]", r)
  233. // for skip := 1; ; skip++ {
  234. // _, file, line, ok := runtime.Caller(skip)
  235. // if !ok {
  236. // break
  237. // }
  238. // go log.Printf("%v,%v\n", file, line)
  239. // }
  240. // }
  241. // }()
  242. // query := `{"query":{"term":{"_id":"` + id + `"}}`
  243. // if len(fields) > 0 {
  244. // query = query + `,"_source":[` + fields + `]`
  245. // }
  246. // query = query + "}"
  247. // searchResult, err := client.Search().Index(index).Type(itype).Source(query).Do()
  248. // if err != nil {
  249. // log.Println("从ES查询出错", err.Error())
  250. // return nil
  251. // }
  252. // var res map[string]interface{}
  253. // if searchResult.Hits != nil {
  254. // resNum := len(searchResult.Hits.Hits)
  255. // if resNum == 1 {
  256. // res = make(map[string]interface{})
  257. // for _, hit := range searchResult.Hits.Hits {
  258. // json.Unmarshal(*hit.Source., &res)
  259. // }
  260. // return &res
  261. // }
  262. // }
  263. // }
  264. // return nil
  265. //}
  266. func (e *Elastic) Count(index string, query interface{}) int64 {
  267. client := e.GetEsConn()
  268. defer e.DestoryEsConn(client)
  269. if client != nil {
  270. defer func() {
  271. if r := recover(); r != nil {
  272. log.Println("[E]", r)
  273. for skip := 1; ; skip++ {
  274. _, file, line, ok := runtime.Caller(skip)
  275. if !ok {
  276. break
  277. }
  278. go log.Printf("%v,%v\n", file, line)
  279. }
  280. }
  281. }()
  282. var qq es.Query
  283. if qi, ok2 := query.(es.Query); ok2 {
  284. qq = qi
  285. }
  286. n, err := client.Count(index).Query(qq).Do(context.Background())
  287. if err != nil {
  288. log.Println("统计出错", err.Error())
  289. }
  290. return n
  291. }
  292. return 0
  293. }
  294. //更新一个字段
  295. //func (e *Elastic) BulkUpdateArr(index, itype string, update []map[string]string) {
  296. // client := e.GetEsConn()
  297. // defer e.DestoryEsConn(client)
  298. // if client != nil {
  299. // defer func() {
  300. // if r := recover(); r != nil {
  301. // log.Println("[E]", r)
  302. // for skip := 1; ; skip++ {
  303. // _, file, line, ok := runtime.Caller(skip)
  304. // if !ok {
  305. // break
  306. // }
  307. // go log.Printf("%v,%v\n", file, line)
  308. // }
  309. // }
  310. // }()
  311. // for _, data := range update {
  312. // id := data["id"]
  313. // updateStr := data["updateStr"]
  314. // if id != "" && updateStr != "" {
  315. // _, err := client.Update().Index(index).Type(itype).Id(id).Script(updateStr).ScriptLang("groovy").Do()
  316. // if err != nil {
  317. // log.Println("更新检索出错:", err.Error())
  318. // }
  319. // } else {
  320. // log.Println("数据错误")
  321. // }
  322. // }
  323. // }
  324. //}
  325. //更新多个字段
  326. //func (e *Elastic) BulkUpdateMultipleFields(index, itype string, arrs [][]map[string]interface{}) {
  327. // client := e.GetEsConn()
  328. // defer e.DestoryEsConn(client)
  329. // if client != nil {
  330. // defer func() {
  331. // if r := recover(); r != nil {
  332. // log.Println("[E]", r)
  333. // for skip := 1; ; skip++ {
  334. // _, file, line, ok := runtime.Caller(skip)
  335. // if !ok {
  336. // break
  337. // }
  338. // go log.Printf("%v,%v\n", file, line)
  339. // }
  340. // }
  341. // }()
  342. // for _, arr := range arrs {
  343. // id := arr[0]["id"].(string)
  344. // update := arr[1]["update"].([]string)
  345. // for _, str := range update {
  346. // _, err := client.Update().Index(index).Type(itype).Id(id).Script(str).ScriptLang("groovy").Do()
  347. // if err != nil {
  348. // log.Println("更新检索出错:", err.Error())
  349. // }
  350. // }
  351. // }
  352. // }
  353. //}
  354. // UpdateBulk 批量修改文档
  355. func (e *Elastic) UpdateBulk(index string, docs ...[]map[string]interface{}) {
  356. client := e.GetEsConn()
  357. defer e.DestoryEsConn(client)
  358. bulkService := client.Bulk().Index(index).Refresh("true")
  359. //bulkService.Type(itype)
  360. for _, d := range docs {
  361. id := d[0]["_id"].(string)
  362. doc := es.NewBulkUpdateRequest().Id(id).Doc(d[1])
  363. bulkService.Add(doc)
  364. }
  365. _, err := bulkService.Do(context.Background())
  366. if err != nil {
  367. fmt.Printf("UpdateBulk all success err is %v\n", err)
  368. }
  369. //if len(res.Failed()) > 0 {
  370. // fmt.Printf("UpdateBulk all success failed is %v\n", (res.Items[0]))
  371. //}
  372. }
  373. // UpsertBulk 批量修改文档(不存在则插入)
  374. func (e *Elastic) UpsertBulk(ctx context.Context, index string, ids []string, docs []interface{}) error {
  375. client := e.GetEsConn()
  376. defer e.DestoryEsConn(client)
  377. bulkService := client.Bulk().Index(index).Refresh("true")
  378. //bulkService.Type("bidding")
  379. for i := range ids {
  380. doc := es.NewBulkUpdateRequest().Id(ids[i]).Doc(docs[i]).Upsert(docs[i])
  381. bulkService.Add(doc)
  382. }
  383. res, err := bulkService.Do(context.Background())
  384. if err != nil {
  385. return err
  386. }
  387. if len(res.Failed()) > 0 {
  388. return errors.New(res.Failed()[0].Error.Reason)
  389. }
  390. return nil
  391. }
  392. // 批量删除
  393. func (e *Elastic) DeleteBulk(index string, ids []string) {
  394. client := e.GetEsConn()
  395. defer e.DestoryEsConn(client)
  396. bulkService := client.Bulk().Index(index).Refresh("true")
  397. //bulkService.Type("bidding")
  398. for i := range ids {
  399. req := es.NewBulkDeleteRequest().Id(ids[i])
  400. bulkService.Add(req)
  401. }
  402. res, err := bulkService.Do(context.Background())
  403. if err != nil {
  404. fmt.Printf("DeleteBulk success is %v\n", len(res.Succeeded()))
  405. }
  406. }
  407. // InsertOrUpdate 插入或更新
  408. func (e *Elastic) InsertOrUpdate(index string, docs []map[string]interface{}) error {
  409. client := e.GetEsConn()
  410. defer e.DestoryEsConn(client)
  411. for _, item := range docs {
  412. // 获取唯一标识符
  413. id := item["id"].(string)
  414. if id == "" {
  415. id = item["_id"].(string)
  416. }
  417. // 根据id判断记录是否存在
  418. exists, err := client.Exists().
  419. Index(index).
  420. Id(id).
  421. Do(context.Background())
  422. if err != nil {
  423. return err
  424. }
  425. // 存在则更新,不存在则插入
  426. if exists {
  427. _, err := client.Update().
  428. Index(index).
  429. Id(id).
  430. Doc(item).
  431. Do(context.Background())
  432. if err != nil {
  433. return err
  434. }
  435. } else {
  436. _, err := client.Index().
  437. Index(index).
  438. Id(id).
  439. BodyJson(item).
  440. Do(context.Background())
  441. if err != nil {
  442. return err
  443. }
  444. }
  445. }
  446. return nil
  447. }
  448. // ExistsIndex 判断索引是否存在
  449. func (e *Elastic) ExistsIndex(index string) (exists bool, err error) {
  450. client := e.GetEsConn()
  451. defer e.DestoryEsConn(client)
  452. exists, err = client.IndexExists(index).Do(context.Background())
  453. return
  454. }
  455. // CreateIndex 创建索引
  456. func (e *Elastic) CreateIndex(index string, mapping string) (err error) {
  457. client := e.GetEsConn()
  458. defer e.DestoryEsConn(client)
  459. _, err = client.CreateIndex(index).BodyString(mapping).Do(context.Background())
  460. return
  461. }
  462. // DeleteIndex 删除索引
  463. func (e *Elastic) DeleteIndex(index string) (err error) {
  464. client := e.GetEsConn()
  465. defer e.DestoryEsConn(client)
  466. _, err = client.DeleteIndex(index).Do(context.Background())
  467. return
  468. }
  469. // RemoveAlias 移除别名
  470. func (e *Elastic) RemoveAlias(index, aliasName string) (err error) {
  471. client := e.GetEsConn()
  472. defer e.DestoryEsConn(client)
  473. _, err = client.Alias().Remove(index, aliasName).Do(context.Background())
  474. return
  475. }
  476. // SetAlias 添加别名
  477. func (e *Elastic) SetAlias(index, aliasName string) (err error) {
  478. client := e.GetEsConn()
  479. defer e.DestoryEsConn(client)
  480. _, err = client.Alias().Add(index, aliasName).Do(context.Background())
  481. return
  482. }
  483. // DeleteByID 根据ID 删除索引数据;id 或者索引名称不存在,都不会报错
  484. func (e *Elastic) DeleteByID(index, id string) error {
  485. client := e.GetEsConn()
  486. defer e.DestoryEsConn(client)
  487. _, err := client.Delete().Index(index).Id(id).Do(context.Background())
  488. if err != nil && es.IsNotFound(err) {
  489. return nil
  490. }
  491. return err
  492. }
  493. // UpdateDocument 更新指定ID的文档
  494. func (e *Elastic) UpdateDocument(indexName string, documentID string, updateData map[string]interface{}) error {
  495. client := e.GetEsConn()
  496. defer e.DestoryEsConn(client)
  497. updateResult, err := client.Update().
  498. Index(indexName).
  499. Id(documentID).
  500. Doc(updateData).
  501. Do(context.Background())
  502. if err != nil {
  503. return err
  504. }
  505. if updateResult.Result != "updated" {
  506. return fmt.Errorf("Document not updated: %v", updateResult.Result)
  507. }
  508. return nil
  509. }
  510. // Save 保存对象
  511. func (e *Elastic) Save(index string, obj interface{}) bool {
  512. client := e.GetEsConn()
  513. defer e.DestoryEsConn(client)
  514. defer func() {
  515. if r := recover(); r != nil {
  516. log.Println("[E]", r)
  517. for skip := 1; ; skip++ {
  518. _, file, line, ok := runtime.Caller(skip)
  519. if !ok {
  520. break
  521. }
  522. go log.Printf("%v,%v\n", file, line)
  523. }
  524. }
  525. }()
  526. data := util.ObjToMap(obj)
  527. _id := mongodb.BsonIdToSId((*data)["_id"])
  528. (*data)["id"] = _id
  529. delete((*data), "_id")
  530. _, err := client.Index().Index(index).Id(_id).BodyJson(data).Do(context.TODO())
  531. if err != nil {
  532. log.Println("保存到ES出错", err.Error(), obj)
  533. return false
  534. } else {
  535. return true
  536. }
  537. }