manager.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. package service
  2. import (
  3. "context"
  4. "esproxy/internal/consts"
  5. "github.com/gogf/gf/v2/frame/g"
  6. "github.com/gogf/gf/v2/os/glog"
  7. "net/http/httputil"
  8. "net/url"
  9. "os"
  10. "sync"
  11. )
  12. var (
  13. EsProxyManager *esProxyManager = initManager(context.Background())
  14. )
  15. type esProxyManager struct {
  16. lock sync.Mutex
  17. esStatus consts.EsStatus
  18. proxyPool chan *httputil.ReverseProxy
  19. }
  20. // initManager 初始化代理Manager
  21. func initManager(ctx context.Context) *esProxyManager {
  22. esAddr := g.Cfg().MustGet(ctx, "elasticSearch.address").Strings()
  23. if len(esAddr) < 1 {
  24. glog.Error(ctx, "加载出错,退出")
  25. os.Exit(0)
  26. }
  27. pool := make(chan *httputil.ReverseProxy, len(esAddr))
  28. for _, v := range esAddr {
  29. u, err := url.Parse(v)
  30. if err != nil {
  31. glog.Error(ctx, "init parse esAddr err:", err)
  32. } else {
  33. pool <- CreateDefaultProxyClient(u)
  34. }
  35. }
  36. emp := &esProxyManager{
  37. lock: sync.Mutex{},
  38. esStatus: consts.EsStatus_Free,
  39. proxyPool: pool,
  40. }
  41. go emp.UpdateEsStatus(ctx)
  42. return emp
  43. }
  44. // UpdateEsStatus 更新es状态
  45. func (epm *esProxyManager) UpdateEsStatus(ctx context.Context) {
  46. if cron := g.Cfg().MustGet(ctx, "elasticSearch.queryState.cron").String(); cron != "" {
  47. err := EsStatusQuery.GetEsStatusRunning(cron, func(ctx context.Context) {
  48. finalStatus := EsStatusQuery.GetEsStatus(ctx)
  49. epm.esStatus = finalStatus
  50. _, _ = g.Redis().Set(ctx, "es_status", finalStatus)
  51. g.Log().Infof(ctx, "当前 epm.esStatus:%v", epm.esStatus)
  52. })
  53. if err != nil {
  54. panic(err)
  55. }
  56. }
  57. }
  58. // GetProxy 获取代理对象
  59. func (epm *esProxyManager) GetProxy(ctx context.Context) (*httputil.ReverseProxy, error) {
  60. //epm.lock.Lock()
  61. //defer epm.lock.Unlock()
  62. c := <-epm.proxyPool
  63. epm.proxyPool <- c
  64. return c, nil
  65. }