package proxy

import (
	"bytes"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"strings"

	log "app.yhyue.com/moapp/jylog"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/net/ghttp"
	"github.com/gogf/gf/v2/net/gtrace"
	"github.com/gogf/gf/v2/os/gctx"
	"github.com/gogf/gf/v2/os/gtime"
	"github.com/gogf/gf/v2/util/gconv"
	"jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/common/db"
	. "jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/common/gatecode"
	"jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/core/proxy/middleware"
	"jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/core/proxy/proxyClient"
	"jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/core/router"
)

// errTryTime is the maximum number of attempts made against the upstream
// service before the request is reported as failed.
const errTryTime = 10

// InitGateWayServer builds the gateway HTTP server.
//
// It loads the router manager (terminating through the logger's Fatalln if
// that fails), configures server logging, installs the middleware chain
// (error interception, context injection, pre-filtering) and binds
// proxyHandler to every POST and GET path. The server is returned without
// being started.
func InitGateWayServer(initCtx gctx.Ctx) *ghttp.Server {
	initCtx, span := gtrace.NewSpan(initCtx, "InitGateWayServer")
	defer span.End()
	// Load the routes that are allowed through the gateway.
	routerManager, err := router.InitRouterManager()
	if err != nil {
		log.WithContext(initCtx).Fatalln(err)
	}
	gateWayServer := g.Server()
	// Use the g.Log() logger and disable the server's built-in error log.
	gateWayServer.SetLogger(g.Log())
	gateWayServer.SetErrorLogEnabled(false)
	// Middleware registration.
	// Error interception.
	gateWayServer.Use(func(r *ghttp.Request) {
		middleware.ErrorHandler(r, routerManager)
	})
	// Inject global information into the request context; whatever
	// InfusionContext returns is stored as the request error.
	gateWayServer.Use(func(r *ghttp.Request) {
		r.SetError(routerManager.InfusionContext(r))
		r.Middleware.Next()
	})
	// Pre-filtering.
	gateWayServer.Use(middleware.FilterHandler)
	// Proxy registration.
	gateWayServer.BindHandler("POST:/*", proxyHandler) // API proxy
	gateWayServer.BindHandler("GET:/*", proxyHandler)  // page proxy
	return gateWayServer
}

