broker.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. package broker
  2. import (
  3. . "bp.jydev.jianyu360.cn/BaseService/gateway/common/gatecode"
  4. "bp.jydev.jianyu360.cn/BaseService/gateway/core/proxy/loadmodule"
  5. "fmt"
  6. "github.com/gogf/gf/v2/net/ghttp"
  7. "github.com/gogf/gf/v2/os/gcfg"
  8. "github.com/gogf/gf/v2/os/gctx"
  9. "log"
  10. "net/http"
  11. "net/url"
  12. "sync"
  13. )
  14. type broker struct {
  15. sync.RWMutex
  16. addresses map[string]loadmodule.ProxyLoadModule
  17. outServer map[string]OutServerInterface
  18. }
  19. func InitBroker() *broker {
  20. b := new(broker)
  21. b.addresses = make(map[string]loadmodule.ProxyLoadModule)
  22. b.outServer = make(map[string]OutServerInterface)
  23. return b
  24. }
  25. type OutServerInterface interface {
  26. AutoLogin() error //自动登录
  27. RequestLogin(r *ghttp.Request) error //身份状态
  28. CheckLoginOut(r *ghttp.Request) bool //状态是否过期
  29. }
  30. // AddNode 发现注册新节点
  31. func (r *broker) AddNode(code, address string) error {
  32. r.Lock()
  33. defer r.Unlock()
  34. module, ok := r.addresses[code]
  35. if !ok {
  36. module = loadmodule.LoadProxyLoadFactory(gcfg.Instance().MustGet(gctx.New(), "system.loadProxyModule", 0).Int())
  37. r.addresses[code] = module
  38. }
  39. log.Printf("节点%s服务注册%s", code, address)
  40. return module.Add(address)
  41. }
  42. // DelNode 节点下线watcher删除节点
  43. func (r *broker) DelNode(code, address string) error {
  44. //进行删除操作
  45. r.Lock()
  46. defer r.Unlock()
  47. module, ok := r.addresses[code]
  48. if !ok {
  49. log.Printf("节点%s服务关闭异常%s", code, address)
  50. return NewErrorWithCode(GATEWAY_MODULE_UNDEFINED, fmt.Sprintf("节点下线异常 module %s", code))
  51. }
  52. log.Printf("节点%s服务关闭%s", code, address)
  53. return module.Del(address)
  54. }
  55. // GetServerAddr 获取代理节点
  56. func (r *broker) GetServerAddr(code, ip string) (*url.URL, error) {
  57. r.RLock()
  58. defer r.RUnlock()
  59. module, ok := r.addresses[code]
  60. if !ok {
  61. return nil, NewErrorWithCode(GATEWAY_MODULE_UNDEFINED, fmt.Sprintf("获取服务地址异常 module %s", code))
  62. }
  63. return module.Get(ip)
  64. }
  65. // RegisterOutServer 注册外部服务
  66. func (b *broker) RegisterOutServer(u *url.URL, outServer OutServerInterface) {
  67. if u != nil && outServer != nil {
  68. b.outServer[u.String()] = outServer
  69. }
  70. }
  // UnLoginSetErr turns an HTTP 401 (Unauthorized) response into an error and
  // returns nil for every other status.
  //
  // Per the original note, it is meant to be attached to external services so
  // that a "not logged in" response raises an error and triggers a re-login
  // attempt. NOTE(review): the caller that reacts to the error is not in this
  // file; confirm there.
  //
  // A nil resp carries no status to inspect and is treated as "not a 401"
  // instead of panicking on the field access.
  func UnLoginSetErr(resp *http.Response) error {
  	if resp == nil {
  		return nil
  	}
  	if resp.StatusCode == http.StatusUnauthorized {
  		return fmt.Errorf("未登录异常")
  	}
  	return nil
  }
  78. // GetOutSeverAutoLogin 获取代理节点
  79. func (b *broker) GetOutSeverAutoLogin(address string, r *ghttp.Request) (serverUrl *url.URL, err error) {
  80. serverUrl, err = url.Parse(address)
  81. if err != nil {
  82. return nil, err
  83. }
  84. server, ok := b.outServer[address]
  85. if !ok {
  86. return nil, fmt.Errorf("未知外部服务")
  87. }
  88. //装配登录身份
  89. err = server.RequestLogin(r)
  90. return
  91. }
  92. // CheckOutSeverLoginOut 获取代理节点
  93. func (b *broker) CheckOutSeverLoginOut(address string, r *ghttp.Request) bool {
  94. server, ok := b.outServer[address]
  95. if !ok {
  96. return true
  97. }
  98. return server.CheckLoginOut(r)
  99. }
  100. // OutSeverLoginIn 获取代理节点
  101. func (b *broker) OutSeverLoginIn(address string) error {
  102. server, ok := b.outServer[address]
  103. if !ok {
  104. return fmt.Errorf("未知外部服务")
  105. }
  106. return server.AutoLogin()
  107. }