package broker

import (
	"fmt"
	"log"
	"net/url"
	"sync"

	. "bp.jydev.jianyu360.cn/BaseService/gateway/common/gatecode"
	"bp.jydev.jianyu360.cn/BaseService/gateway/core/proxy/loadmodule"
	"github.com/gogf/gf/v2/os/gcfg"
	"github.com/gogf/gf/v2/os/gctx"
)

// broker keeps one loadmodule.ProxyLoadModule per service code.
// The embedded RWMutex guards every access to the addresses map.
type broker struct {
	sync.RWMutex
	addresses map[string]loadmodule.ProxyLoadModule
}

// InitBroker returns a broker whose address table is allocated and empty.
func InitBroker() *broker {
	return &broker{
		addresses: make(map[string]loadmodule.ProxyLoadModule),
	}
}

// AddNode registers a newly discovered node address under the given service code.
//
// The first time a code is seen, a load module is built by
// loadmodule.LoadProxyLoadFactory using the integer read from the
// "system.loadProxyModule" configuration key (default 0) and stored for that
// code. The address is then handed to the module's Add method, whose error is
// returned unchanged.
func (b *broker) AddNode(code, address string) error {
	b.Lock()
	defer b.Unlock()

	lb, found := b.addresses[code]
	if !found {
		cfg := gcfg.Instance()
		strategy := cfg.MustGet(gctx.New(), "system.loadProxyModule", 0).Int()
		lb = loadmodule.LoadProxyLoadFactory(strategy)
		b.addresses[code] = lb
	}

	log.Printf("节点%s服务注册%s", code, address)
	return lb.Add(address)
}

// DelNode removes a node address from the given service code; the original
// comment indicates it is invoked by a watcher when a node goes offline.
//
// If no load module exists for the code, the condition is logged and an error
// carrying GATEWAY_MODULE_UNDEFINED is returned. Otherwise the result of the
// module's Del method is returned unchanged.
func (b *broker) DelNode(code, address string) error {
	// Perform the removal under the write lock.
	b.Lock()
	defer b.Unlock()

	lb, found := b.addresses[code]
	if found {
		log.Printf("节点%s服务关闭%s", code, address)
		return lb.Del(address)
	}

	log.Printf("节点%s服务关闭异常%s", code, address)
	return NewErrorWithCode(GATEWAY_MODULE_UNDEFINED, fmt.Sprintf("节点下线异常 module %s", code))
}

// GetServerAddr picks a proxy target for the given service code.
//
// The lookup runs under the read lock. When a load module exists for the code,
// ip is passed to its Get method and that result is returned as-is; how ip
// influences the choice is decided by the module (presumably the caller's
// address — confirm against loadmodule). When no module exists, a nil URL and
// an error carrying GATEWAY_MODULE_UNDEFINED are returned.
func (b *broker) GetServerAddr(code, ip string) (*url.URL, error) {
	b.RLock()
	defer b.RUnlock()

	if lb, found := b.addresses[code]; found {
		return lb.Get(ip)
	}
	return nil, NewErrorWithCode(GATEWAY_MODULE_UNDEFINED, fmt.Sprintf("获取服务地址异常 module %s", code))
}