123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- package service
- import (
- "context"
- "esproxy/internal/consts"
- "fmt"
- "github.com/gogf/gf/v2/frame/g"
- "github.com/gogf/gf/v2/os/glog"
- "net/http/httputil"
- "net/url"
- "os"
- "sync"
- "sync/atomic"
- "time"
- )
- var (
- EsProxyManager *esProxyManager = initManager(context.Background())
- )
- type esProxyManager struct {
- esStatus consts.EsStatus
- proxyPool chan *httputil.ReverseProxy
- simpleQueryPool, aggsQueryPool, complexQueryPool chan struct{}
- redisPrefix string
- }
- // initManager 初始化代理Manager
- func initManager(ctx context.Context) *esProxyManager {
- consts.LogTime = g.Cfg().MustGet(ctx, "logTime").Float64()
- consts.LogEquity = g.Cfg().MustGet(ctx, "logEquity").Bool()
- esAddr := g.Cfg().MustGet(ctx, "elasticSearch.address").Strings()
- if len(esAddr) < 1 {
- glog.Error(ctx, "加载出错,退出")
- os.Exit(0)
- }
- pool := make(chan *httputil.ReverseProxy, len(esAddr))
- for _, v := range esAddr {
- u, err := url.Parse(v)
- if err != nil {
- glog.Error(ctx, "init parse esAddr err:", err)
- } else {
- pool <- CreateDefaultProxyClient(u)
- }
- }
- emp := &esProxyManager{
- esStatus: consts.EsStatus_Free,
- proxyPool: pool,
- simpleQueryPool: makeEmptyChan(g.Cfg().MustGet(ctx, "elasticSearch.pool.simple", 10).Int()),
- aggsQueryPool: makeEmptyChan(g.Cfg().MustGet(ctx, "elasticSearch.pool.aggs", 10).Int()),
- complexQueryPool: makeEmptyChan(g.Cfg().MustGet(ctx, "elasticSearch.pool.complex", 10).Int()),
- redisPrefix: g.Cfg().MustGet(ctx, "server.redisPrefix", "es").String(),
- }
- go emp.UpdateEsStatus(ctx)
- return emp
- }
// makeEmptyChan returns a buffered channel of capacity total that is already
// filled with total tokens, so each receive takes one token and each send
// gives one back. It panics if total is negative, because make rejects a
// negative buffer size.
func makeEmptyChan(total int) chan struct{} {
	tokens := make(chan struct{}, total)
	for remaining := total; remaining > 0; remaining-- {
		tokens <- struct{}{}
	}
	return tokens
}
- // UpdateEsStatus 更新es状态
- func (epm *esProxyManager) UpdateEsStatus(ctx context.Context) {
- if cron := g.Cfg().MustGet(ctx, "elasticSearch.queryState.cron").String(); cron != "" {
- err := EsStatusQuery.GetEsStatusRunning(cron, func(ctx context.Context) {
- finalStatus := EsStatusQuery.GetEsStatus(ctx)
- epm.esStatus = finalStatus
- _, _ = g.Redis().Set(ctx, fmt.Sprintf("%s_status", epm.redisPrefix), finalStatus)
- g.Log().Debugf(ctx, "当前 epm.esStatus:%v", epm.esStatus)
- })
- if err != nil {
- panic(err)
- }
- }
- }
- // GetProxy 获取代理对象
- func (epm *esProxyManager) GetProxy(ctx context.Context, queryLevel int) (*httputil.ReverseProxy, error) {
- if queryLevel+int(epm.esStatus) > 2 {
- go RejectedIncrement(ctx, epm.redisPrefix) //程序拒绝数
- return nil, fmt.Errorf("server is busy")
- }
- g.Log().Debugf(ctx, "esProxyManager pool %+v", epm.Status())
- Increment() //增加等待数
- defer Decrement()
- select {
- case <-epm.getPool(queryLevel):
- c := <-epm.proxyPool
- epm.proxyPool <- c
- return c, nil
- case <-time.After(time.Second * time.Duration(g.Cfg().MustGet(ctx, "elasticSearch.queryState.waitTime", 5).Int())):
- go OvertimeIncrement(ctx, epm.redisPrefix) //超时退出数
- return nil, fmt.Errorf("wait time out")
- }
- }
- // Release 归还链接池
- func (epm *esProxyManager) Release(ctx context.Context, queryLevel int) {
- go func() {
- epm.getPool(queryLevel) <- struct{}{}
- }()
- }
- // 获取对应查询复杂度的链接池
- func (epm *esProxyManager) getPool(queryLevel int) chan struct{} {
- switch queryLevel {
- case int(consts.QueryLevelSimple):
- return epm.simpleQueryPool
- case int(consts.QueryLevelAggs):
- return epm.aggsQueryPool
- default:
- return epm.complexQueryPool
- }
- }
- func (epm *esProxyManager) Status() map[string]interface{} {
- return map[string]interface{}{
- "simple": len(epm.simpleQueryPool),
- "aggs": len(epm.aggsQueryPool),
- "complex": len(epm.complexQueryPool),
- }
- }
// QueueCounter is the number of callers currently inside the waiting section
// of GetProxy. It is updated with atomic operations by Increment and
// Decrement.
var QueueCounter int64

var (
	// sy is held by RejectedIncrement while it updates the rejected counter.
	sy sync.Mutex
	// sn is held by OvertimeIncrement while it updates the timeout counter.
	sn sync.Mutex
)
- func InformationNumber(ctx context.Context) map[string]interface{} {
- rejectedCount, _ := g.Redis().Get(ctx, "es_rejected_count")
- timeoutCount, _ := g.Redis().Get(ctx, "es_timeout_count")
- return map[string]interface{}{
- "simple": g.Cfg().MustGet(ctx, "elasticSearch.pool.simple", 10).Int() - len(EsProxyManager.simpleQueryPool), //简单执行数
- "aggs": g.Cfg().MustGet(ctx, "elasticSearch.pool.aggs", 10).Int() - len(EsProxyManager.aggsQueryPool), //聚合执行数
- "complex": g.Cfg().MustGet(ctx, "elasticSearch.pool.complex", 10).Int() - len(EsProxyManager.complexQueryPool), //复杂执行数
- "queueCount": QueueCounter, //等待数
- "rejectedCount": rejectedCount.Val(), //拒绝数
- "timeOutCount": timeoutCount.Val(), //超时数
- }
- }
- // RejectedIncrement queryLevel+int(epm.esStatus) > 2拒绝数
- func RejectedIncrement(ctx context.Context, redisPrefix string) {
- sy.Lock()
- defer sy.Unlock()
- _, _ = g.Redis().Incr(ctx, fmt.Sprintf("%s_rejected_count", redisPrefix))
- }
- // OvertimeIncrement 超时断开数
- func OvertimeIncrement(ctx context.Context, redisPrefix string) {
- sn.Lock()
- defer sn.Unlock()
- _, _ = g.Redis().Incr(ctx, fmt.Sprintf("%s_timeout_count", redisPrefix))
- }
- // Increment 程序等待数
- func Increment() {
- atomic.AddInt64(&QueueCounter, 1)
- }
- // Decrement 获取到连接池 减少程序等待数
- func Decrement() {
- atomic.AddInt64(&QueueCounter, -1)
- }
|