package proxy import ( "bytes" "database/sql" "errors" "fmt" "io/ioutil" "net/http" "net/url" "strings" log "app.yhyue.com/moapp/jylog" "github.com/gogf/gf/v2/encoding/gjson" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/net/ghttp" "github.com/gogf/gf/v2/net/gtrace" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/gtime" "github.com/gogf/gf/v2/util/gconv" "jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/common/db" . "jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/common/gatecode" "jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/core/proxy/middleware" "jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/core/proxy/proxyClient" "jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/core/router" ) const errTryTime = 10 //错误尝试 // InitGateWayServer 初始化网关服务 func InitGateWayServer(initCtx gctx.Ctx) *ghttp.Server { initCtx, span := gtrace.NewSpan(initCtx, "InitGateWayServer") defer span.End() //初始化可访问路由 routerManager, err := router.InitRouterManager() if err != nil { log.WithContext(initCtx).Fatalln(err) } gateWayServer := g.Server() //关闭系统自带请求日志 //gateWayServer.SetLogger(g.Log()) //gateWayServer.SetErrorLogEnabled(false) //注册中间件 //错误拦截 gateWayServer.Use(func(r *ghttp.Request) { middleware.ErrorHandler(r, routerManager) }) //context注入全局信息 gateWayServer.Use(func(r *ghttp.Request) { r.SetError(routerManager.InfusionContext(r)) r.Middleware.Next() }) //前置过滤 gateWayServer.Use(middleware.FilterHandler) //注册代理 gateWayServer.BindHandler("POST:/*", proxyHandler) //接口代理 gateWayServer.BindHandler("GET:/*", proxyHandler) //页面代理 return gateWayServer } // proxyHandler 网关代理Handler处理 // 完成所有前置校验后,请求代理服务逻辑 var proxyHandler = func(r *ghttp.Request) { _, span := gtrace.NewSpan(r.Context(), "proxyHandler") defer span.End() if r.GetError() != nil { return } gCtx := router.GetGContext(r.GetCtx()) if gCtx.RouterRule.ReqLimit.Limit() { r.SetError(NewErrorWithCode(GLOBAL_ERR_OFTEN)) return } defer gCtx.RouterRule.ReqLimit.UnLimit() reqBody, _ := ioutil.ReadAll(r.Body) r.Body = ioutil.NopCloser(bytes.NewReader(reqBody)) // 获取请求上下文内容 var hasErr bool errHandel := func(hw http.ResponseWriter, hr *http.Request, err error) { if strings.HasPrefix(err.Error(), "RespError_") { r.SetError(NewErrorWithCode(ErrCode(gconv.Int(strings.TrimPrefix(err.Error(), "RespError_"))))) return } hasErr = true if err.Error() == "context canceled" { return } log.WithContext(r.Context()).Error("ErrorHandler ", err) } //计费 change := func(resp *http.Response) error { if resp.StatusCode != 200 { return errors.New(fmt.Sprintf("RespError_%d", GLOBAL_ERR_SERVICE)) } if gCtx.RouterRule.Price <= 0 { return nil } status := 0 b, e := ioutil.ReadAll(resp.Body) if e == nil { resp.Body = ioutil.NopCloser(bytes.NewReader(b)) } jn, err := gjson.DecodeToJson(b) if err != nil { log.WithContext(r.Context()).Error("ChangeError", err) } else if !jn.Contains("error_code") { log.WithContext(r.Context()).Error("ChangeError:缺少error_code") } else if jn.Get("error_code").Int64() == 0 { if db.GateWatMySql.ExecTx("计费处理", func(tx *sql.Tx) bool { appid := r.GetQuery("appid").String() updateRes := db.GateWatMySql.UpdateOrDeleteBySqlByTx(tx, `update thirdparty.user set amount=amount-? where appid=?`, gCtx.RouterRule.Price, appid) if updateRes <= 0 { log.WithContext(r.Context()).Error(appid, "更新计费失败") return false } datas := db.GateWatMySql.SelectBySqlByTx(tx, `select amount from thirdparty.user where appid=?`, appid) if datas == nil || len(*datas) == 0 { log.WithContext(r.Context()).Error(appid, "没有找到该账户信息") return false } //余额不足 amount := gconv.Int64((*datas)[0]["amount"]) if amount < 0 { status = -1 log.WithContext(r.Context()).Error(appid, "余额不足") return false } logRes1, logRes2 := db.GateWatMySql.InsertBatchByTx(tx, "thirdparty.log", []string{"appid", "url", "param", "result", "price", "surplus", "ip", "create_time"}, []interface{}{appid, gCtx.RouterRule.ReqUrl, string(reqBody), string(b), gCtx.RouterRule.Price, amount, r.GetClientIp(), gtime.Now().String()}) if logRes1 <= 0 || logRes2 <= 0 { return false } return true }) { status = 1 } } if status == 0 { return errors.New(fmt.Sprintf("RespError_%d", GLOBAL_ERR_SERVICE)) } else if status == -1 { r.SetError(NewErrorWithCode(GLOBAL_ERR_NOBALANCE, "余额不足")) return errors.New(fmt.Sprintf("RespError_%d", GLOBAL_ERR_NOBALANCE)) } else if status == 1 { resp.Body = ioutil.NopCloser(bytes.NewReader(b)) } return nil } serverUrl, _ := url.Parse(gCtx.RouterRule.Server) // 请求重试,防止某个服务中断不可用,导致接口不可用。 for i := 0; i < errTryTime; i++ { // 代理地址存入上下文ctx中 //gCtx.ServerAddr = serverUrl.String() //router.UpdateGContext(r, gCtx) // 捕获异常,若代理出错,则进行重试 // 代理请求 proxyClient.CreateCustomProxyClient(serverUrl, errHandel, change).ServeHTTP(r.Response.ResponseWriter, r.Request) // 未捕获到请求,标识请求成功 if !hasErr { break } else if i == (errTryTime - 1) { r.SetError(NewErrorWithCode(GLOBAL_ERR_SERVICE, fmt.Sprintf("代理异常:%s", serverUrl.String()))) } } }