manager.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. package service
  2. import (
  3. "context"
  4. "esproxy/internal/consts"
  5. "fmt"
  6. "github.com/gogf/gf/v2/frame/g"
  7. "github.com/gogf/gf/v2/os/glog"
  8. "net/http/httputil"
  9. "net/url"
  10. "os"
  11. "time"
  12. )
  13. var (
  14. EsProxyManager *esProxyManager = initManager(context.Background())
  15. )
  16. type esProxyManager struct {
  17. esStatus consts.EsStatus
  18. proxyPool chan *httputil.ReverseProxy
  19. simpleQueryPool, aggsQueryPool, complexQueryPool chan struct{}
  20. }
  21. // initManager 初始化代理Manager
  22. func initManager(ctx context.Context) *esProxyManager {
  23. esAddr := g.Cfg().MustGet(ctx, "elasticSearch.address").Strings()
  24. if len(esAddr) < 1 {
  25. glog.Error(ctx, "加载出错,退出")
  26. os.Exit(0)
  27. }
  28. pool := make(chan *httputil.ReverseProxy, len(esAddr))
  29. for _, v := range esAddr {
  30. u, err := url.Parse(v)
  31. if err != nil {
  32. glog.Error(ctx, "init parse esAddr err:", err)
  33. } else {
  34. pool <- CreateDefaultProxyClient(u)
  35. }
  36. }
  37. emp := &esProxyManager{
  38. esStatus: consts.EsStatus_Free,
  39. proxyPool: pool,
  40. simpleQueryPool: makeEmptyChan(g.Cfg().MustGet(ctx, "elasticSearch.pool.simple", 10).Int()),
  41. aggsQueryPool: makeEmptyChan(g.Cfg().MustGet(ctx, "elasticSearch.pool.aggs", 10).Int()),
  42. complexQueryPool: makeEmptyChan(g.Cfg().MustGet(ctx, "elasticSearch.pool.complex", 10).Int()),
  43. }
  44. go emp.UpdateEsStatus(ctx)
  45. return emp
  46. }
  47. func makeEmptyChan(total int) chan struct{} {
  48. c := make(chan struct{}, total)
  49. for i := 0; i < total; i++ {
  50. c <- struct{}{}
  51. }
  52. return c
  53. }
  54. // UpdateEsStatus 更新es状态
  55. func (epm *esProxyManager) UpdateEsStatus(ctx context.Context) {
  56. if cron := g.Cfg().MustGet(ctx, "elasticSearch.queryState.cron").String(); cron != "" {
  57. err := EsStatusQuery.GetEsStatusRunning(cron, func(ctx context.Context) {
  58. finalStatus := EsStatusQuery.GetEsStatus(ctx)
  59. epm.esStatus = finalStatus
  60. _, _ = g.Redis().Set(ctx, "es_status", finalStatus)
  61. g.Log().Debugf(ctx, "当前 epm.esStatus:%v", epm.esStatus)
  62. })
  63. if err != nil {
  64. panic(err)
  65. }
  66. }
  67. }
  68. // GetProxy 获取代理对象
  69. func (epm *esProxyManager) GetProxy(ctx context.Context, queryLevel int) (*httputil.ReverseProxy, error) {
  70. if queryLevel+int(epm.esStatus) > 2 {
  71. return nil, fmt.Errorf("server is busy")
  72. }
  73. g.Log().Debugf(ctx, "esProxyManager pool %+v", epm.Status())
  74. select {
  75. case <-epm.getPool(queryLevel):
  76. c := <-epm.proxyPool
  77. epm.proxyPool <- c
  78. return c, nil
  79. case <-time.After(time.Second * time.Duration(g.Cfg().MustGet(ctx, "elasticSearch.queryState.waitTime", 5).Int())):
  80. return nil, fmt.Errorf("wait time out")
  81. }
  82. }
  83. // Release 归还链接池
  84. func (epm *esProxyManager) Release(ctx context.Context, queryLevel int) {
  85. go func() {
  86. epm.getPool(queryLevel) <- struct{}{}
  87. }()
  88. }
  89. // 获取对应查询复杂度的链接池
  90. func (epm *esProxyManager) getPool(queryLevel int) chan struct{} {
  91. switch queryLevel {
  92. case int(consts.QueryLevelSimple):
  93. return epm.simpleQueryPool
  94. case int(consts.QueryLevelAggs):
  95. return epm.aggsQueryPool
  96. default:
  97. return epm.complexQueryPool
  98. }
  99. }
  100. func (epm *esProxyManager) Status() map[string]interface{} {
  101. return map[string]interface{}{
  102. "simple": len(epm.simpleQueryPool),
  103. "aggs": len(epm.aggsQueryPool),
  104. "complex": len(epm.complexQueryPool),
  105. }
  106. }