12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061 |
- package broker
- import (
- . "bp.jydev.jianyu360.cn/BaseService/gateway/common/gatecode"
- "bp.jydev.jianyu360.cn/BaseService/gateway/core/proxy/loadmodule"
- "fmt"
- "github.com/gogf/gf/v2/os/gcfg"
- "github.com/gogf/gf/v2/os/gctx"
- "log"
- "net/url"
- "sync"
- )
- type broker struct {
- sync.RWMutex
- addresses map[string]loadmodule.ProxyLoadModule
- }
- func InitBroker() *broker {
- b := new(broker)
- b.addresses = make(map[string]loadmodule.ProxyLoadModule)
- return b
- }
- // AddNode 发现注册新节点
- func (r *broker) AddNode(code, address string) error {
- r.Lock()
- defer r.Unlock()
- module, ok := r.addresses[code]
- if !ok {
- module = loadmodule.LoadProxyLoadFactory(gcfg.Instance().MustGet(gctx.New(), "system.loadProxyModule", 0).Int())
- r.addresses[code] = module
- }
- log.Printf("节点%s服务注册%s", code, address)
- return module.Add(address)
- }
- // DelNode 节点下线watcher删除节点
- func (r *broker) DelNode(code, address string) error {
- //进行删除操作
- r.Lock()
- defer r.Unlock()
- module, ok := r.addresses[code]
- if !ok {
- log.Printf("节点%s服务关闭异常%s", code, address)
- return NewErrorWithCode(GATEWAY_MODULE_UNDEFINED, fmt.Sprintf("节点下线异常 module %s", code))
- }
- log.Printf("节点%s服务关闭%s", code, address)
- return module.Del(address)
- }
- // GetServerAddr 获取代理节点
- func (r *broker) GetServerAddr(code, ip string) (*url.URL, error) {
- r.RLock()
- defer r.RUnlock()
- module, ok := r.addresses[code]
- if !ok {
- return nil, NewErrorWithCode(GATEWAY_MODULE_UNDEFINED, fmt.Sprintf("获取服务地址异常 module %s", code))
- }
- return module.Get(ip)
- }
|