123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- package broker
- import (
- "fmt"
- . "gateway/common/gatecode"
- "math/rand"
- "sync"
- )
// broker keeps, for each code, the list of node addresses registered under it.
// The embedded RWMutex is the lock the methods of this type take around
// accesses to addresses.
type broker struct {
	sync.RWMutex

	// addresses maps a code to the node addresses registered for that code.
	addresses map[string][]string
}
- func InitBroker() *broker {
- b := new(broker)
- b.addresses = make(map[string][]string)
- return b
- }
- // AddNode 发现注册新节点
- func (r *broker) AddNode(code, address string) error {
- for _, addr := range r.getServers(code) {
- if addr == address {
- return fmt.Errorf("%s节点%s已经存在", code, address)
- }
- }
- //进行添加操作
- r.Lock()
- defer r.Unlock()
- r.addresses[code] = append(r.addresses[code], address)
- return nil
- }
- // DelNode 节点下线watcher删除节点
- func (r *broker) DelNode(code, address string) error {
- var newAddress []string
- var curlAddr = r.getServers(code)
- for _, addr := range curlAddr {
- if addr == address {
- continue
- }
- newAddress = append(newAddress, addr)
- }
- if len(curlAddr) == len(newAddress) {
- return fmt.Errorf("%s节点未发现待删除节点%s", code, address)
- }
- //进行删除操作
- r.Lock()
- defer r.Unlock()
- r.addresses[code] = newAddress
- return nil
- }
- // getServers 根据code获取所有可用节点
- func (r *broker) getServers(code string) []string {
- r.RLock()
- defer r.RUnlock()
- addresses := make([]string, 0, len(r.addresses[code]))
- for _, address := range r.addresses[code] {
- addresses = append(addresses, address)
- }
- return addresses
- }
- // GetServerAddr 获取代理节点
- func (r *broker) GetServerAddr(code string) (string, error) {
- serverAddress := r.getServers(code)
- lenServer := len(serverAddress)
- switch lenServer {
- case 0:
- return "", NewErrorWithCode(GATEWAY_MODULE_UNDEFINED, "未找到可用服务")
- case 1:
- return serverAddress[0], nil
- default:
- return serverAddress[rand.Intn(lenServer)], nil
- }
- }
|