// proxyHandler is the gateway's proxy handler. It runs after all
// pre-validation middleware and forwards the request to the upstream
// service configured on the matched router rule.
//
// Flow:
//  1. Skip everything if an earlier middleware already recorded an error.
//  2. Apply the rule's request limiter (released on return).
//  3. Buffer the request body so it can be replayed on retries and written
//     to the billing log.
//  4. Proxy the request, retrying up to errTryTime times on transport-level
//     failures.
//  5. For priced rules, inspect the upstream response and charge the caller
//     inside a database transaction before the response is released.
//
// Errors are reported through r.SetError; the handler itself writes nothing
// to the response on failure.
var proxyHandler = func(r *ghttp.Request) {
	_, span := gtrace.NewSpan(r.Context(), "proxyHandler")
	defer span.End()
	if r.GetError() != nil {
		return
	}
	gCtx := router.GetGContext(r.GetCtx())
	if gCtx.RouterRule.ReqLimit.Limit() {
		r.SetError(NewErrorWithCode(GLOBAL_ERR_OFTEN))
		return
	}
	defer gCtx.RouterRule.ReqLimit.UnLimit()

	// Buffer the whole request body: it is replayed on every proxy attempt
	// and stored in the billing log.
	reqBody, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.WithContext(r.Context()).Error("ReadRequestBodyError ", err)
		r.SetError(NewErrorWithCode(GLOBAL_ERR_SERVICE, fmt.Sprintf("读取请求内容失败:%s", err.Error())))
		return
	}

	// Errors produced by the response hook are tunnelled to the error hook as
	// "RespError_<code>" strings so that the gateway error code survives.
	const respErrPrefix = "RespError_"
	respError := func(code interface{}) error {
		return errors.New(fmt.Sprintf("%s%d", respErrPrefix, code))
	}

	// hasErr reports whether the current attempt failed with an error that
	// did not come from the response hook, i.e. one worth retrying.
	var hasErr bool
	errHandel := func(hw http.ResponseWriter, hr *http.Request, err error) {
		if strings.HasPrefix(err.Error(), respErrPrefix) {
			// Business error raised by the response hook: surface the
			// code and do not retry.
			r.SetError(NewErrorWithCode(ErrCode(gconv.Int(strings.TrimPrefix(err.Error(), respErrPrefix)))))
			return
		}
		hasErr = true
		if err.Error() == "context canceled" {
			// Cancelled requests are not logged.
			return
		}
		log.WithContext(r.Context()).Error("ErrorHandler ", err)
	}

	// change is the response hook that performs billing.
	//
	// status meaning after processing:
	//    0  nothing billable was recognised, or billing failed -> service error
	//    1  response may be released (charged, or upstream business error)
	//   -1  insufficient balance
	change := func(resp *http.Response) error {
		if resp.StatusCode != 200 {
			return respError(GLOBAL_ERR_SERVICE)
		}
		if gCtx.RouterRule.Price <= 0 {
			// Free interfaces are passed through untouched.
			return nil
		}
		status := 0
		isCharging := false // whether this response must be charged
		result := ""
		// NOTE(review): the content type is matched exactly, so a value such
		// as "application/json; charset=utf-8" is not treated as JSON and
		// ends in a service error for priced rules - confirm this is intended.
		if contentType := resp.Header.Get("content-type"); contentType == "application/json" {
			b, e := ioutil.ReadAll(resp.Body)
			jn := make(map[string]interface{})
			if e != nil {
				log.WithContext(r.Context()).Error("ChangeError", e)
			} else {
				// The upstream body is fully consumed and is about to be
				// replaced by a buffered copy, so release it now. The
				// close error carries no useful information here.
				_ = resp.Body.Close()
				if err := json.Unmarshal(b, &jn); err != nil {
					log.WithContext(r.Context()).Error("ChangeError", err)
				} else if _, ok := jn["error_code"]; !ok {
					log.WithContext(r.Context()).Error("ChangeError:缺少error_code", string(b))
				} else if gconv.Int64(jn["error_code"]) == gconv.Int64(GLOBAL_SUCCESS) {
					if error_msg, _ := jn["error_msg"].(string); error_msg == "" {
						// Fill in a default success message and keep the
						// advertised length in sync with the new body.
						jn["error_msg"] = "请求成功"
						if patched, mErr := json.Marshal(jn); mErr != nil {
							// Keep the original bytes rather than
							// sending an empty body.
							log.WithContext(r.Context()).Error("ChangeError", mErr)
						} else {
							b = patched
							resp.Header.Set("content-length", fmt.Sprint(len(b)))
							resp.ContentLength = int64(len(b))
						}
					}
					result = string(b)
					isCharging = true
				} else {
					// Upstream reported a business error: release the
					// response without charging.
					status = 1
				}
				resp.Body = ioutil.NopCloser(bytes.NewReader(b))
			}
		} else if strings.Contains(resp.Header.Get("Content-Disposition"), "attachment") && resp.ContentLength > 0 {
			// File download: charge and log the file size as the result.
			isCharging = true
			result = fmt.Sprint(resp.ContentLength)
		}
		if isCharging {
			charged := db.GateWatMySql.ExecTx("计费处理", func(tx *sql.Tx) bool {
				appid := r.GetQuery("appid").String()
				updateRes := db.GateWatMySql.UpdateOrDeleteBySqlByTx(tx, `update user set amount=amount-? where appid=?`, gCtx.RouterRule.Price, appid)
				if updateRes <= 0 {
					log.WithContext(r.Context()).Error(appid, "更新计费失败")
					return false
				}
				datas := db.GateWatMySql.SelectBySqlByTx(tx, `select amount from user where appid=?`, appid)
				if datas == nil || len(*datas) == 0 {
					log.WithContext(r.Context()).Error(appid, "没有找到该账户信息")
					return false
				}
				// A negative balance after the deduction means the
				// account could not afford this call.
				amount := gconv.Int64((*datas)[0]["amount"])
				if amount < 0 {
					status = -1
					log.WithContext(r.Context()).Error(appid, "余额不足")
					return false
				}
				logRes1, logRes2 := db.GateWatMySql.InsertBatchByTx(tx, "log", []string{"appid", "url", "param", "result", "price", "surplus", "ip", "create_time"}, []interface{}{appid, gCtx.RouterRule.ReqUrl, string(reqBody), result, gCtx.RouterRule.Price, amount, r.GetClientIp(), gtime.Now().String()})
				if logRes1 <= 0 || logRes2 <= 0 {
					return false
				}
				return true
			})
			if charged {
				status = 1
			}
		}
		switch status {
		case 0:
			return respError(GLOBAL_ERR_SERVICE)
		case -1:
			return respError(GLOBAL_ERR_NOBALANCE)
		}
		return nil
	}

	serverUrl, err := url.Parse(gCtx.RouterRule.Server)
	if err != nil {
		log.WithContext(r.Context()).Error("ParseServerUrlError ", err)
		r.SetError(NewErrorWithCode(GLOBAL_ERR_SERVICE, fmt.Sprintf("远端服务异常:%s", gCtx.RouterRule.Server)))
		return
	}

	// Retry so that a single unavailable upstream instance does not make the
	// interface unavailable.
	for i := 0; i < errTryTime; i++ {
		// Every attempt starts clean: without this reset a success that
		// follows a failure would still be treated as a failure and the
		// request (including billing) would be replayed.
		hasErr = false
		// Rewind the request body; the previous attempt consumed it.
		r.Body = ioutil.NopCloser(bytes.NewReader(reqBody))
		// Proxy the request.
		// NOTE(review): presumably a reverse proxy wired with errHandel as
		// error hook and change as response hook - confirm in proxyClient.
		proxyClient.CreateCustomProxyClient(serverUrl, errHandel, change).ServeHTTP(r.Response.ResponseWriter, r.Request)
		// No retryable error recorded: the attempt is final.
		if !hasErr {
			break
		}
		// Give up after the last attempt, or as soon as the incoming
		// request's context is done, since further attempts cannot succeed.
		if i == errTryTime-1 || r.Request.Context().Err() != nil {
			r.SetError(NewErrorWithCode(GLOBAL_ERR_SERVICE, fmt.Sprintf("远端服务异常:%s", serverUrl.String())))
			break
		}
	}
}