package broker import ( . "bp.jydev.jianyu360.cn/BaseService/gateway/common/gatecode" "bp.jydev.jianyu360.cn/BaseService/gateway/core/proxy/loadmodule" "fmt" "github.com/gogf/gf/v2/net/ghttp" "github.com/gogf/gf/v2/net/gtrace" "github.com/gogf/gf/v2/os/gcfg" "github.com/gogf/gf/v2/os/gctx" "log" "net/http" "net/url" "sync" ) type broker struct { sync.RWMutex addresses map[string]loadmodule.ProxyLoadModule outServer map[string]OutServerInterface } func InitBroker() *broker { b := new(broker) b.addresses = make(map[string]loadmodule.ProxyLoadModule) b.outServer = make(map[string]OutServerInterface) return b } type OutServerInterface interface { AutoLogin() error //自动登录 RequestLogin(r *ghttp.Request) error //身份状态 CheckLoginOut(r *ghttp.Request) bool //状态是否过期 Filter(r *ghttp.Request) error //过滤器 } // AddNode 发现注册新节点 func (r *broker) AddNode(code, address string) error { r.Lock() defer r.Unlock() module, ok := r.addresses[code] if !ok { module = loadmodule.LoadProxyLoadFactory(gcfg.Instance().MustGet(gctx.New(), "system.loadProxyModule", 0).Int()) r.addresses[code] = module } log.Printf("节点%s服务注册%s", code, address) return module.Add(address) } // DelNode 节点下线watcher删除节点 func (r *broker) DelNode(code, address string) error { //进行删除操作 r.Lock() defer r.Unlock() module, ok := r.addresses[code] if !ok { log.Printf("节点%s服务关闭异常%s", code, address) return NewErrorWithCode(GATEWAY_MODULE_UNDEFINED, fmt.Sprintf("节点下线异常 module %s", code)) } log.Printf("节点%s服务关闭%s", code, address) return module.Del(address) } // GetServerAddr 获取代理节点 func (r *broker) GetServerAddr(code, ip string) (*url.URL, error) { r.RLock() defer r.RUnlock() module, ok := r.addresses[code] if !ok { return nil, NewErrorWithCode(GATEWAY_MODULE_UNDEFINED, fmt.Sprintf("获取服务地址异常 module %s", code)) } return module.Get(ip) } // RegisterOutServer 注册外部服务 func (b *broker) RegisterOutServer(u *url.URL, outServer OutServerInterface) { if u != nil && outServer != nil { b.outServer[u.String()] = outServer } } // UnLoginSetErr 挂载外部服务,当未登录时,通过处罚异常尝试重新加载 func UnLoginSetErr(resp *http.Response) error { if resp.StatusCode == 401 { return fmt.Errorf("未登录异常") } return nil } // GetOutSeverAutoLogin 获取代理节点 func (b *broker) GetOutSeverAutoLogin(address string, r *ghttp.Request) (serverUrl *url.URL, err error) { _, span := gtrace.NewSpan(r.Context(), "GetOutSeverAutoLogin") defer span.End() serverUrl, err = url.Parse(address) if err != nil { return nil, err } server, ok := b.outServer[address] if !ok { return nil, fmt.Errorf("未知外部服务") } //装配登录身份 err = server.RequestLogin(r) return } // CheckOutSeverLoginOut 获取代理节点 func (b *broker) CheckOutSeverLoginOut(address string, r *ghttp.Request) bool { server, ok := b.outServer[address] if !ok { return true } return server.CheckLoginOut(r) } // OutSeverLoginIn 获取代理节点 func (b *broker) OutSeverLoginIn(address string) error { server, ok := b.outServer[address] if !ok { return fmt.Errorf("未知外部服务") } return server.AutoLogin() } // OutSeverFilter 外部程序过滤器 func (b *broker) OutSeverFilter(address string, r *ghttp.Request) error { server, ok := b.outServer[address] if !ok { return fmt.Errorf("未知外部服务") } return server.Filter(r) }