1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- package service
- import (
- "context"
- "esproxy/internal/consts"
- "github.com/gogf/gf/v2/frame/g"
- "github.com/gogf/gf/v2/os/glog"
- "net/http/httputil"
- "net/url"
- "os"
- "sync"
- )
- var (
- EsProxyManager *esProxyManager = initManager(context.Background())
- )
- type esProxyManager struct {
- lock sync.Mutex
- esStatus consts.EsStatus
- proxyPool chan *httputil.ReverseProxy
- }
- // initManager 初始化代理Manager
- func initManager(ctx context.Context) *esProxyManager {
- esAddr := g.Cfg().MustGet(ctx, "elasticSearch.address").Strings()
- if len(esAddr) < 1 {
- glog.Error(ctx, "加载出错,退出")
- os.Exit(0)
- }
- pool := make(chan *httputil.ReverseProxy, len(esAddr))
- for _, v := range esAddr {
- u, err := url.Parse(v)
- if err != nil {
- glog.Error(ctx, "init parse esAddr err:", err)
- } else {
- pool <- CreateDefaultProxyClient(u)
- }
- }
- emp := &esProxyManager{
- lock: sync.Mutex{},
- esStatus: consts.EsStatus_Free,
- proxyPool: pool,
- }
- go emp.UpdateEsStatus(ctx)
- return emp
- }
- // UpdateEsStatus 更新es状态
- func (epm *esProxyManager) UpdateEsStatus(ctx context.Context) {
- if cron := g.Cfg().MustGet(ctx, "elasticSearch.queryState.cron").String(); cron != "" {
- err := EsStatusQuery.GetEsStatusRunning(cron, func(ctx context.Context) {
- finalStatus := EsStatusQuery.GetEsStatus(ctx)
- epm.esStatus = finalStatus
- _, _ = g.Redis().Set(ctx, "es_status", finalStatus)
- g.Log().Infof(ctx, "当前 epm.esStatus:%v", epm.esStatus)
- })
- if err != nil {
- panic(err)
- }
- }
- }
- // GetProxy 获取代理对象
- func (epm *esProxyManager) GetProxy(ctx context.Context) (*httputil.ReverseProxy, error) {
- //epm.lock.Lock()
- //defer epm.lock.Unlock()
- c := <-epm.proxyPool
- epm.proxyPool <- c
- return c, nil
- }
|