Browse Source

Merge branch 'dev/1.1.39.1_fuwencai' of BaseService/jyMicroservices into hotfix/v1.1.39.1

fuwencai 1 year ago
parent
commit
2051b12662

+ 1 - 1
jyBXCore/api/bxcore.go

@@ -46,7 +46,7 @@ func main() {
 	})
 	//日志记录
 	logx.SetWriter(logrusx.NewLogrusWriter())
-
+	IC.LogInit(IC.C.SearchLog, IC.MgoLog)
 	handler.RegisterHandlers(server, ctx)
 	fmt.Printf("Starting server at %s:%d...\n", IC.C.Host, IC.C.Port)
 	server.Start()

+ 8 - 1
jyBXCore/api/etc/bxcore-api.yaml

@@ -27,4 +27,11 @@ SearchMosaic:
   agencyTel: true
   budget: true
   bidAmount: true
-
+SearchLog:
+  Name: 搜索日志             # 日志名称
+  CollName: jy_search_log   # 保存的coll
+  MgoSaveCacheSize: 10000  # 缓存通道大小
+  SPSize: 3            # 数据库并发数据
+  BulkSize: 500        # 每批的数量
+  TimeAfter: 2000      # 定时保存 毫秒
+  Timeout: 10000       # 缓存通道满时,超时丢弃

+ 16 - 0
jyBXCore/api/init/logs.go

@@ -0,0 +1,16 @@
+package init
+
+import (
+	"app.yhyue.com/moapp/jybase/mongodb"
+	"jyBXCore/api/internal/config"
+	"jyBXCore/entity"
+	"log"
+)
+
+var SearchLog *entity.SaveLogs
+
+func LogInit(c config.SaveLogConfig, mgo mongodb.MongodbSim) {
+	SearchLog = entity.NewSaveLog(c.Name, c.CollName, c.MgoSaveCacheSize, c.SPSize, c.BulkSize, c.TimeAfter, c.Timeout, mgo)
+	go SearchLog.SaveMgo()
+	log.Println("初始化日志保存")
+}

+ 10 - 0
jyBXCore/api/internal/config/config.go

@@ -18,6 +18,7 @@ type Config struct {
 	MgoLogsCount    int
 	DetailMosaicTxt string
 	SearchMosaic    map[string]bool
+	SearchLog       SaveLogConfig
 }
 
 type Db struct {
@@ -27,3 +28,12 @@ type Db struct {
 type Routes struct {
 	ExcludeRoute []string
 }
+type SaveLogConfig struct {
+	Name             string // 日志名称
+	CollName         string // 保存的coll
+	MgoSaveCacheSize int    // 缓存通道大小
+	SPSize           int    // 数据库并发数据
+	BulkSize         int    // 每批的数量
+	TimeAfter        int    // 定时保存 毫秒
+	Timeout          int    // 超时丢弃毫秒
+}

+ 3 - 4
jyBXCore/api/internal/logic/searchListLogic.go

@@ -2,7 +2,6 @@ package logic
 
 import (
 	"context"
-	"fmt"
 	"github.com/zeromicro/go-zero/core/logx"
 	IC "jyBXCore/api/init"
 	"jyBXCore/api/internal/svc"
@@ -148,9 +147,9 @@ func (l *SearchListLogic) SearchList(req *types.SearchReq) (resp *types.CommonRe
 		"exclusionWords":     req.ExclusionWords,
 		"search_publishtime": req.PublishTime,
 	}
-	if logId := IC.MgoLog.Save("jy_search_log", data); logId == "" {
-		log.Println(fmt.Sprintf("保存搜索日志异常 %s", req.UserId))
-	}
+	go func() {
+		IC.SearchLog.SendLogs(data)
+	}()
 	return &types.CommonResp{
 		Err_code: res.ErrCode,
 		Err_msg:  res.ErrMsg,

+ 86 - 0
jyBXCore/entity/logs.go

@@ -0,0 +1,86 @@
+package entity
+
+import (
+	"app.yhyue.com/moapp/jybase/mongodb"
+	"fmt"
+	"log"
+	"time"
+)
+
+type SaveLogs struct {
+	Name             string // 日志名称
+	CollName         string // 保存的coll
+	MgoSaveCacheSize int    // 缓存通道大小
+	SPSize           int    // 数据库并发数据
+	MgoSaveCache     chan map[string]interface{}
+	SP               chan bool
+	MgoSave          mongodb.MongodbSim //数据保存库连接
+	BulkSize         int                // 每批的数量
+	TimeAfter        int                // 定时保存
+	Timeout          int                // 缓存通道满时 超时丢弃
+}
+
+func NewSaveLog(name, saveColl string, mgoSaveCacheSize, sPSize, bulkSize, timeAfter, timeout int, mgoSave mongodb.MongodbSim) *SaveLogs {
+	sl := SaveLogs{
+		Name:             name,
+		CollName:         saveColl,
+		MgoSaveCacheSize: mgoSaveCacheSize,
+		SPSize:           sPSize,
+		MgoSave:          mgoSave,
+		BulkSize:         bulkSize,
+		TimeAfter:        timeAfter,
+		Timeout:          timeout,
+	}
+	// 初始化
+	sl.SP = make(chan bool, sl.SPSize)
+	sl.MgoSaveCache = make(chan map[string]interface{}, sl.MgoSaveCacheSize)
+	return &sl
+}
+
+// SendLogs 往通道发送数据
+func (s *SaveLogs) SendLogs(data map[string]interface{}) {
+	timer := time.NewTicker(time.Duration(s.Timeout) * time.Millisecond)
+	select {
+	case s.MgoSaveCache <- data:
+	case <-timer.C:
+		log.Println("缓存通道已满,丢弃:", data)
+		return
+	}
+}
+
+// SaveMgo 数据存库
+func (s *SaveLogs) SaveMgo() {
+	log.Println(fmt.Sprintf("%s Save...", s.Name))
+	arr := make([]map[string]interface{}, s.BulkSize)
+	index := 0
+	for {
+		select {
+		case v := <-s.MgoSaveCache:
+			arr[index] = v
+			index++
+			if index == s.BulkSize {
+				s.SP <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-s.SP
+					}()
+					s.MgoSave.SaveBulk(s.CollName, arru...)
+				}(arr)
+				arr = make([]map[string]interface{}, s.BulkSize)
+				index = 0
+			}
+		case <-time.After(time.Duration(s.TimeAfter) * time.Millisecond):
+			if index > 0 {
+				s.SP <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-s.SP
+					}()
+					s.MgoSave.SaveBulk(s.CollName, arru...)
+				}(arr[:index])
+				arr = make([]map[string]interface{}, s.BulkSize)
+				index = 0
+			}
+		}
+	}
+}