broker.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package broker
  2. import (
  3. . "bp.jydev.jianyu360.cn/BaseService/gateway/common/gatecode"
  4. "bp.jydev.jianyu360.cn/BaseService/gateway/core/proxy/loadmodule"
  5. "fmt"
  6. "github.com/gogf/gf/v2/net/ghttp"
  7. "github.com/gogf/gf/v2/net/gtrace"
  8. "github.com/gogf/gf/v2/os/gcfg"
  9. "github.com/gogf/gf/v2/os/gctx"
  10. "log"
  11. "net/http"
  12. "net/url"
  13. "sync"
  14. )
  15. type broker struct {
  16. sync.RWMutex
  17. addresses map[string]loadmodule.ProxyLoadModule
  18. outServer map[string]OutServerInterface
  19. }
  20. func InitBroker() *broker {
  21. b := new(broker)
  22. b.addresses = make(map[string]loadmodule.ProxyLoadModule)
  23. b.outServer = make(map[string]OutServerInterface)
  24. return b
  25. }
  26. type OutServerInterface interface {
  27. AutoLogin() error //自动登录
  28. RequestLogin(r *ghttp.Request) error //身份状态
  29. CheckLoginOut(r *ghttp.Request) bool //状态是否过期
  30. Filter(r *ghttp.Request) error //过滤器
  31. }
  32. // AddNode 发现注册新节点
  33. func (r *broker) AddNode(code, address string) error {
  34. r.Lock()
  35. defer r.Unlock()
  36. module, ok := r.addresses[code]
  37. if !ok {
  38. module = loadmodule.LoadProxyLoadFactory(gcfg.Instance().MustGet(gctx.New(), "system.loadProxyModule", 0).Int())
  39. r.addresses[code] = module
  40. }
  41. log.Printf("节点%s服务注册%s", code, address)
  42. return module.Add(address)
  43. }
  44. // DelNode 节点下线watcher删除节点
  45. func (r *broker) DelNode(code, address string) error {
  46. //进行删除操作
  47. r.Lock()
  48. defer r.Unlock()
  49. module, ok := r.addresses[code]
  50. if !ok {
  51. log.Printf("节点%s服务关闭异常%s", code, address)
  52. return NewErrorWithCode(GATEWAY_MODULE_UNDEFINED, fmt.Sprintf("节点下线异常 module %s", code))
  53. }
  54. log.Printf("节点%s服务关闭%s", code, address)
  55. return module.Del(address)
  56. }
  57. // GetServerAddr 获取代理节点
  58. func (r *broker) GetServerAddr(code, ip string) (*url.URL, error) {
  59. r.RLock()
  60. defer r.RUnlock()
  61. module, ok := r.addresses[code]
  62. if !ok {
  63. return nil, NewErrorWithCode(GATEWAY_MODULE_UNDEFINED, fmt.Sprintf("获取服务地址异常 module %s", code))
  64. }
  65. return module.Get(ip)
  66. }
  67. // RegisterOutServer 注册外部服务
  68. func (b *broker) RegisterOutServer(u *url.URL, outServer OutServerInterface) {
  69. if u != nil && outServer != nil {
  70. b.outServer[u.String()] = outServer
  71. }
  72. }
  73. // UnLoginSetErr 挂载外部服务,当未登录时,通过处罚异常尝试重新加载
  74. func UnLoginSetErr(resp *http.Response) error {
  75. if resp.StatusCode == 401 {
  76. return fmt.Errorf("未登录异常")
  77. }
  78. return nil
  79. }
  80. // GetOutSeverAutoLogin 获取代理节点
  81. func (b *broker) GetOutSeverAutoLogin(address string, r *ghttp.Request) (serverUrl *url.URL, err error) {
  82. _, span := gtrace.NewSpan(r.Context(), "GetOutSeverAutoLogin")
  83. defer span.End()
  84. serverUrl, err = url.Parse(address)
  85. if err != nil {
  86. return nil, err
  87. }
  88. server, ok := b.outServer[address]
  89. if !ok {
  90. return nil, fmt.Errorf("未知外部服务")
  91. }
  92. //装配登录身份
  93. err = server.RequestLogin(r)
  94. return
  95. }
  96. // CheckOutSeverLoginOut 获取代理节点
  97. func (b *broker) CheckOutSeverLoginOut(address string, r *ghttp.Request) bool {
  98. server, ok := b.outServer[address]
  99. if !ok {
  100. return true
  101. }
  102. return server.CheckLoginOut(r)
  103. }
  104. // OutSeverLoginIn 获取代理节点
  105. func (b *broker) OutSeverLoginIn(address string) error {
  106. server, ok := b.outServer[address]
  107. if !ok {
  108. return fmt.Errorf("未知外部服务")
  109. }
  110. return server.AutoLogin()
  111. }
  112. // OutSeverFilter 外部程序过滤器
  113. func (b *broker) OutSeverFilter(address string, r *ghttp.Request) error {
  114. server, ok := b.outServer[address]
  115. if !ok {
  116. return fmt.Errorf("未知外部服务")
  117. }
  118. return server.Filter(r)
  119. }