Przeglądaj źródła

Merge branch 'dev/1.1.39.1_fuwencai' of BaseService/jyMicroservices into hotfix/v1.1.39.1

fuwencai 1 rok temu
rodzic
commit
14060f097a

+ 1 - 1
jyBXCore/api/bxcore.go

@@ -46,7 +46,7 @@ func main() {
 	})
 	//日志记录
 	logx.SetWriter(logrusx.NewLogrusWriter())
-	IC.LogInit(IC.C.SearchLog, IC.MgoLog)
+	IC.LogInit(IC.SearchLog, IC.C.SearchLog, IC.MgoLog)
 	handler.RegisterHandlers(server, ctx)
 	fmt.Printf("Starting server at %s:%d...\n", IC.C.Host, IC.C.Port)
 	server.Start()

+ 3 - 3
jyBXCore/api/init/logs.go

@@ -9,8 +9,8 @@ import (
 
 var SearchLog *entity.SaveLogs
 
-func LogInit(c config.SaveLogConfig, mgo mongodb.MongodbSim) {
-	SearchLog = entity.NewSaveLog(c.Name, c.CollName, c.MgoSaveCacheSize, c.SPSize, c.BulkSize, c.TimeAfter, c.Timeout, mgo)
-	go SearchLog.SaveMgo()
+func LogInit(sl *entity.SaveLogs,c config.SaveLogConfig, mgo mongodb.MongodbSim) {
+	sl = entity.NewSaveLog(c.Name, c.CollName, c.MgoSaveCacheSize, c.SPSize, c.BulkSize, c.TimeAfter, c.Timeout, mgo)
+	go sl.SaveMgo()
 	log.Println("初始化日志保存")
 }

+ 1 - 3
jyBXCore/api/internal/logic/searchListLogic.go

@@ -147,9 +147,7 @@ func (l *SearchListLogic) SearchList(req *types.SearchReq) (resp *types.CommonRe
 		"exclusionWords":     req.ExclusionWords,
 		"search_publishtime": req.PublishTime,
 	}
-	go func() {
-		IC.SearchLog.SendLogs(data)
-	}()
+	go IC.SearchLog.SendLogs(data)
 	return &types.CommonResp{
 		Err_code: res.ErrCode,
 		Err_msg:  res.ErrMsg,

+ 1 - 1
jyBXCore/entity/logs.go

@@ -41,7 +41,7 @@ func NewSaveLog(name, saveColl string, mgoSaveCacheSize, sPSize, bulkSize, timeA
 func (s *SaveLogs) SendLogs(data map[string]interface{}) {
 	select {
 	case s.MgoSaveCache <- data:
-	case <-time.After(time.Duration(s.Timeout)*time.Millisecond):
+	case <-time.After(time.Duration(s.Timeout) * time.Millisecond):
 		log.Println("缓存通道已满,丢弃:", data)
 		return
 	}

BIN
jyBXSubscribe/api/api.exe


+ 1 - 0
jyBXSubscribe/api/bxsubscribe.go

@@ -50,6 +50,7 @@ func main() {
 	handler.RegisterHandlers(server, ctx)
 	//日志记录
 	// logx.SetWriter(logrusx.NewLogrusWriter())
+	IC.LogInit(IC.SubscribeUpdateLog, IC.C.SubscribeUpdateLog, IC.MgoLog)
 	fmt.Printf("Starting server at %s:%d...\n", IC.C.Host, IC.C.Port)
 	server.Start()
 

+ 8 - 0
jyBXSubscribe/api/etc/bxsubscribe-api.yaml

@@ -16,3 +16,11 @@ Subscribe:
 AppId: 10000
 MgoLogsName: jybxsubscribe_logs
 MgoLogsCount: 500
+SubscribeUpdateLog:
+    Name: 关键词设置日志     # 日志名称
+    CollName: ovipjy_log   # 保存的coll
+    MgoSaveCacheSize: 10000  # 缓存通道大小
+    SPSize: 3            # 数据库并发数据
+    BulkSize: 500        # 每批的数量
+    TimeAfter: 2000      # 定时保存 毫秒
+    Timeout: 10000       # 缓存通道满时,超时丢弃

+ 17 - 0
jyBXSubscribe/api/init/logs.go

@@ -0,0 +1,17 @@
+package init
+
+import (
+	"app.yhyue.com/moapp/jybase/mongodb"
+	"jyBXSubscribe/api/internal/config"
+	"jyBXSubscribe/entity"
+
+	"log"
+)
+
+var SubscribeUpdateLog *entity.SaveLogs
+
+func LogInit(sl *entity.SaveLogs, c config.SaveLogConfig, mgo mongodb.MongodbSim) {
+	sl = entity.NewSaveLog(c.Name, c.CollName, c.MgoSaveCacheSize, c.SPSize, c.BulkSize, c.TimeAfter, c.Timeout, mgo)
+	go sl.SaveMgo()
+	log.Println("初始化日志保存")
+}

+ 13 - 3
jyBXSubscribe/api/internal/config/config.go

@@ -15,9 +15,10 @@ type Config struct {
 		ServerCode string
 		Etcd       []string
 	}
-	Subscribe    zrpc.RpcClientConf
-	MgoLogsName  string
-	MgoLogsCount int
+	Subscribe          zrpc.RpcClientConf
+	MgoLogsName        string
+	MgoLogsCount       int
+	SubscribeUpdateLog SaveLogConfig
 }
 
 type Db struct {
@@ -27,3 +28,12 @@ type Db struct {
 type Routes struct {
 	ExcludeRoute []string
 }
+type SaveLogConfig struct {
+	Name             string // 日志名称
+	CollName         string // 保存的coll
+	MgoSaveCacheSize int    // 缓存通道大小
+	SPSize           int    // 数据库并发数据
+	BulkSize         int    // 每批的数量
+	TimeAfter        int    // 定时保存 毫秒
+	Timeout          int    // 超时丢弃毫秒
+}

+ 6 - 7
jyBXSubscribe/api/internal/logic/subscribeupdatelogic.go

@@ -1,18 +1,17 @@
 package logic
 
 import (
+	"app.yhyue.com/moapp/jybase/common"
 	"context"
 	"encoding/json"
 	"fmt"
+	"github.com/zeromicro/go-zero/core/logx"
 	it "jyBXSubscribe/api/init"
 	"jyBXSubscribe/api/internal/svc"
 	"jyBXSubscribe/api/internal/types"
 	. "jyBXSubscribe/entity"
 	"jyBXSubscribe/rpc/bxsubscribe"
 	"time"
-
-	"app.yhyue.com/moapp/jybase/common"
-	"github.com/zeromicro/go-zero/core/logx"
 )
 
 type SubscribeUpdateLogic struct {
@@ -29,7 +28,6 @@ func NewSubscribeUpdateLogic(ctx context.Context, svcCtx *svc.ServiceContext) *S
 	}
 }
 
-//
 func (l *SubscribeUpdateLogic) SubscribeUpdate(req *types.SubscribeUpdateReq) (resp *types.CommonResp, err error) {
 	resp = &types.CommonResp{}
 	if req.UserId == "" {
@@ -67,12 +65,13 @@ func (l *SubscribeUpdateLogic) SubscribeUpdate(req *types.SubscribeUpdateReq) (r
 		resp.Err_code, resp.Err_msg = -1, "修改失败"
 		l.Error(fmt.Sprintf("%+v", req), resp.Err_msg)
 	} else {
-		it.MgoLog.Save("ovipjy_log", map[string]interface{}{
+		saveData := map[string]interface{}{
 			"userid":     req.UserId,
 			"o_vipjy":    req,
 			"createtime": time.Now().Unix(),
 			"type":       "o_vipjy",
-		})
+		}
+		go it.SubscribeUpdateLog.SendLogs(saveData)
 		resp.Data = map[string]interface{}{
 			"status": rp.Status,
 		}
@@ -80,7 +79,7 @@ func (l *SubscribeUpdateLogic) SubscribeUpdate(req *types.SubscribeUpdateReq) (r
 	return
 }
 
-//判断关键词是否异常,
+// 判断关键词是否异常,
 func KeyWordsRepeat(aitems []map[string]interface{}) bool {
 	m := map[string]bool{} //去重的数组
 	// 录入的关键词

+ 85 - 0
jyBXSubscribe/entity/logs.go

@@ -0,0 +1,85 @@
+package entity
+
+import (
+	"app.yhyue.com/moapp/jybase/mongodb"
+	"fmt"
+	"log"
+	"time"
+)
+
+type SaveLogs struct {
+	Name             string // 日志名称
+	CollName         string // 保存的coll
+	MgoSaveCacheSize int    // 缓存通道大小
+	SPSize           int    // 数据库并发数据
+	MgoSaveCache     chan map[string]interface{}
+	SP               chan bool
+	MgoSave          mongodb.MongodbSim //数据保存库连接
+	BulkSize         int                // 每批的数量
+	TimeAfter        int                // 定时保存
+	Timeout          int                // 缓存通道满时 超时丢弃
+}
+
+func NewSaveLog(name, saveColl string, mgoSaveCacheSize, sPSize, bulkSize, timeAfter, timeout int, mgoSave mongodb.MongodbSim) *SaveLogs {
+	sl := SaveLogs{
+		Name:             name,
+		CollName:         saveColl,
+		MgoSaveCacheSize: mgoSaveCacheSize,
+		SPSize:           sPSize,
+		MgoSave:          mgoSave,
+		BulkSize:         bulkSize,
+		TimeAfter:        timeAfter,
+		Timeout:          timeout,
+	}
+	// 初始化
+	sl.SP = make(chan bool, sl.SPSize)
+	sl.MgoSaveCache = make(chan map[string]interface{}, sl.MgoSaveCacheSize)
+	return &sl
+}
+
+// SendLogs 往通道发送数据
+func (s *SaveLogs) SendLogs(data map[string]interface{}) {
+	select {
+	case s.MgoSaveCache <- data:
+	case <-time.After(time.Duration(s.Timeout) * time.Millisecond):
+		log.Println("缓存通道已满,丢弃:", data)
+		return
+	}
+}
+
+// SaveMgo 数据存库
+func (s *SaveLogs) SaveMgo() {
+	log.Println(fmt.Sprintf("%s Save...", s.Name))
+	arr := make([]map[string]interface{}, s.BulkSize)
+	index := 0
+	for {
+		select {
+		case v := <-s.MgoSaveCache:
+			arr[index] = v
+			index++
+			if index == s.BulkSize {
+				s.SP <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-s.SP
+					}()
+					s.MgoSave.SaveBulk(s.CollName, arru...)
+				}(arr)
+				arr = make([]map[string]interface{}, s.BulkSize)
+				index = 0
+			}
+		case <-time.After(time.Duration(s.TimeAfter) * time.Millisecond):
+			if index > 0 {
+				s.SP <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-s.SP
+					}()
+					s.MgoSave.SaveBulk(s.CollName, arru...)
+				}(arr[:index])
+				arr = make([]map[string]interface{}, s.BulkSize)
+				index = 0
+			}
+		}
+	}
+}