Forráskód Böngészése

feat:日志优化

fuwencai 1 éve
szülő
commit
3a2cf997dd
5 módosított fájl, 135 hozzáadás és 4 törlés
  1. 20 0
      config/config.go
  2. 9 1
      etc/config.yaml
  3. 4 3
      handler/handler.go
  4. 4 0
      main.go
  5. 98 0
      util/logs.go

+ 20 - 0
config/config.go

@@ -5,6 +5,7 @@ import (
 	qrpc "app.yhyue.com/moapp/message/model"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/os/gcfg"
+	"github.com/gogf/gf/v2/os/gctx"
 )
 
 type config struct {
@@ -119,8 +120,18 @@ type TaskStruct struct {
 	Type        string `json:"type"`
 	Distinguish int    `json:"distinguish"` // 0不用区分身份 1区分身份
 }
+type SaveLogConfig struct {
+	Name             string // 日志名称
+	CollName         string // 保存的coll
+	MgoSaveCacheSize int    // 缓存通道大小
+	SPSize           int    // 数据库并发数据
+	BulkSize         int    // 每批的数量
+	TimeAfter        int    // 定时保存 毫秒
+	Timeout          int    // 超时丢弃毫秒
+}
 
 //var Config *config
+var NsqLogConfig SaveLogConfig
 
 func init() {
 	//推送配置文件
@@ -130,4 +141,13 @@ func init() {
 	g.Cfg().GetAdapter().(*gcfg.AdapterFile).SetFileName("./etc/config.yaml")
 	//任务配置文件
 	common.ReadConfig("./etc/task.json", &TaskConf)
+	NsqLogConfig = SaveLogConfig{
+		Name:             gcfg.Instance().MustGet(gctx.New(), "NsqLog.Name").String(),
+		CollName:         gcfg.Instance().MustGet(gctx.New(), "NsqLog.CollName").String(),
+		MgoSaveCacheSize: gcfg.Instance().MustGet(gctx.New(), "NsqLog.MgoSaveCacheSize").Int(),
+		SPSize:           gcfg.Instance().MustGet(gctx.New(), "NsqLog.SPSize").Int(),
+		BulkSize:         gcfg.Instance().MustGet(gctx.New(), "NsqLog.BulkSize").Int(),
+		TimeAfter:        gcfg.Instance().MustGet(gctx.New(), "NsqLog.TimeAfter").Int(),
+		Timeout:          gcfg.Instance().MustGet(gctx.New(), "NsqLog.Timeout").Int(),
+	}
 }

+ 9 - 1
etc/config.yaml

@@ -150,4 +150,12 @@ orderMonitor:
       priceLimit: true  # 是否有实付金额限制
       priceStart: 30000 # 实付金额最低  单位 分
       priceEnd: 100000  # 实付金额最高 单位 分
-taskStartTime: 1698020000 #新手任务开始时间
+taskStartTime: 1698020000 #新手任务开始时间
+NsqLog:
+  Name: nsq日志             # 日志名称
+  CollName: nsq_logs   # 保存的coll
+  MgoSaveCacheSize: 10000  # 缓存通道大小
+  SPSize: 3            # 数据库并发数据
+  BulkSize: 500        # 每批的数量
+  TimeAfter: 2000      # 定时保存 毫秒
+  Timeout: 10000       # 缓存通道满时,超时丢弃

+ 4 - 3
handler/handler.go

@@ -1,6 +1,7 @@
 package handler
 
 import (
+	"app.yhyue.com/moapp/message/util"
 	"encoding/json"
 	"fmt"
 	"time"
@@ -9,7 +10,6 @@ import (
 
 	"app.yhyue.com/moapp/jybase/common"
 	"app.yhyue.com/moapp/jybase/go-logger/logger"
-	. "app.yhyue.com/moapp/message/db"
 	"app.yhyue.com/moapp/message/model"
 	"github.com/nsqio/go-nsq"
 )
@@ -67,11 +67,12 @@ func (h *Handler) HandleMessage(m *nsq.Message) error {
 		logger.Info("无效的code值", msg)
 		return nil
 	}
-	Mgo_Log.Save("nsq_logs", map[string]interface{}{
+	data := map[string]interface{}{
 		"createtime": time.Now().Unix(),
 		"body":       msg,
 		"type":       "consumer", //producer or consumer
-	})
+	}
+	go util.NsqLog.SendLogs(data)
 	// Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
 	return nil
 }

+ 4 - 0
main.go

@@ -3,10 +3,13 @@ package main
 import (
 	"app.yhyue.com/moapp/jybase/go-logger/logger"
 	"app.yhyue.com/moapp/jybase/go-xweb/xweb"
+	"app.yhyue.com/moapp/message/config"
 	_ "app.yhyue.com/moapp/message/config"
+	"app.yhyue.com/moapp/message/db"
 	"app.yhyue.com/moapp/message/handler"
 	_ "app.yhyue.com/moapp/message/services"
 	"app.yhyue.com/moapp/message/task"
+	"app.yhyue.com/moapp/message/util"
 	"github.com/gogf/gf/v2/os/gcfg"
 	"github.com/gogf/gf/v2/os/gctx"
 	"github.com/nsqio/go-nsq"
@@ -44,6 +47,7 @@ func nsqWork() {
 func main() {
 	go nsqWork()
 	go task.SelectOrderTask()
+	util.LogInit(util.NsqLog, config.NsqLogConfig, *db.Mgo_Log)
 	mux1 := http.NewServeMux()
 	xweb.RunBase(gcfg.Instance().MustGet(gctx.New(), "webport", "").String(), mux1)
 }

+ 98 - 0
util/logs.go

@@ -0,0 +1,98 @@
+package util
+
+import (
+	"app.yhyue.com/moapp/jybase/mongodb"
+	"app.yhyue.com/moapp/message/config"
+	"fmt"
+	"log"
+	"time"
+)
+
+var NsqLog *SaveLogs
+
+func LogInit(sl *SaveLogs, c config.SaveLogConfig, mgo mongodb.MongodbSim) {
+	sl = NewSaveLog(c.Name, c.CollName, c.MgoSaveCacheSize, c.SPSize, c.BulkSize, c.TimeAfter, c.Timeout, mgo)
+	go sl.SaveMgo()
+}
+
+type SaveLogs struct {
+	Name             string // 日志名称
+	CollName         string // 保存的coll
+	MgoSaveCacheSize int    // 缓存通道大小
+	SPSize           int    // 数据库并发数据
+	MgoSaveCache     chan map[string]interface{}
+	SP               chan bool
+	MgoSave          mongodb.MongodbSim //数据保存库连接
+	BulkSize         int                // 每批的数量
+	TimeAfter        int                // 定时保存
+	Timeout          int                // 缓存通道满时 超时丢弃
+}
+
+func NewSaveLog(name, saveColl string, mgoSaveCacheSize, sPSize, bulkSize, timeAfter, timeout int, mgoSave mongodb.MongodbSim) *SaveLogs {
+	sl := SaveLogs{
+		Name:             name,
+		CollName:         saveColl,
+		MgoSaveCacheSize: mgoSaveCacheSize,
+		SPSize:           sPSize,
+		MgoSave:          mgoSave,
+		BulkSize:         bulkSize,
+		TimeAfter:        timeAfter,
+		Timeout:          timeout,
+	}
+	// 初始化
+	sl.SP = make(chan bool, sl.SPSize)
+	sl.MgoSaveCache = make(chan map[string]interface{}, sl.MgoSaveCacheSize)
+	return &sl
+}
+
+// SendLogs 往通道发送数据
+func (s *SaveLogs) SendLogs(data map[string]interface{}) {
+	timer := time.NewTimer(time.Duration(s.Timeout) * time.Millisecond)
+	defer timer.Stop()
+	select {
+	case s.MgoSaveCache <- data:
+	case <-timer.C:
+		log.Println("缓存通道已满,丢弃:", s.Name, data)
+		return
+	}
+}
+
+// SaveMgo 数据存库
+func (s *SaveLogs) SaveMgo() {
+	log.Println(fmt.Sprintf("%s Save...", s.Name))
+	arr := make([]map[string]interface{}, s.BulkSize)
+	index := 0
+	timer := time.NewTimer(time.Duration(s.TimeAfter) * time.Millisecond)
+	defer timer.Stop()
+	for {
+		timer.Reset(time.Duration(s.TimeAfter) * time.Millisecond)
+		select {
+		case v := <-s.MgoSaveCache:
+			arr[index] = v
+			index++
+			if index == s.BulkSize {
+				s.SP <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-s.SP
+					}()
+					s.MgoSave.SaveBulk(s.CollName, arru...)
+				}(arr)
+				arr = make([]map[string]interface{}, s.BulkSize)
+				index = 0
+			}
+		case <-timer.C:
+			if index > 0 {
+				s.SP <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-s.SP
+					}()
+					s.MgoSave.SaveBulk(s.CollName, arru...)
+				}(arr[:index])
+				arr = make([]map[string]interface{}, s.BulkSize)
+				index = 0
+			}
+		}
+	}
+}