package broker import ( "fmt" . "gateway/common/gatecode" "math/rand" "sync" ) type broker struct { sync.RWMutex addresses map[string][]string } func InitBroker() *broker { b := new(broker) b.addresses = make(map[string][]string) return b } // AddNode 发现注册新节点 func (r *broker) AddNode(code, address string) error { for _, addr := range r.getServers(code) { if addr == address { return fmt.Errorf("%s节点%s已经存在", code, address) } } //进行添加操作 r.Lock() defer r.Unlock() r.addresses[code] = append(r.addresses[code], address) return nil } // DelNode 节点下线watcher删除节点 func (r *broker) DelNode(code, address string) error { var newAddress []string var curlAddr = r.getServers(code) for _, addr := range curlAddr { if addr == address { continue } newAddress = append(newAddress, addr) } if len(curlAddr) == len(newAddress) { return fmt.Errorf("%s节点未发现待删除节点%s", code, address) } //进行删除操作 r.Lock() defer r.Unlock() r.addresses[code] = newAddress return nil } // getServers 根据code获取所有可用节点 func (r *broker) getServers(code string) []string { r.RLock() defer r.RUnlock() addresses := make([]string, 0, len(r.addresses[code])) for _, address := range r.addresses[code] { addresses = append(addresses, address) } return addresses } // GetServerAddr 获取代理节点 func (r *broker) GetServerAddr(code string) (string, error) { serverAddress := r.getServers(code) lenServer := len(serverAddress) switch lenServer { case 0: return "", NewErrorWithCode(GATEWAY_MODULE_UNDEFINED, "未找到可用服务") case 1: return serverAddress[0], nil default: return serverAddress[rand.Intn(lenServer)], nil } }