es.go 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package des
  2. import (
  3. "createindex/base"
  4. "log"
  5. "time"
  6. "app.yhyue.com/moapp/jybase/es"
  7. // esv7 "github.com/olivere/elastic/v7"
  8. )
  9. //目标保存对象,es工具类
  10. type Des struct {
  11. Ch chan map[string]any
  12. ChEnd chan bool
  13. Index string
  14. ES *es.EsV7
  15. }
  16. func GetDes(addr, user, pwd, index string, size int) *Des {
  17. e := &Des{
  18. Ch: make(chan map[string]any, 500),
  19. ChEnd: make(chan bool),
  20. Index: index,
  21. ES: es.NewEs("v7", addr, size, user, pwd).(*es.EsV7),
  22. }
  23. return e
  24. }
  25. func (es *Des) Save() {
  26. arr := []map[string]any{}
  27. L:
  28. for {
  29. select {
  30. case data := <-es.Ch:
  31. arr = append(arr, data)
  32. if len(arr) > 99 {
  33. func(ao *[]map[string]any) {
  34. base.BulkSave(es.ES, es.Index, "", ao, false)
  35. }(&arr)
  36. arr = []map[string]any{}
  37. }
  38. default:
  39. select {
  40. case <-es.ChEnd:
  41. break L
  42. case <-time.After(time.Millisecond * 10):
  43. }
  44. }
  45. }
  46. if len(arr) > 0 {
  47. func(ao *[]map[string]any) {
  48. base.BulkSave(es.ES, es.Index, "", ao, false)
  49. }(&arr)
  50. }
  51. log.Println("es,保存完成...")
  52. }
  53. func (es *Des) Send(data map[string]any) {
  54. es.Ch <- data
  55. }
  56. func (es *Des) End() {
  57. es.ChEnd <- true
  58. }