Browse Source

feat:xiugai

wangchuanjin 10 months ago
parent
commit
f326c2ef19
4 changed files with 35 additions and 31 deletions
  1. 1 1
      core/proxy/middleware/errorHandler.go
  2. 29 28
      core/proxy/proxyServer.go
  3. 2 2
      core/util/reqLimit.go
  4. 3 0
      etc/config.yaml

+ 1 - 1
core/proxy/middleware/errorHandler.go

@@ -52,7 +52,7 @@ func ErrorHandler(r *ghttp.Request, routerManager *router.Manager) {
 			if gCode.Code() == int(GLOBAL_ERR_NOBALANCE) {
 				surplus = gconv.Int64(gCode.Detail())
 			}
-			db.GateWatMySql.InsertBatch("thirdparty.log", []string{"appid", "url", "header", "param", "price", "surplus", "error_code", "error_msg", "ip", "create_time"}, []interface{}{appid, router.ReqUrl, header, string(reqBody), router.Price, surplus, gCode.Code(), log_error_msg, r.GetClientIp(), gtime.Now().String()})
+			db.GateWatMySql.InsertBatch("log", []string{"appid", "url", "header", "param", "price", "surplus", "error_code", "error_msg", "ip", "create_time"}, []interface{}{appid, router.ReqUrl, header, string(reqBody), router.Price, surplus, gCode.Code(), log_error_msg, r.GetClientIp(), gtime.Now().String()})
 		}
 		//返回固定的友好信息
 		r.Response.WriteJson(map[string]interface{}{

+ 29 - 28
core/proxy/proxyServer.go

@@ -31,18 +31,15 @@ const errTryTime = 10 //错误尝试
 func InitGateWayServer(initCtx gctx.Ctx) *ghttp.Server {
 	initCtx, span := gtrace.NewSpan(initCtx, "InitGateWayServer")
 	defer span.End()
-
 	//初始化可访问路由
 	routerManager, err := router.InitRouterManager()
 	if err != nil {
 		log.WithContext(initCtx).Fatalln(err)
 	}
-
 	gateWayServer := g.Server()
 	//关闭系统自带请求日志
-	//gateWayServer.SetLogger(g.Log())
-	//gateWayServer.SetErrorLogEnabled(false)
-
+	gateWayServer.SetLogger(g.Log())
+	gateWayServer.SetErrorLogEnabled(false)
 	//注册中间件
 	//错误拦截
 	gateWayServer.Use(func(r *ghttp.Request) {
@@ -55,7 +52,6 @@ func InitGateWayServer(initCtx gctx.Ctx) *ghttp.Server {
 	})
 	//前置过滤
 	gateWayServer.Use(middleware.FilterHandler)
-
 	//注册代理
 	gateWayServer.BindHandler("POST:/*", proxyHandler) //接口代理
 	gateWayServer.BindHandler("GET:/*", proxyHandler)  //页面代理
@@ -95,29 +91,41 @@ var proxyHandler = func(r *ghttp.Request) {
 	change := func(resp *http.Response) error {
 		if resp.StatusCode != 200 {
 			return errors.New(fmt.Sprintf("RespError_%d", GLOBAL_ERR_SERVICE))
-		}
-		if gCtx.RouterRule.Price <= 0 {
+		} else if gCtx.RouterRule.Price <= 0 { //免费接口不处理
 			return nil
 		}
 		status := 0
-		b, e := ioutil.ReadAll(resp.Body)
-		if e == nil {
-			resp.Body = ioutil.NopCloser(bytes.NewReader(b))
+		isCharging := false //是否扣费
+		result := ""
+		if contentType := resp.Header.Get("content-type"); contentType == "application/json" {
+			b, e := ioutil.ReadAll(resp.Body)
+			if e == nil {
+				resp.Body = ioutil.NopCloser(bytes.NewReader(b))
+			}
+			jn, err := gjson.DecodeToJson(b)
+			if err != nil {
+				log.WithContext(r.Context()).Error("ChangeError", err)
+			} else if !jn.Contains("error_code") {
+				log.WithContext(r.Context()).Error("ChangeError:缺少error_code", string(b))
+			} else if jn.Get("error_code").Int64() == 0 {
+				isCharging = true
+				result = string(b)
+			} else {
+				status = 1
+			}
+		} else if strings.Contains(resp.Header.Get("Content-Disposition"), "attachment") && resp.ContentLength > 0 { //下载文件
+			isCharging = true
+			result = fmt.Sprint(resp.ContentLength)
 		}
-		jn, err := gjson.DecodeToJson(b)
-		if err != nil {
-			log.WithContext(r.Context()).Error("ChangeError", err)
-		} else if !jn.Contains("error_code") {
-			log.WithContext(r.Context()).Error("ChangeError:缺少error_code")
-		} else if jn.Get("error_code").Int64() == 0 {
+		if isCharging {
 			if db.GateWatMySql.ExecTx("计费处理", func(tx *sql.Tx) bool {
 				appid := r.GetQuery("appid").String()
-				updateRes := db.GateWatMySql.UpdateOrDeleteBySqlByTx(tx, `update thirdparty.user set amount=amount-? where appid=?`, gCtx.RouterRule.Price, appid)
+				updateRes := db.GateWatMySql.UpdateOrDeleteBySqlByTx(tx, `update user set amount=amount-? where appid=?`, gCtx.RouterRule.Price, appid)
 				if updateRes <= 0 {
 					log.WithContext(r.Context()).Error(appid, "更新计费失败")
 					return false
 				}
-				datas := db.GateWatMySql.SelectBySqlByTx(tx, `select amount from thirdparty.user where appid=?`, appid)
+				datas := db.GateWatMySql.SelectBySqlByTx(tx, `select amount from user where appid=?`, appid)
 				if datas == nil || len(*datas) == 0 {
 					log.WithContext(r.Context()).Error(appid, "没有找到该账户信息")
 					return false
@@ -129,7 +137,7 @@ var proxyHandler = func(r *ghttp.Request) {
 					log.WithContext(r.Context()).Error(appid, "余额不足")
 					return false
 				}
-				logRes1, logRes2 := db.GateWatMySql.InsertBatchByTx(tx, "thirdparty.log", []string{"appid", "url", "param", "result", "price", "surplus", "ip", "create_time"}, []interface{}{appid, gCtx.RouterRule.ReqUrl, string(reqBody), string(b), gCtx.RouterRule.Price, amount, r.GetClientIp(), gtime.Now().String()})
+				logRes1, logRes2 := db.GateWatMySql.InsertBatchByTx(tx, "log", []string{"appid", "url", "param", "result", "price", "surplus", "ip", "create_time"}, []interface{}{appid, gCtx.RouterRule.ReqUrl, string(reqBody), result, gCtx.RouterRule.Price, amount, r.GetClientIp(), gtime.Now().String()})
 				if logRes1 <= 0 || logRes2 <= 0 {
 					return false
 				}
@@ -141,27 +149,20 @@ var proxyHandler = func(r *ghttp.Request) {
 		if status == 0 {
 			return errors.New(fmt.Sprintf("RespError_%d", GLOBAL_ERR_SERVICE))
 		} else if status == -1 {
-			r.SetError(NewErrorWithCode(GLOBAL_ERR_NOBALANCE, "余额不足"))
 			return errors.New(fmt.Sprintf("RespError_%d", GLOBAL_ERR_NOBALANCE))
-		} else if status == 1 {
-			resp.Body = ioutil.NopCloser(bytes.NewReader(b))
 		}
 		return nil
 	}
 	serverUrl, _ := url.Parse(gCtx.RouterRule.Server)
 	// 请求重试,防止某个服务中断不可用,导致接口不可用。
 	for i := 0; i < errTryTime; i++ {
-		// 代理地址存入上下文ctx中
-		//gCtx.ServerAddr = serverUrl.String()
-		//router.UpdateGContext(r, gCtx)
-		// 捕获异常,若代理出错,则进行重试
 		// 代理请求
 		proxyClient.CreateCustomProxyClient(serverUrl, errHandel, change).ServeHTTP(r.Response.ResponseWriter, r.Request)
 		// 未捕获到请求,标识请求成功
 		if !hasErr {
 			break
 		} else if i == (errTryTime - 1) {
-			r.SetError(NewErrorWithCode(GLOBAL_ERR_SERVICE, fmt.Sprintf("代理异常:%s", serverUrl.String())))
+			r.SetError(NewErrorWithCode(GLOBAL_ERR_SERVICE, fmt.Sprintf("远端服务异常:%s", serverUrl.String())))
 		}
 	}
 }

+ 2 - 2
core/util/reqLimit.go

@@ -32,10 +32,10 @@ func (r *ReqLimit) IsLimit() bool {
 }
 
 //并发数限制
-//return 1 正常 -1 被限制
+//return true 被限制 false 正常
 func (r *ReqLimit) Limit() bool {
 	if r.Size <= 0 {
-		return true
+		return false
 	}
 	return gconv.Int64(redis.LPOP("other", r.Key)) <= 0
 }

+ 3 - 0
etc/config.yaml

@@ -1,6 +1,9 @@
 server:
   # 基本配置
   address: :8077                          # 本地监听地址。默认":80"
+  dumpRouterMap: true             # 是否在Server启动时打印所有的路由列表。默认为true
+  graceful: true                  # 是否开启平滑重启特性,开启时将会在本地增加10000的本地TCP端口用于进程间通信。默认false
+  gracefulTimeout: 10             # 平滑重启父进程最大存活时间。默认2秒
 
 #数据库配置
 databases: