|
@@ -22,6 +22,7 @@ type esProxyManager struct {
|
|
esStatus consts.EsStatus
|
|
esStatus consts.EsStatus
|
|
proxyPool chan *httputil.ReverseProxy
|
|
proxyPool chan *httputil.ReverseProxy
|
|
simpleQueryPool, aggsQueryPool, complexQueryPool chan struct{}
|
|
simpleQueryPool, aggsQueryPool, complexQueryPool chan struct{}
|
|
|
|
+ redisPrefix string
|
|
}
|
|
}
|
|
|
|
|
|
// initManager 初始化代理Manager
|
|
// initManager 初始化代理Manager
|
|
@@ -50,6 +51,7 @@ func initManager(ctx context.Context) *esProxyManager {
|
|
simpleQueryPool: makeEmptyChan(g.Cfg().MustGet(ctx, "elasticSearch.pool.simple", 10).Int()),
|
|
simpleQueryPool: makeEmptyChan(g.Cfg().MustGet(ctx, "elasticSearch.pool.simple", 10).Int()),
|
|
aggsQueryPool: makeEmptyChan(g.Cfg().MustGet(ctx, "elasticSearch.pool.aggs", 10).Int()),
|
|
aggsQueryPool: makeEmptyChan(g.Cfg().MustGet(ctx, "elasticSearch.pool.aggs", 10).Int()),
|
|
complexQueryPool: makeEmptyChan(g.Cfg().MustGet(ctx, "elasticSearch.pool.complex", 10).Int()),
|
|
complexQueryPool: makeEmptyChan(g.Cfg().MustGet(ctx, "elasticSearch.pool.complex", 10).Int()),
|
|
|
|
+ redisPrefix: g.Cfg().MustGet(ctx, "server.redisPrefix", "es").String(),
|
|
}
|
|
}
|
|
|
|
|
|
go emp.UpdateEsStatus(ctx)
|
|
go emp.UpdateEsStatus(ctx)
|
|
@@ -70,7 +72,7 @@ func (epm *esProxyManager) UpdateEsStatus(ctx context.Context) {
|
|
err := EsStatusQuery.GetEsStatusRunning(cron, func(ctx context.Context) {
|
|
err := EsStatusQuery.GetEsStatusRunning(cron, func(ctx context.Context) {
|
|
finalStatus := EsStatusQuery.GetEsStatus(ctx)
|
|
finalStatus := EsStatusQuery.GetEsStatus(ctx)
|
|
epm.esStatus = finalStatus
|
|
epm.esStatus = finalStatus
|
|
- _, _ = g.Redis().Set(ctx, "es_status", finalStatus)
|
|
|
|
|
|
+ _, _ = g.Redis().Set(ctx, fmt.Sprintf("%s_status", epm.redisPrefix), finalStatus)
|
|
g.Log().Debugf(ctx, "当前 epm.esStatus:%v", epm.esStatus)
|
|
g.Log().Debugf(ctx, "当前 epm.esStatus:%v", epm.esStatus)
|
|
})
|
|
})
|
|
if err != nil {
|
|
if err != nil {
|
|
@@ -82,7 +84,7 @@ func (epm *esProxyManager) UpdateEsStatus(ctx context.Context) {
|
|
// GetProxy 获取代理对象
|
|
// GetProxy 获取代理对象
|
|
func (epm *esProxyManager) GetProxy(ctx context.Context, queryLevel int) (*httputil.ReverseProxy, error) {
|
|
func (epm *esProxyManager) GetProxy(ctx context.Context, queryLevel int) (*httputil.ReverseProxy, error) {
|
|
if queryLevel+int(epm.esStatus) > 2 {
|
|
if queryLevel+int(epm.esStatus) > 2 {
|
|
- go RejectedIncrement(ctx) //程序拒绝数
|
|
|
|
|
|
+ go RejectedIncrement(ctx, epm.redisPrefix) //程序拒绝数
|
|
return nil, fmt.Errorf("server is busy")
|
|
return nil, fmt.Errorf("server is busy")
|
|
}
|
|
}
|
|
g.Log().Debugf(ctx, "esProxyManager pool %+v", epm.Status())
|
|
g.Log().Debugf(ctx, "esProxyManager pool %+v", epm.Status())
|
|
@@ -94,7 +96,7 @@ func (epm *esProxyManager) GetProxy(ctx context.Context, queryLevel int) (*httpu
|
|
epm.proxyPool <- c
|
|
epm.proxyPool <- c
|
|
return c, nil
|
|
return c, nil
|
|
case <-time.After(time.Second * time.Duration(g.Cfg().MustGet(ctx, "elasticSearch.queryState.waitTime", 5).Int())):
|
|
case <-time.After(time.Second * time.Duration(g.Cfg().MustGet(ctx, "elasticSearch.queryState.waitTime", 5).Int())):
|
|
- go OvertimeIncrement(ctx) //超时退出数
|
|
|
|
|
|
+ go OvertimeIncrement(ctx, epm.redisPrefix) //超时退出数
|
|
return nil, fmt.Errorf("wait time out")
|
|
return nil, fmt.Errorf("wait time out")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -146,17 +148,17 @@ func InformationNumber(ctx context.Context) map[string]interface{} {
|
|
}
|
|
}
|
|
|
|
|
|
// RejectedIncrement queryLevel+int(epm.esStatus) > 2拒绝数
|
|
// RejectedIncrement queryLevel+int(epm.esStatus) > 2拒绝数
|
|
-func RejectedIncrement(ctx context.Context) {
|
|
|
|
|
|
+func RejectedIncrement(ctx context.Context, redisPrefix string) {
|
|
sy.Lock()
|
|
sy.Lock()
|
|
defer sy.Unlock()
|
|
defer sy.Unlock()
|
|
- _, _ = g.Redis().Incr(ctx, "es_rejected_count")
|
|
|
|
|
|
+ _, _ = g.Redis().Incr(ctx, fmt.Sprintf("%s_rejected_count", redisPrefix))
|
|
}
|
|
}
|
|
|
|
|
|
// OvertimeIncrement 超时断开数
|
|
// OvertimeIncrement 超时断开数
|
|
-func OvertimeIncrement(ctx context.Context) {
|
|
|
|
|
|
+func OvertimeIncrement(ctx context.Context, redisPrefix string) {
|
|
sn.Lock()
|
|
sn.Lock()
|
|
defer sn.Unlock()
|
|
defer sn.Unlock()
|
|
- _, _ = g.Redis().Incr(ctx, "es_timeout_count")
|
|
|
|
|
|
+ _, _ = g.Redis().Incr(ctx, fmt.Sprintf("%s_timeout_count", redisPrefix))
|
|
}
|
|
}
|
|
|
|
|
|
// Increment 程序等待数
|
|
// Increment 程序等待数
|