package proxy import ( "bytes" "database/sql" "encoding/json" "errors" "fmt" "io/ioutil" "net/http" "net/url" "strings" log "app.yhyue.com/moapp/jylog" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/net/ghttp" "github.com/gogf/gf/v2/net/gtrace" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/gtime" "github.com/gogf/gf/v2/util/gconv" "jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/common/db" . "jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/common/gatecode" "jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/core/proxy/middleware" "jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/core/proxy/proxyClient" "jygit.jydev.jianyu360.cn/dataservice/tripartite_gateway/core/router" ) const errTryTime = 10 //错误尝试 // InitGateWayServer 初始化网关服务 func InitGateWayServer(initCtx gctx.Ctx) *ghttp.Server { initCtx, span := gtrace.NewSpan(initCtx, "InitGateWayServer") defer span.End() //初始化可访问路由 routerManager, err := router.InitRouterManager() if err != nil { log.WithContext(initCtx).Fatalln(err) } gateWayServer := g.Server() //关闭系统自带请求日志 gateWayServer.SetLogger(g.Log()) gateWayServer.SetErrorLogEnabled(false) //注册中间件 //错误拦截 gateWayServer.Use(func(r *ghttp.Request) { middleware.ErrorHandler(r, routerManager) }) //context注入全局信息 gateWayServer.Use(func(r *ghttp.Request) { r.SetError(routerManager.InfusionContext(r)) r.Middleware.Next() }) //前置过滤 gateWayServer.Use(middleware.FilterHandler) //注册代理 gateWayServer.BindHandler("POST:/*", proxyHandler) //接口代理 gateWayServer.BindHandler("GET:/*", proxyHandler) //页面代理 return gateWayServer } // proxyHandler 网关代理Handler处理 // 完成所有前置校验后,请求代理服务逻辑 var proxyHandler = func(r *ghttp.Request) { _, span := gtrace.NewSpan(r.Context(), "proxyHandler") defer span.End() if r.GetError() != nil { return } gCtx := router.GetGContext(r.GetCtx()) if gCtx.RouterRule.ReqLimit.Limit() { r.SetError(NewErrorWithCode(GLOBAL_ERR_OFTEN)) return } defer gCtx.RouterRule.ReqLimit.UnLimit() reqBody, _ := ioutil.ReadAll(r.Body) r.Body = ioutil.NopCloser(bytes.NewReader(reqBody)) // 获取请求上下文内容 var hasErr bool errHandel := func(hw http.ResponseWriter, hr *http.Request, err error) { if strings.HasPrefix(err.Error(), "RespError_") { r.SetError(NewErrorWithCode(ErrCode(gconv.Int(strings.TrimPrefix(err.Error(), "RespError_"))))) return } hasErr = true if err.Error() == "context canceled" { return } log.WithContext(r.Context()).Error("ErrorHandler ", err) } //计费 change := func(resp *http.Response) error { if resp.StatusCode != 200 { return errors.New(fmt.Sprintf("RespError_%d", GLOBAL_ERR_SERVICE)) } else if gCtx.RouterRule.Price <= 0 { //免费接口不处理 return nil } status := 0 isCharging := false //是否扣费 result := "" if contentType := resp.Header.Get("content-type"); contentType == "application/json" { b, e := ioutil.ReadAll(resp.Body) jn := make(map[string]interface{}) if e != nil { log.WithContext(r.Context()).Error("ChangeError", e) } else { if err := json.Unmarshal(b, &jn); err != nil { log.WithContext(r.Context()).Error("ChangeError", err) } else if _, ok := jn["error_code"]; !ok { log.WithContext(r.Context()).Error("ChangeError:缺少error_code", string(b)) } else if gconv.Int64(jn["error_code"]) == gconv.Int64(GLOBAL_SUCCESS) { if error_msg, _ := jn["error_msg"].(string); error_msg == "" { jn["error_msg"] = "请求成功" b, _ = json.Marshal(jn) resp.Header.Set("content-length", fmt.Sprint(len(b))) } result = string(b) isCharging = true } else { status = 1 } resp.Body = ioutil.NopCloser(bytes.NewReader(b)) } } else if strings.Contains(resp.Header.Get("Content-Disposition"), "attachment") && resp.ContentLength > 0 { //下载文件 isCharging = true result = fmt.Sprint(resp.ContentLength) } if isCharging { if db.GateWatMySql.ExecTx("计费处理", func(tx *sql.Tx) bool { deductMode := 2 //赠送扣除 appid := r.GetQuery("appid").String() giftUpdate := db.GateWatMySql.UpdateOrDeleteBySqlByTx(tx, `UPDATE USER a INNER JOIN gift b ON (a.appid=? AND a.id=b.user_id and b.count>0) INNER JOIN front_proxy c ON (c.url=? AND b.front_proxy_id=c.id) SET b.count=b.count-1`, appid, gCtx.RouterRule.ReqUrl) if giftUpdate < 0 { log.WithContext(r.Context()).Error(appid, "更新赠送扣除计费失败") return false } else if giftUpdate == 0 { updateRes := db.GateWatMySql.UpdateOrDeleteBySqlByTx(tx, `update user set amount=amount-? where appid=?`, gCtx.RouterRule.Price, appid) if updateRes <= 0 { log.WithContext(r.Context()).Error(appid, "更新金额扣除计费失败") return false } deductMode = 1 } datas := db.GateWatMySql.SelectBySqlByTx(tx, `SELECT a.amount,b.count FROM USER a LEFT JOIN ( SELECT a.id,b.count FROM USER a INNER JOIN gift b ON (a.appid=? AND a.id=b.user_id) INNER JOIN front_proxy c ON (c.url=? AND b.front_proxy_id=c.id) ) b ON (a.id=b.id) WHERE a.appid=?`, appid, gCtx.RouterRule.ReqUrl, appid) if datas == nil || len(*datas) == 0 { log.WithContext(r.Context()).Error(appid, "没有找到该账户信息") return false } //余额不足 amount := gconv.Int64((*datas)[0]["amount"]) giftCount := gconv.Int64((*datas)[0]["count"]) if amount < 0 && giftCount < 0 { status = -1 log.WithContext(r.Context()).Error(appid, "余额不足") return false } var surplus int64 if deductMode == 1 { surplus = amount } else { surplus = giftCount } logRes1, logRes2 := db.GateWatMySql.InsertBatchByTx(tx, "log", []string{"appid", "url", "param", "result", "price", "surplus", "mode", "ip", "create_time"}, []interface{}{appid, gCtx.RouterRule.ReqUrl, string(reqBody), result, gCtx.RouterRule.Price, surplus, deductMode, r.GetClientIp(), gtime.Now().String()}) if logRes1 <= 0 || logRes2 <= 0 { return false } return true }) { status = 1 } } if status == 0 { return errors.New(fmt.Sprintf("RespError_%d", GLOBAL_ERR_SERVICE)) } else if status == -1 { return errors.New(fmt.Sprintf("RespError_%d", GLOBAL_ERR_NOBALANCE)) } return nil } serverUrl, _ := url.Parse(gCtx.RouterRule.Server) // 请求重试,防止某个服务中断不可用,导致接口不可用。 for i := 0; i < errTryTime; i++ { // 代理请求 proxyClient.CreateCustomProxyClient(serverUrl, errHandel, change).ServeHTTP(r.Response.ResponseWriter, r.Request) // 未捕获到请求,标识请求成功 if !hasErr { break } else if i == (errTryTime - 1) { r.SetError(NewErrorWithCode(GLOBAL_ERR_SERVICE, fmt.Sprintf("远端服务异常:%s", serverUrl.String()))) } } }