proxyServer.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. package proxy
  2. import (
  3. "bytes"
  4. "database/sql"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io/ioutil"
  9. "net/http"
  10. "net/url"
  11. "strings"
  12. log "app.yhyue.com/moapp/jylog"
  13. "github.com/gogf/gf/v2/frame/g"
  14. "github.com/gogf/gf/v2/net/ghttp"
  15. "github.com/gogf/gf/v2/net/gtrace"
  16. "github.com/gogf/gf/v2/os/gctx"
  17. "github.com/gogf/gf/v2/os/gtime"
  18. "github.com/gogf/gf/v2/util/gconv"
  19. "jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/common/db"
  20. . "jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/common/gatecode"
  21. "jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/core/proxy/middleware"
  22. "jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/core/proxy/proxyClient"
  23. "jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/core/router"
  24. )
  // errTryTime is the maximum number of attempts proxyHandler makes against
  // the upstream server before reporting the request as failed.
  const errTryTime = 10
  26. // InitGateWayServer 初始化网关服务
  27. func InitGateWayServer(initCtx gctx.Ctx) *ghttp.Server {
  28. initCtx, span := gtrace.NewSpan(initCtx, "InitGateWayServer")
  29. defer span.End()
  30. //初始化可访问路由
  31. routerManager, err := router.InitRouterManager()
  32. if err != nil {
  33. log.WithContext(initCtx).Fatalln(err)
  34. }
  35. gateWayServer := g.Server()
  36. //关闭系统自带请求日志
  37. gateWayServer.SetLogger(g.Log())
  38. gateWayServer.SetErrorLogEnabled(false)
  39. //注册中间件
  40. //错误拦截
  41. gateWayServer.Use(func(r *ghttp.Request) {
  42. middleware.ErrorHandler(r, routerManager)
  43. })
  44. //context注入全局信息
  45. gateWayServer.Use(func(r *ghttp.Request) {
  46. r.SetError(routerManager.InfusionContext(r))
  47. r.Middleware.Next()
  48. })
  49. //前置过滤
  50. gateWayServer.Use(middleware.FilterHandler)
  51. //注册代理
  52. gateWayServer.BindHandler("POST:/*", proxyHandler) //接口代理
  53. gateWayServer.BindHandler("GET:/*", proxyHandler) //页面代理
  54. return gateWayServer
  55. }
  56. // proxyHandler 网关代理Handler处理
  57. // 完成所有前置校验后,请求代理服务逻辑
  58. var proxyHandler = func(r *ghttp.Request) {
  59. _, span := gtrace.NewSpan(r.Context(), "proxyHandler")
  60. defer span.End()
  61. if r.GetError() != nil {
  62. return
  63. }
  64. gCtx := router.GetGContext(r.GetCtx())
  65. if gCtx.RouterRule.ReqLimit.Limit() {
  66. r.SetError(NewErrorWithCode(GLOBAL_ERR_OFTEN))
  67. return
  68. }
  69. defer gCtx.RouterRule.ReqLimit.UnLimit()
  70. reqBody, _ := ioutil.ReadAll(r.Body)
  71. r.Body = ioutil.NopCloser(bytes.NewReader(reqBody))
  72. // 获取请求上下文内容
  73. var hasErr bool
  74. errHandel := func(hw http.ResponseWriter, hr *http.Request, err error) {
  75. if strings.HasPrefix(err.Error(), "RespError_") {
  76. r.SetError(NewErrorWithCode(ErrCode(gconv.Int(strings.TrimPrefix(err.Error(), "RespError_")))))
  77. return
  78. }
  79. hasErr = true
  80. if err.Error() == "context canceled" {
  81. return
  82. }
  83. log.WithContext(r.Context()).Error("ErrorHandler ", err)
  84. }
  85. //计费
  86. change := func(resp *http.Response) error {
  87. if resp.StatusCode != 200 {
  88. return errors.New(fmt.Sprintf("RespError_%d", GLOBAL_ERR_SERVICE))
  89. } else if gCtx.RouterRule.IsFree { //免费接口不处理
  90. return nil
  91. }
  92. status := 0
  93. isCharging := false //是否扣费
  94. result := ""
  95. var error_code int64
  96. error_msg := ""
  97. if contentType := resp.Header.Get("content-type"); contentType == "application/json" {
  98. b, e := ioutil.ReadAll(resp.Body)
  99. jn := make(map[string]interface{})
  100. if e != nil {
  101. log.WithContext(r.Context()).Error("ChangeError", e)
  102. } else {
  103. if err := json.Unmarshal(b, &jn); err != nil {
  104. log.WithContext(r.Context()).Error("ChangeError", err)
  105. } else if _, ok := jn["error_code"]; !ok {
  106. log.WithContext(r.Context()).Error("ChangeError:缺少error_code", string(b))
  107. } else {
  108. error_code = gconv.Int64(jn["error_code"])
  109. error_msg, _ = jn["error_msg"].(string)
  110. if error_code == gconv.Int64(GLOBAL_SUCCESS) {
  111. if error_msg == "" {
  112. error_msg = "请求成功"
  113. jn["error_msg"] = error_msg
  114. b, _ = json.Marshal(jn)
  115. resp.Header.Set("content-length", fmt.Sprint(len(b)))
  116. }
  117. result = string(b)
  118. isCharging = true
  119. } else {
  120. status = 1
  121. }
  122. }
  123. resp.Body = ioutil.NopCloser(bytes.NewReader(b))
  124. }
  125. } else if strings.Contains(resp.Header.Get("Content-Disposition"), "attachment") && resp.ContentLength > 0 { //下载文件
  126. isCharging = true
  127. result = fmt.Sprint(resp.ContentLength)
  128. }
  129. appid := r.GetQuery("appid").String()
  130. if db.GateWatMySql.ExecTx("计费处理", func(tx *sql.Tx) bool {
  131. var surplus, deductMode int64
  132. if isCharging {
  133. deductMode = 2 //赠送扣除
  134. giftUpdate := db.GateWatMySql.UpdateOrDeleteBySqlByTx(tx, `UPDATE USER a INNER JOIN gift b ON (a.appid=? AND a.id=b.user_id and b.count>0)
  135. INNER JOIN front_proxy c ON (c.url=? AND b.front_proxy_id=c.id)
  136. SET b.count=b.count-1`, appid, gCtx.RouterRule.ReqUrl)
  137. if giftUpdate < 0 {
  138. log.WithContext(r.Context()).Error(appid, "更新赠送扣除计费失败")
  139. return false
  140. } else if giftUpdate == 0 {
  141. updateRes := db.GateWatMySql.UpdateOrDeleteBySqlByTx(tx, `update user set amount=amount-? where appid=?`, gCtx.RouterRule.Price, appid)
  142. if updateRes < 0 {
  143. log.WithContext(r.Context()).Error(appid, "更新金额扣除计费失败")
  144. return false
  145. }
  146. deductMode = 1
  147. }
  148. datas := db.GateWatMySql.SelectBySqlByTx(tx, `SELECT a.amount,b.count FROM USER a LEFT JOIN (
  149. SELECT a.id,b.count FROM USER a INNER JOIN gift b ON (a.appid=? AND a.id=b.user_id)
  150. INNER JOIN front_proxy c ON (c.url=? AND b.front_proxy_id=c.id)
  151. ) b ON (a.id=b.id) WHERE a.appid=?`, appid, gCtx.RouterRule.ReqUrl, appid)
  152. if datas == nil || len(*datas) == 0 {
  153. log.WithContext(r.Context()).Error(appid, "没有找到该账户信息")
  154. return false
  155. }
  156. //余额不足
  157. amount := gconv.Int64((*datas)[0]["amount"])
  158. giftCount := gconv.Int64((*datas)[0]["count"])
  159. if amount < 0 && giftCount < 0 {
  160. status = -1
  161. log.WithContext(r.Context()).Error(appid, "余额不足")
  162. return false
  163. }
  164. if deductMode == 1 {
  165. surplus = amount
  166. } else {
  167. surplus = giftCount
  168. }
  169. }
  170. logRes1, logRes2 := db.GateWatMySql.InsertBatchByTx(tx, "log", []string{"appid", "url", "param", "result", "price", "surplus", "mode", "ip", "error_code", "error_msg", "create_time"}, []interface{}{appid, gCtx.RouterRule.ReqUrl, string(reqBody), result, gCtx.RouterRule.Price, surplus, deductMode, r.GetClientIp(), error_code, error_msg, gtime.Now().String()})
  171. if logRes1 <= 0 || logRes2 <= 0 {
  172. return false
  173. }
  174. return true
  175. }) {
  176. status = 1
  177. }
  178. if status == 0 {
  179. return errors.New(fmt.Sprintf("RespError_%d", GLOBAL_ERR_SERVICE))
  180. } else if status == -1 {
  181. return errors.New(fmt.Sprintf("RespError_%d", GLOBAL_ERR_NOBALANCE))
  182. }
  183. return nil
  184. }
  185. serverUrl, _ := url.Parse(gCtx.RouterRule.Server)
  186. // 请求重试,防止某个服务中断不可用,导致接口不可用。
  187. for i := 0; i < errTryTime; i++ {
  188. // 代理请求
  189. proxyClient.CreateCustomProxyClient(serverUrl, errHandel, change).ServeHTTP(r.Response.ResponseWriter, r.Request)
  190. // 未捕获到请求,标识请求成功
  191. if !hasErr {
  192. break
  193. } else if i == (errTryTime - 1) {
  194. r.SetError(NewErrorWithCode(GLOBAL_ERR_SERVICE, fmt.Sprintf("远端服务异常:%s", serverUrl.String())))
  195. }
  196. }
  197. }