123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- package service
- import (
- "context"
- "esproxy/internal/consts"
- "fmt"
- "github.com/gogf/gf/v2/frame/g"
- "github.com/gogf/gf/v2/os/glog"
- "net/http/httputil"
- "net/url"
- "os"
- "time"
- )
- var (
- EsProxyManager *esProxyManager = initManager(context.Background())
- )
- type esProxyManager struct {
- esStatus consts.EsStatus
- proxyPool chan *httputil.ReverseProxy
- simpleQueryPool, aggsQueryPool, complexQueryPool chan struct{}
- }
- // initManager 初始化代理Manager
- func initManager(ctx context.Context) *esProxyManager {
- esAddr := g.Cfg().MustGet(ctx, "elasticSearch.address").Strings()
- if len(esAddr) < 1 {
- glog.Error(ctx, "加载出错,退出")
- os.Exit(0)
- }
- pool := make(chan *httputil.ReverseProxy, len(esAddr))
- for _, v := range esAddr {
- u, err := url.Parse(v)
- if err != nil {
- glog.Error(ctx, "init parse esAddr err:", err)
- } else {
- pool <- CreateDefaultProxyClient(u)
- }
- }
- emp := &esProxyManager{
- esStatus: consts.EsStatus_Free,
- proxyPool: pool,
- simpleQueryPool: makeEmptyChan(g.Cfg().MustGet(ctx, "elasticSearch.pool.simple", 10).Int()),
- aggsQueryPool: makeEmptyChan(g.Cfg().MustGet(ctx, "elasticSearch.pool.aggs", 10).Int()),
- complexQueryPool: makeEmptyChan(g.Cfg().MustGet(ctx, "elasticSearch.pool.complex", 10).Int()),
- }
- go emp.UpdateEsStatus(ctx)
- return emp
- }
- func makeEmptyChan(total int) chan struct{} {
- c := make(chan struct{}, total)
- for i := 0; i < total; i++ {
- c <- struct{}{}
- }
- return c
- }
- // UpdateEsStatus 更新es状态
- func (epm *esProxyManager) UpdateEsStatus(ctx context.Context) {
- if cron := g.Cfg().MustGet(ctx, "elasticSearch.queryState.cron").String(); cron != "" {
- err := EsStatusQuery.GetEsStatusRunning(cron, func(ctx context.Context) {
- finalStatus := EsStatusQuery.GetEsStatus(ctx)
- epm.esStatus = finalStatus
- _, _ = g.Redis().Set(ctx, "es_status", finalStatus)
- g.Log().Infof(ctx, "当前 epm.esStatus:%v", epm.esStatus)
- })
- if err != nil {
- panic(err)
- }
- }
- }
- // GetProxy 获取代理对象
- func (epm *esProxyManager) GetProxy(ctx context.Context, queryLevel int) (*httputil.ReverseProxy, error) {
- if queryLevel+int(epm.esStatus) > 2 {
- return nil, fmt.Errorf("server is busy")
- }
- //epm.Status()
- select {
- case <-epm.getPool(queryLevel):
- c := <-epm.proxyPool
- epm.proxyPool <- c
- return c, nil
- case <-time.After(time.Second * time.Duration(g.Cfg().MustGet(ctx, "elasticSearch.queryState.waitTime", 5).Int())):
- return nil, fmt.Errorf("wait time out")
- }
- }
- // Release 归还链接池
- func (epm *esProxyManager) Release(ctx context.Context, queryLevel int) {
- go func() {
- time.Sleep(time.Second * 10)
- epm.getPool(queryLevel) <- struct{}{}
- }()
- }
- // 获取对应查询复杂度的链接池
- func (epm *esProxyManager) getPool(queryLevel int) chan struct{} {
- switch queryLevel {
- case int(consts.QueryLevelSimple):
- return epm.simpleQueryPool
- case int(consts.QueryLevelAggs):
- return epm.aggsQueryPool
- default:
- return epm.complexQueryPool
- }
- }
- func (epm *esProxyManager) Status() {
- g.Dump(map[string]interface{}{
- "simple": len(epm.simpleQueryPool),
- "aggs": len(epm.aggsQueryPool),
- "complex": len(epm.complexQueryPool),
- })
- }
|