123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- package broker
- import (
- . "bp.jydev.jianyu360.cn/BaseService/gateway/common/gatecode"
- "bp.jydev.jianyu360.cn/BaseService/gateway/core/proxy/loadmodule"
- "fmt"
- "github.com/gogf/gf/v2/net/ghttp"
- "github.com/gogf/gf/v2/net/gtrace"
- "github.com/gogf/gf/v2/os/gcfg"
- "github.com/gogf/gf/v2/os/gctx"
- "log"
- "net/http"
- "net/url"
- "sync"
- )
- type broker struct {
- sync.RWMutex
- addresses map[string]loadmodule.ProxyLoadModule
- outServer map[string]OutServerInterface
- }
- func InitBroker() *broker {
- b := new(broker)
- b.addresses = make(map[string]loadmodule.ProxyLoadModule)
- b.outServer = make(map[string]OutServerInterface)
- return b
- }
- type OutServerInterface interface {
- AutoLogin() error //自动登录
- RequestLogin(r *ghttp.Request) error //身份状态
- CheckLoginOut(r *ghttp.Request) bool //状态是否过期
- Filter(r *ghttp.Request) error //过滤器
- }
- // AddNode 发现注册新节点
- func (r *broker) AddNode(code, address string) error {
- r.Lock()
- defer r.Unlock()
- module, ok := r.addresses[code]
- if !ok {
- module = loadmodule.LoadProxyLoadFactory(gcfg.Instance().MustGet(gctx.New(), "system.loadProxyModule", 0).Int())
- r.addresses[code] = module
- }
- log.Printf("节点%s服务注册%s", code, address)
- return module.Add(address)
- }
- // DelNode 节点下线watcher删除节点
- func (r *broker) DelNode(code, address string) error {
- //进行删除操作
- r.Lock()
- defer r.Unlock()
- module, ok := r.addresses[code]
- if !ok {
- log.Printf("节点%s服务关闭异常%s", code, address)
- return NewErrorWithCode(GATEWAY_MODULE_UNDEFINED, fmt.Sprintf("节点下线异常 module %s", code))
- }
- log.Printf("节点%s服务关闭%s", code, address)
- return module.Del(address)
- }
- // GetServerAddr 获取代理节点
- func (r *broker) GetServerAddr(code, ip string) (*url.URL, error) {
- r.RLock()
- defer r.RUnlock()
- module, ok := r.addresses[code]
- if !ok {
- return nil, NewErrorWithCode(GATEWAY_MODULE_UNDEFINED, fmt.Sprintf("获取服务地址异常 module %s", code))
- }
- return module.Get(ip)
- }
- // RegisterOutServer 注册外部服务
- func (b *broker) RegisterOutServer(u *url.URL, outServer OutServerInterface) {
- if u != nil && outServer != nil {
- b.outServer[u.String()] = outServer
- }
- }
- // UnLoginSetErr 挂载外部服务,当未登录时,通过处罚异常尝试重新加载
- func UnLoginSetErr(resp *http.Response) error {
- if resp.StatusCode == 401 {
- return fmt.Errorf("未登录异常")
- }
- return nil
- }
- // GetOutSeverAutoLogin 获取代理节点
- func (b *broker) GetOutSeverAutoLogin(address string, r *ghttp.Request) (serverUrl *url.URL, err error) {
- _, span := gtrace.NewSpan(r.Context(), "GetOutSeverAutoLogin")
- defer span.End()
- serverUrl, err = url.Parse(address)
- if err != nil {
- return nil, err
- }
- server, ok := b.outServer[address]
- if !ok {
- return nil, fmt.Errorf("未知外部服务")
- }
- //装配登录身份
- err = server.RequestLogin(r)
- return
- }
- // CheckOutSeverLoginOut 获取代理节点
- func (b *broker) CheckOutSeverLoginOut(address string, r *ghttp.Request) bool {
- server, ok := b.outServer[address]
- if !ok {
- return true
- }
- return server.CheckLoginOut(r)
- }
- // OutSeverLoginIn 获取代理节点
- func (b *broker) OutSeverLoginIn(address string) error {
- server, ok := b.outServer[address]
- if !ok {
- return fmt.Errorf("未知外部服务")
- }
- return server.AutoLogin()
- }
- // OutSeverFilter 外部程序过滤器
- func (b *broker) OutSeverFilter(address string, r *ghttp.Request) error {
- server, ok := b.outServer[address]
- if !ok {
- return fmt.Errorf("未知外部服务")
- }
- return server.Filter(r)
- }
|