broker.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package broker
  2. import (
  3. . "bp.jydev.jianyu360.cn/BaseService/gateway/common/gatecode"
  4. "bp.jydev.jianyu360.cn/BaseService/gateway/core/proxy/loadmodule"
  5. "fmt"
  6. "github.com/gogf/gf/v2/net/ghttp"
  7. "github.com/gogf/gf/v2/os/gcfg"
  8. "github.com/gogf/gf/v2/os/gctx"
  9. "log"
  10. "net/http"
  11. "net/url"
  12. "sync"
  13. )
  14. type broker struct {
  15. sync.RWMutex
  16. addresses map[string]loadmodule.ProxyLoadModule
  17. outServer map[string]OutServerInterface
  18. }
  19. func InitBroker() *broker {
  20. b := new(broker)
  21. b.addresses = make(map[string]loadmodule.ProxyLoadModule)
  22. b.outServer = make(map[string]OutServerInterface)
  23. return b
  24. }
  25. type OutServerInterface interface {
  26. AutoLogin() error //自动登录
  27. RequestLogin(r *ghttp.Request) error //身份状态
  28. CheckLoginOut(r *ghttp.Request) bool //状态是否过期
  29. Filter(r *ghttp.Request) error //过滤器
  30. }
  31. // AddNode 发现注册新节点
  32. func (r *broker) AddNode(code, address string) error {
  33. r.Lock()
  34. defer r.Unlock()
  35. module, ok := r.addresses[code]
  36. if !ok {
  37. module = loadmodule.LoadProxyLoadFactory(gcfg.Instance().MustGet(gctx.New(), "system.loadProxyModule", 0).Int())
  38. r.addresses[code] = module
  39. }
  40. log.Printf("节点%s服务注册%s", code, address)
  41. return module.Add(address)
  42. }
  43. // DelNode 节点下线watcher删除节点
  44. func (r *broker) DelNode(code, address string) error {
  45. //进行删除操作
  46. r.Lock()
  47. defer r.Unlock()
  48. module, ok := r.addresses[code]
  49. if !ok {
  50. log.Printf("节点%s服务关闭异常%s", code, address)
  51. return NewErrorWithCode(GATEWAY_MODULE_UNDEFINED, fmt.Sprintf("节点下线异常 module %s", code))
  52. }
  53. log.Printf("节点%s服务关闭%s", code, address)
  54. return module.Del(address)
  55. }
  56. // GetServerAddr 获取代理节点
  57. func (r *broker) GetServerAddr(code, ip string) (*url.URL, error) {
  58. r.RLock()
  59. defer r.RUnlock()
  60. module, ok := r.addresses[code]
  61. if !ok {
  62. return nil, NewErrorWithCode(GATEWAY_MODULE_UNDEFINED, fmt.Sprintf("获取服务地址异常 module %s", code))
  63. }
  64. return module.Get(ip)
  65. }
  66. // RegisterOutServer 注册外部服务
  67. func (b *broker) RegisterOutServer(u *url.URL, outServer OutServerInterface) {
  68. if u != nil && outServer != nil {
  69. b.outServer[u.String()] = outServer
  70. }
  71. }
  // UnLoginSetErr returns an error when resp has status 401 Unauthorized and
  // nil otherwise.
  //
  // Per the original note it is mounted on external services so that a
  // "not logged in" response surfaces as an error and a re-login can be
  // attempted. A nil resp carries no status to inspect and yields nil instead
  // of panicking.
  func UnLoginSetErr(resp *http.Response) error {
  	if resp == nil {
  		return nil
  	}
  	if resp.StatusCode == http.StatusUnauthorized {
  		return fmt.Errorf("未登录异常")
  	}
  	return nil
  }
  79. // GetOutSeverAutoLogin 获取代理节点
  80. func (b *broker) GetOutSeverAutoLogin(address string, r *ghttp.Request) (serverUrl *url.URL, err error) {
  81. serverUrl, err = url.Parse(address)
  82. if err != nil {
  83. return nil, err
  84. }
  85. server, ok := b.outServer[address]
  86. if !ok {
  87. return nil, fmt.Errorf("未知外部服务")
  88. }
  89. //装配登录身份
  90. err = server.RequestLogin(r)
  91. return
  92. }
  93. // CheckOutSeverLoginOut 获取代理节点
  94. func (b *broker) CheckOutSeverLoginOut(address string, r *ghttp.Request) bool {
  95. server, ok := b.outServer[address]
  96. if !ok {
  97. return true
  98. }
  99. return server.CheckLoginOut(r)
  100. }
  101. // OutSeverLoginIn 获取代理节点
  102. func (b *broker) OutSeverLoginIn(address string) error {
  103. server, ok := b.outServer[address]
  104. if !ok {
  105. return fmt.Errorf("未知外部服务")
  106. }
  107. return server.AutoLogin()
  108. }
  109. // OutSeverFilter 外部程序过滤器
  110. func (b *broker) OutSeverFilter(address string, r *ghttp.Request) error {
  111. server, ok := b.outServer[address]
  112. if !ok {
  113. return fmt.Errorf("未知外部服务")
  114. }
  115. return server.Filter(r)
  116. }