123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- package proxy
- import (
- "bytes"
- "database/sql"
- "encoding/json"
- "errors"
- "fmt"
- "io/ioutil"
- "net/http"
- "net/url"
- "strings"
- log "app.yhyue.com/moapp/jylog"
- "github.com/gogf/gf/v2/frame/g"
- "github.com/gogf/gf/v2/net/ghttp"
- "github.com/gogf/gf/v2/net/gtrace"
- "github.com/gogf/gf/v2/os/gctx"
- "github.com/gogf/gf/v2/os/gtime"
- "github.com/gogf/gf/v2/util/gconv"
- "jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/common/db"
- . "jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/common/gatecode"
- "jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/core/proxy/middleware"
- "jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/core/proxy/proxyClient"
- "jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/core/router"
- )
- const errTryTime = 10 //错误尝试
- // InitGateWayServer 初始化网关服务
- func InitGateWayServer(initCtx gctx.Ctx) *ghttp.Server {
- initCtx, span := gtrace.NewSpan(initCtx, "InitGateWayServer")
- defer span.End()
- //初始化可访问路由
- routerManager, err := router.InitRouterManager()
- if err != nil {
- log.WithContext(initCtx).Fatalln(err)
- }
- gateWayServer := g.Server()
- //关闭系统自带请求日志
- gateWayServer.SetLogger(g.Log())
- gateWayServer.SetErrorLogEnabled(false)
- //注册中间件
- //错误拦截
- gateWayServer.Use(func(r *ghttp.Request) {
- middleware.ErrorHandler(r, routerManager)
- })
- //context注入全局信息
- gateWayServer.Use(func(r *ghttp.Request) {
- r.SetError(routerManager.InfusionContext(r))
- r.Middleware.Next()
- })
- //前置过滤
- gateWayServer.Use(middleware.FilterHandler)
- //注册代理
- gateWayServer.BindHandler("POST:/*", proxyHandler) //接口代理
- gateWayServer.BindHandler("GET:/*", proxyHandler) //页面代理
- return gateWayServer
- }
- // proxyHandler 网关代理Handler处理
- // 完成所有前置校验后,请求代理服务逻辑
- var proxyHandler = func(r *ghttp.Request) {
- _, span := gtrace.NewSpan(r.Context(), "proxyHandler")
- defer span.End()
- if r.GetError() != nil {
- return
- }
- gCtx := router.GetGContext(r.GetCtx())
- if gCtx.RouterRule.ReqLimit.Limit() {
- r.SetError(NewErrorWithCode(GLOBAL_ERR_OFTEN))
- return
- }
- defer gCtx.RouterRule.ReqLimit.UnLimit()
- reqBody, _ := ioutil.ReadAll(r.Body)
- r.Body = ioutil.NopCloser(bytes.NewReader(reqBody))
- // 获取请求上下文内容
- var hasErr bool
- errHandel := func(hw http.ResponseWriter, hr *http.Request, err error) {
- if strings.HasPrefix(err.Error(), "RespError_") {
- r.SetError(NewErrorWithCode(ErrCode(gconv.Int(strings.TrimPrefix(err.Error(), "RespError_")))))
- return
- }
- hasErr = true
- if err.Error() == "context canceled" {
- return
- }
- log.WithContext(r.Context()).Error("ErrorHandler ", err)
- }
- //计费
- change := func(resp *http.Response) error {
- if resp.StatusCode != 200 {
- return errors.New(fmt.Sprintf("RespError_%d", GLOBAL_ERR_SERVICE))
- } else if gCtx.RouterRule.Price <= 0 { //免费接口不处理
- return nil
- }
- status := 0
- isCharging := false //是否扣费
- result := ""
- if contentType := resp.Header.Get("content-type"); contentType == "application/json" {
- b, e := ioutil.ReadAll(resp.Body)
- jn := make(map[string]interface{})
- if e != nil {
- log.WithContext(r.Context()).Error("ChangeError", e)
- } else {
- if err := json.Unmarshal(b, &jn); err != nil {
- log.WithContext(r.Context()).Error("ChangeError", err)
- } else if _, ok := jn["error_code"]; !ok {
- log.WithContext(r.Context()).Error("ChangeError:缺少error_code", string(b))
- } else if gconv.Int64(jn["error_code"]) == gconv.Int64(GLOBAL_SUCCESS) {
- if error_msg, _ := jn["error_msg"].(string); error_msg == "" {
- jn["error_msg"] = "请求成功"
- b, _ = json.Marshal(jn)
- resp.Header.Set("content-length", fmt.Sprint(len(b)))
- }
- result = string(b)
- isCharging = true
- } else {
- status = 1
- }
- resp.Body = ioutil.NopCloser(bytes.NewReader(b))
- }
- } else if strings.Contains(resp.Header.Get("Content-Disposition"), "attachment") && resp.ContentLength > 0 { //下载文件
- isCharging = true
- result = fmt.Sprint(resp.ContentLength)
- }
- if isCharging {
- if db.GateWatMySql.ExecTx("计费处理", func(tx *sql.Tx) bool {
- deductMode := 2 //赠送扣除
- appid := r.GetQuery("appid").String()
- giftUpdate := db.GateWatMySql.UpdateOrDeleteBySqlByTx(tx, `UPDATE USER a INNER JOIN gift b ON (a.appid=? AND a.id=b.user_id and b.count>0)
- INNER JOIN front_proxy c ON (c.url=? AND b.front_proxy_id=c.id)
- SET b.count=b.count-1`, appid, gCtx.RouterRule.ReqUrl)
- if giftUpdate < 0 {
- log.WithContext(r.Context()).Error(appid, "更新赠送扣除计费失败")
- return false
- } else if giftUpdate == 0 {
- updateRes := db.GateWatMySql.UpdateOrDeleteBySqlByTx(tx, `update user set amount=amount-? where appid=?`, gCtx.RouterRule.Price, appid)
- if updateRes <= 0 {
- log.WithContext(r.Context()).Error(appid, "更新金额扣除计费失败")
- return false
- }
- deductMode = 1
- }
- datas := db.GateWatMySql.SelectBySqlByTx(tx, `SELECT a.amount,b.count FROM USER a LEFT JOIN (
- SELECT a.id,b.count FROM USER a INNER JOIN gift b ON (a.appid=? AND a.id=b.user_id)
- INNER JOIN front_proxy c ON (c.url=? AND b.front_proxy_id=c.id)
- ) b ON (a.id=b.id) WHERE a.appid=?`, appid, gCtx.RouterRule.ReqUrl, appid)
- if datas == nil || len(*datas) == 0 {
- log.WithContext(r.Context()).Error(appid, "没有找到该账户信息")
- return false
- }
- //余额不足
- amount := gconv.Int64((*datas)[0]["amount"])
- giftCount := gconv.Int64((*datas)[0]["count"])
- if amount < 0 && giftCount < 0 {
- status = -1
- log.WithContext(r.Context()).Error(appid, "余额不足")
- return false
- }
- var surplus int64
- if deductMode == 1 {
- surplus = amount
- } else {
- surplus = giftCount
- }
- logRes1, logRes2 := db.GateWatMySql.InsertBatchByTx(tx, "log", []string{"appid", "url", "param", "result", "price", "surplus", "mode", "ip", "create_time"}, []interface{}{appid, gCtx.RouterRule.ReqUrl, string(reqBody), result, gCtx.RouterRule.Price, surplus, deductMode, r.GetClientIp(), gtime.Now().String()})
- if logRes1 <= 0 || logRes2 <= 0 {
- return false
- }
- return true
- }) {
- status = 1
- }
- }
- if status == 0 {
- return errors.New(fmt.Sprintf("RespError_%d", GLOBAL_ERR_SERVICE))
- } else if status == -1 {
- return errors.New(fmt.Sprintf("RespError_%d", GLOBAL_ERR_NOBALANCE))
- }
- return nil
- }
- serverUrl, _ := url.Parse(gCtx.RouterRule.Server)
- // 请求重试,防止某个服务中断不可用,导致接口不可用。
- for i := 0; i < errTryTime; i++ {
- // 代理请求
- proxyClient.CreateCustomProxyClient(serverUrl, errHandel, change).ServeHTTP(r.Response.ResponseWriter, r.Request)
- // 未捕获到请求,标识请求成功
- if !hasErr {
- break
- } else if i == (errTryTime - 1) {
- r.SetError(NewErrorWithCode(GLOBAL_ERR_SERVICE, fmt.Sprintf("远端服务异常:%s", serverUrl.String())))
- }
- }
- }
|