broker.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package broker
  2. import (
  3. "fmt"
  4. . "gateway/common/gatecode"
  5. "math/rand"
  6. "sync"
  7. )
  8. type broker struct {
  9. sync.RWMutex
  10. addresses map[string][]string
  11. }
  12. func InitBroker() *broker {
  13. b := new(broker)
  14. b.addresses = make(map[string][]string)
  15. return b
  16. }
  17. // AddNode 发现注册新节点
  18. func (r *broker) AddNode(code, address string) error {
  19. for _, addr := range r.getServers(code) {
  20. if addr == address {
  21. return fmt.Errorf("%s节点%s已经存在", code, address)
  22. }
  23. }
  24. //进行添加操作
  25. r.Lock()
  26. defer r.Unlock()
  27. r.addresses[code] = append(r.addresses[code], address)
  28. return nil
  29. }
  30. // DelNode 节点下线watcher删除节点
  31. func (r *broker) DelNode(code, address string) error {
  32. var newAddress []string
  33. var curlAddr = r.getServers(code)
  34. for _, addr := range curlAddr {
  35. if addr == address {
  36. continue
  37. }
  38. newAddress = append(newAddress, addr)
  39. }
  40. if len(curlAddr) == len(newAddress) {
  41. return fmt.Errorf("%s节点未发现待删除节点%s", code, address)
  42. }
  43. //进行删除操作
  44. r.Lock()
  45. defer r.Unlock()
  46. r.addresses[code] = newAddress
  47. return nil
  48. }
  49. // getServers 根据code获取所有可用节点
  50. func (r *broker) getServers(code string) []string {
  51. r.RLock()
  52. defer r.RUnlock()
  53. addresses := make([]string, 0, len(r.addresses[code]))
  54. for _, address := range r.addresses[code] {
  55. addresses = append(addresses, address)
  56. }
  57. return addresses
  58. }
  59. // GetServerAddr 获取代理节点
  60. func (r *broker) GetServerAddr(code string) (string, error) {
  61. serverAddress := r.getServers(code)
  62. lenServer := len(serverAddress)
  63. switch lenServer {
  64. case 0:
  65. return "", NewErrorWithCode(GATEWAY_MODULE_UNDEFINED, "未找到可用服务")
  66. case 1:
  67. return serverAddress[0], nil
  68. default:
  69. return serverAddress[rand.Intn(lenServer)], nil
  70. }
  71. }