proxyServer.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package proxy
  2. import (
  3. "fmt"
  4. . "gateway/common/gatecode"
  5. "gateway/core/logs"
  6. "gateway/core/node"
  7. "gateway/core/proxy/broker"
  8. "gateway/core/proxy/middleware"
  9. "gateway/core/router"
  10. "github.com/gogf/gf/v2/frame/g"
  11. "github.com/gogf/gf/v2/net/ghttp"
  12. "github.com/gogf/gf/v2/os/gcfg"
  13. "github.com/gogf/gf/v2/os/gctx"
  14. "net/http"
  15. "net/url"
  16. )
  17. var bManager = broker.InitBroker()
  18. var routerManager *router.Manager
  19. // InitGateWayServer 初始化网关服务
  20. func InitGateWayServer() *ghttp.Server {
  21. initCtx := gctx.New()
  22. //创建节点,并持续观察节点变化
  23. watchNode := node.NewNode(gcfg.Instance().MustGet(initCtx, "system.etcdListen", nil).Strings()...)
  24. go watchNode.NewWatcher(initCtx, bManager)
  25. //初始化可访问路由
  26. var err error
  27. routerManager, err = router.InitRouterManager()
  28. if err != nil {
  29. logs.GInfo.Error(initCtx, err)
  30. }
  31. gateWayServer := g.Server()
  32. //关闭系统自带请求日志
  33. gateWayServer.SetLogger(logs.GInfo)
  34. gateWayServer.SetErrorLogEnabled(false)
  35. //注册中间件
  36. gateWayServer.Use(middleware.ErrorHandler) //错误拦截
  37. gateWayServer.Use(func(r *ghttp.Request) {
  38. r.SetError(routerManager.InfusionContext(r)) //context注入全局信息
  39. r.Middleware.Next()
  40. })
  41. gateWayServer.Use(middleware.FilterHandler) //权限过滤
  42. //注册代理
  43. gateWayServer.BindHandler("POST:/*", proxyHandler)
  44. return gateWayServer
  45. }
  46. // proxyHandler 网关代理Handler处理,完成所有校验后
  47. var proxyHandler = func(r *ghttp.Request) {
  48. if r.GetError() != nil {
  49. return
  50. }
  51. gCtx := router.GetGContext(r.GetCtx())
  52. //获取服务地址
  53. proxyAddr, err := bManager.GetServerAddr(gCtx.RouterRule.MiddleCode)
  54. if err != nil {
  55. r.SetError(err)
  56. return
  57. }
  58. //代理地址存入ctx中
  59. gCtx.ServerAddr = proxyAddr
  60. router.UpdateGContext(r, gCtx)
  61. //调用请求
  62. proxyUrl, err := url.Parse(proxyAddr)
  63. if err != nil {
  64. r.SetError(NewErrorWithCode(GATEWAY_REGISTED_URL_ERR, err.Error()))
  65. // WillDo:后续异常节点冻结
  66. return
  67. }
  68. errHandel := func(hw http.ResponseWriter, hr *http.Request, err error) {
  69. r.SetError(NewErrorWithCode(GATEWAY_PROXY_ERR, fmt.Sprintf("代理异常:%s err:%v \n", gCtx.ServerAddr, err.Error())))
  70. }
  71. CreateCustomProxyClient(proxyUrl, errHandel).ServeHTTP(r.Response.ResponseWriter, r.Request)
  72. }