|
@@ -9,6 +9,8 @@ import (
|
|
"net/http/httputil"
|
|
"net/http/httputil"
|
|
"net/url"
|
|
"net/url"
|
|
"os"
|
|
"os"
|
|
|
|
+ "sync"
|
|
|
|
+ "sync/atomic"
|
|
"time"
|
|
"time"
|
|
)
|
|
)
|
|
|
|
|
|
@@ -51,7 +53,6 @@ func initManager(ctx context.Context) *esProxyManager {
|
|
}
|
|
}
|
|
|
|
|
|
go emp.UpdateEsStatus(ctx)
|
|
go emp.UpdateEsStatus(ctx)
|
|
-
|
|
|
|
return emp
|
|
return emp
|
|
}
|
|
}
|
|
|
|
|
|
@@ -81,15 +82,19 @@ func (epm *esProxyManager) UpdateEsStatus(ctx context.Context) {
|
|
// GetProxy 获取代理对象
|
|
// GetProxy 获取代理对象
|
|
func (epm *esProxyManager) GetProxy(ctx context.Context, queryLevel int) (*httputil.ReverseProxy, error) {
|
|
func (epm *esProxyManager) GetProxy(ctx context.Context, queryLevel int) (*httputil.ReverseProxy, error) {
|
|
if queryLevel+int(epm.esStatus) > 2 {
|
|
if queryLevel+int(epm.esStatus) > 2 {
|
|
|
|
+ go RejectedIncrement(ctx) //程序拒绝数
|
|
return nil, fmt.Errorf("server is busy")
|
|
return nil, fmt.Errorf("server is busy")
|
|
}
|
|
}
|
|
g.Log().Debugf(ctx, "esProxyManager pool %+v", epm.Status())
|
|
g.Log().Debugf(ctx, "esProxyManager pool %+v", epm.Status())
|
|
|
|
+ Increment() //增加等待数
|
|
|
|
+ defer Decrement()
|
|
select {
|
|
select {
|
|
case <-epm.getPool(queryLevel):
|
|
case <-epm.getPool(queryLevel):
|
|
c := <-epm.proxyPool
|
|
c := <-epm.proxyPool
|
|
epm.proxyPool <- c
|
|
epm.proxyPool <- c
|
|
return c, nil
|
|
return c, nil
|
|
case <-time.After(time.Second * time.Duration(g.Cfg().MustGet(ctx, "elasticSearch.queryState.waitTime", 5).Int())):
|
|
case <-time.After(time.Second * time.Duration(g.Cfg().MustGet(ctx, "elasticSearch.queryState.waitTime", 5).Int())):
|
|
|
|
+ go OvertimeIncrement(ctx) //超时退出数
|
|
return nil, fmt.Errorf("wait time out")
|
|
return nil, fmt.Errorf("wait time out")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -120,3 +125,47 @@ func (epm *esProxyManager) Status() map[string]interface{} {
|
|
"complex": len(epm.complexQueryPool),
|
|
"complex": len(epm.complexQueryPool),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+// QueueCounter 程序等待数 RejectedCounter程序繁忙拒绝数 OvertimeCounter超时拒绝数
|
|
|
|
+var (
|
|
|
|
+ QueueCounter int64
|
|
|
|
+ sy, sn sync.Mutex
|
|
|
|
+)
|
|
|
|
+
|
|
|
|
+func InformationNumber(ctx context.Context) map[string]interface{} {
|
|
|
|
+ rejected_count, _ := g.Redis().Get(ctx, "es_rejected_count")
|
|
|
|
+ timeout_count, _ := g.Redis().Get(ctx, "es_timeout_count")
|
|
|
|
+ return map[string]interface{}{
|
|
|
|
+ "simple": g.Cfg().MustGet(ctx, "elasticSearch.pool.simple", 10).Int() - len(EsProxyManager.simpleQueryPool), //简单执行数
|
|
|
|
+ "aggs": g.Cfg().MustGet(ctx, "elasticSearch.pool.aggs", 10).Int() - len(EsProxyManager.aggsQueryPool), //聚合执行数
|
|
|
|
+ "complex": g.Cfg().MustGet(ctx, "elasticSearch.pool.complex", 10).Int() - len(EsProxyManager.complexQueryPool), //复杂执行数
|
|
|
|
+ "queueCount": QueueCounter, //等待数
|
|
|
|
+ "rejectedCount": rejected_count.Val(), //拒绝数
|
|
|
|
+ "timeOutCount": timeout_count.Val(), //超时数
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// RejectedIncrement queryLevel+int(epm.esStatus) > 2拒绝数
|
|
|
|
+func RejectedIncrement(ctx context.Context) {
|
|
|
|
+ sy.Lock()
|
|
|
|
+ defer sy.Unlock()
|
|
|
|
+ _, _ = g.Redis().Incr(ctx, "es_rejected_count")
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// OvertimeIncrement 超时断开数
|
|
|
|
+func OvertimeIncrement(ctx context.Context) {
|
|
|
|
+ sn.Lock()
|
|
|
|
+ defer sn.Unlock()
|
|
|
|
+ fmt.Println("超时")
|
|
|
|
+ _, _ = g.Redis().Incr(ctx, "es_overtime_increment")
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// Increment 程序等待数
|
|
|
|
+func Increment() {
|
|
|
|
+ atomic.AddInt64(&QueueCounter, 1)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// Decrement 获取到连接池 减少程序等待数
|
|
|
|
+func Decrement() {
|
|
|
|
+ atomic.AddInt64(&QueueCounter, -1)
|
|
|
|
+}
|