|
@@ -9,7 +9,6 @@ import (
|
|
"net/http/httputil"
|
|
"net/http/httputil"
|
|
"net/url"
|
|
"net/url"
|
|
"os"
|
|
"os"
|
|
- "sync"
|
|
|
|
"time"
|
|
"time"
|
|
)
|
|
)
|
|
|
|
|
|
@@ -18,7 +17,6 @@ var (
|
|
)
|
|
)
|
|
|
|
|
|
type esProxyManager struct {
|
|
type esProxyManager struct {
|
|
- lock sync.Mutex
|
|
|
|
esStatus consts.EsStatus
|
|
esStatus consts.EsStatus
|
|
proxyPool chan *httputil.ReverseProxy
|
|
proxyPool chan *httputil.ReverseProxy
|
|
simpleQueryPool, aggsQueryPool, complexQueryPool chan struct{}
|
|
simpleQueryPool, aggsQueryPool, complexQueryPool chan struct{}
|
|
@@ -42,7 +40,6 @@ func initManager(ctx context.Context) *esProxyManager {
|
|
}
|
|
}
|
|
|
|
|
|
emp := &esProxyManager{
|
|
emp := &esProxyManager{
|
|
- lock: sync.Mutex{},
|
|
|
|
esStatus: consts.EsStatus_Free,
|
|
esStatus: consts.EsStatus_Free,
|
|
proxyPool: pool,
|
|
proxyPool: pool,
|
|
simpleQueryPool: makeEmptyChan(g.Cfg().MustGet(ctx, "elasticSearch.pool.simple", 10).Int()),
|
|
simpleQueryPool: makeEmptyChan(g.Cfg().MustGet(ctx, "elasticSearch.pool.simple", 10).Int()),
|
|
@@ -83,6 +80,7 @@ func (epm *esProxyManager) GetProxy(ctx context.Context, queryLevel int) (*httpu
|
|
if queryLevel+int(epm.esStatus) > 2 {
|
|
if queryLevel+int(epm.esStatus) > 2 {
|
|
return nil, fmt.Errorf("server is busy")
|
|
return nil, fmt.Errorf("server is busy")
|
|
}
|
|
}
|
|
|
|
+ //epm.Status()
|
|
select {
|
|
select {
|
|
case <-epm.getPool(queryLevel):
|
|
case <-epm.getPool(queryLevel):
|
|
c := <-epm.proxyPool
|
|
c := <-epm.proxyPool
|
|
@@ -95,7 +93,10 @@ func (epm *esProxyManager) GetProxy(ctx context.Context, queryLevel int) (*httpu
|
|
|
|
|
|
// Release 归还链接池
|
|
// Release 归还链接池
|
|
func (epm *esProxyManager) Release(ctx context.Context, queryLevel int) {
|
|
func (epm *esProxyManager) Release(ctx context.Context, queryLevel int) {
|
|
- epm.getPool(queryLevel) <- struct{}{}
|
|
|
|
|
|
+ go func() {
|
|
|
|
+ time.Sleep(time.Second * 10)
|
|
|
|
+ epm.getPool(queryLevel) <- struct{}{}
|
|
|
|
+ }()
|
|
}
|
|
}
|
|
|
|
|
|
// 获取对应查询复杂度的链接池
|
|
// 获取对应查询复杂度的链接池
|
|
@@ -109,3 +110,11 @@ func (epm *esProxyManager) getPool(queryLevel int) chan struct{} {
|
|
return epm.complexQueryPool
|
|
return epm.complexQueryPool
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+func (epm *esProxyManager) Status() {
|
|
|
|
+ g.Dump(map[string]interface{}{
|
|
|
|
+ "simple": len(epm.simpleQueryPool),
|
|
|
|
+ "aggs": len(epm.aggsQueryPool),
|
|
|
|
+ "complex": len(epm.complexQueryPool),
|
|
|
|
+ })
|
|
|
|
+}
|