broker.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. package broker
  2. import (
  3. . "bp.jydev.jianyu360.cn/BaseService/gateway/common/gatecode"
  4. "bp.jydev.jianyu360.cn/BaseService/gateway/core/proxy/loadmodule"
  5. "fmt"
  6. "github.com/gogf/gf/v2/os/gcfg"
  7. "github.com/gogf/gf/v2/os/gctx"
  8. "log"
  9. "net/url"
  10. "sync"
  11. )
  12. type broker struct {
  13. sync.RWMutex
  14. addresses map[string]loadmodule.ProxyLoadModule
  15. }
  16. func InitBroker() *broker {
  17. b := new(broker)
  18. b.addresses = make(map[string]loadmodule.ProxyLoadModule)
  19. return b
  20. }
  21. // AddNode 发现注册新节点
  22. func (r *broker) AddNode(code, address string) error {
  23. r.Lock()
  24. defer r.Unlock()
  25. module, ok := r.addresses[code]
  26. if !ok {
  27. module = loadmodule.LoadProxyLoadFactory(gcfg.Instance().MustGet(gctx.New(), "system.loadProxyModule", 0).Int())
  28. r.addresses[code] = module
  29. }
  30. log.Printf("节点%s服务注册%s", code, address)
  31. return module.Add(address)
  32. }
  33. // DelNode 节点下线watcher删除节点
  34. func (r *broker) DelNode(code, address string) error {
  35. //进行删除操作
  36. r.Lock()
  37. defer r.Unlock()
  38. module, ok := r.addresses[code]
  39. if !ok {
  40. log.Printf("节点%s服务关闭异常%s", code, address)
  41. return NewErrorWithCode(GATEWAY_MODULE_UNDEFINED, fmt.Sprintf("节点下线异常 module %s", code))
  42. }
  43. log.Printf("节点%s服务关闭%s", code, address)
  44. return module.Del(address)
  45. }
  46. // GetServerAddr 获取代理节点
  47. func (r *broker) GetServerAddr(code, ip string) (*url.URL, error) {
  48. r.RLock()
  49. defer r.RUnlock()
  50. module, ok := r.addresses[code]
  51. if !ok {
  52. return nil, NewErrorWithCode(GATEWAY_MODULE_UNDEFINED, fmt.Sprintf("获取服务地址异常 module %s", code))
  53. }
  54. return module.Get(ip)
  55. }