Ver Fonte

Merge remote-tracking branch 'origin/feature/v1.1.40' into dev_v1.1.40_wh

WH01243 há 1 ano atrás
pai
commit
940ce1f594

+ 3 - 1
jyBXCore/api/bxcore.go

@@ -10,6 +10,7 @@ import (
 	IC "jyBXCore/api/init"
 	"jyBXCore/api/internal/handler"
 	"jyBXCore/api/internal/svc"
+	"jyBXCore/entity"
 	"log"
 	"net/http"
 	"os"
@@ -46,7 +47,8 @@ func main() {
 	})
 	//日志记录
 	logx.SetWriter(logrusx.NewLogrusWriter())
-
+	IC.SearchLog = entity.NewSaveLog(IC.C.SearchLog.Name, IC.C.SearchLog.CollName, IC.C.SearchLog.MgoSaveCacheSize, IC.C.SearchLog.SPSize, IC.C.SearchLog.BulkSize, IC.C.SearchLog.TimeAfter, IC.C.SearchLog.Timeout, IC.MgoLog)
+	go IC.SearchLog.SaveMgo()
 	handler.RegisterHandlers(server, ctx)
 	fmt.Printf("Starting server at %s:%d...\n", IC.C.Host, IC.C.Port)
 	server.Start()

+ 8 - 1
jyBXCore/api/etc/bxcore-api.yaml

@@ -27,4 +27,11 @@ SearchMosaic:
   agencyTel: true
   budget: true
   bidAmount: true
-
+SearchLog:
+  Name: 搜索日志             # 日志名称
+  CollName: jy_search_log   # 保存的coll
+  MgoSaveCacheSize: 10000  # 缓存通道大小
+  SPSize: 3            # 数据库并发数据
+  BulkSize: 500        # 每批的数量
+  TimeAfter: 5000      # 定时保存 毫秒
+  Timeout: 2000       # 缓存通道满时,超时丢弃 毫秒

+ 7 - 0
jyBXCore/api/init/logs.go

@@ -0,0 +1,7 @@
+package init
+
+import (
+	"jyBXCore/entity"
+)
+
+var SearchLog *entity.SaveLogs

+ 10 - 0
jyBXCore/api/internal/config/config.go

@@ -18,6 +18,7 @@ type Config struct {
 	MgoLogsCount    int
 	DetailMosaicTxt string
 	SearchMosaic    map[string]bool
+	SearchLog       SaveLogConfig
 }
 
 type Db struct {
@@ -27,3 +28,12 @@ type Db struct {
 type Routes struct {
 	ExcludeRoute []string
 }
+type SaveLogConfig struct {
+	Name             string // 日志名称
+	CollName         string // 保存的coll
+	MgoSaveCacheSize int    // 缓存通道大小
+	SPSize           int    // 数据库并发数据
+	BulkSize         int    // 每批的数量
+	TimeAfter        int    // 定时保存 毫秒
+	Timeout          int    // 超时丢弃毫秒
+}

+ 1 - 4
jyBXCore/api/internal/logic/searchListLogic.go

@@ -2,7 +2,6 @@ package logic
 
 import (
 	"context"
-	"fmt"
 	"github.com/zeromicro/go-zero/core/logx"
 	IC "jyBXCore/api/init"
 	"jyBXCore/api/internal/svc"
@@ -149,9 +148,7 @@ func (l *SearchListLogic) SearchList(req *types.SearchReq) (resp *types.CommonRe
 		"exclusionWords":     req.ExclusionWords,
 		"search_publishtime": req.PublishTime,
 	}
-	if logId := IC.MgoLog.Save("jy_search_log", data); logId == "" {
-		log.Println(fmt.Sprintf("保存搜索日志异常 %s", req.UserId))
-	}
+	go IC.SearchLog.SendLogs(data)
 	return &types.CommonResp{
 		Err_code: res.ErrCode,
 		Err_msg:  res.ErrMsg,

+ 89 - 0
jyBXCore/entity/logs.go

@@ -0,0 +1,89 @@
+package entity
+
+import (
+	"app.yhyue.com/moapp/jybase/mongodb"
+	"fmt"
+	"log"
+	"time"
+)
+
+type SaveLogs struct {
+	Name             string // 日志名称
+	CollName         string // 保存的coll
+	MgoSaveCacheSize int    // 缓存通道大小
+	SPSize           int    // 数据库并发数据
+	MgoSaveCache     chan map[string]interface{}
+	SP               chan bool
+	MgoSave          mongodb.MongodbSim //数据保存库连接
+	BulkSize         int                // 每批的数量
+	TimeAfter        int                // 定时保存
+	Timeout          int                // 缓存通道满时 超时丢弃
+}
+
+func NewSaveLog(name, saveColl string, mgoSaveCacheSize, sPSize, bulkSize, timeAfter, timeout int, mgoSave mongodb.MongodbSim) *SaveLogs {
+	sl := SaveLogs{
+		Name:             name,
+		CollName:         saveColl,
+		MgoSaveCacheSize: mgoSaveCacheSize,
+		SPSize:           sPSize,
+		MgoSave:          mgoSave,
+		BulkSize:         bulkSize,
+		TimeAfter:        timeAfter,
+		Timeout:          timeout,
+	}
+	// 初始化
+	sl.SP = make(chan bool, sl.SPSize)
+	sl.MgoSaveCache = make(chan map[string]interface{}, sl.MgoSaveCacheSize)
+	return &sl
+}
+
+// SendLogs 往通道发送数据
+func (s *SaveLogs) SendLogs(data map[string]interface{}) {
+	timer := time.NewTicker(time.Duration(s.Timeout) * time.Millisecond)
+	defer timer.Stop()
+	select {
+	case s.MgoSaveCache <- data:
+	case <-timer.C:
+		log.Println("缓存通道已满,丢弃:", data)
+		return
+	}
+}
+
+// SaveMgo 数据存库
+func (s *SaveLogs) SaveMgo() {
+	log.Println(fmt.Sprintf("%s Save...", s.Name))
+	arr := make([]map[string]interface{}, s.BulkSize)
+	index := 0
+	timer := time.NewTicker(time.Duration(s.TimeAfter) * time.Millisecond)
+	defer timer.Stop()
+	for {
+		select {
+		case v := <-s.MgoSaveCache:
+			arr[index] = v
+			index++
+			if index == s.BulkSize {
+				s.SP <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-s.SP
+					}()
+					s.MgoSave.SaveBulk(s.CollName, arru...)
+				}(arr)
+				arr = make([]map[string]interface{}, s.BulkSize)
+				index = 0
+			}
+		case <-timer.C:
+			if index > 0 {
+				s.SP <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-s.SP
+					}()
+					s.MgoSave.SaveBulk(s.CollName, arru...)
+				}(arr[:index])
+				arr = make([]map[string]interface{}, s.BulkSize)
+				index = 0
+			}
+		}
+	}
+}

BIN
jyBXSubscribe/api/api.exe


+ 4 - 0
jyBXSubscribe/api/bxsubscribe.go

@@ -2,6 +2,8 @@ package main
 
 import (
 	"app.yhyue.com/moapp/jybase/endless"
+	"jyBXSubscribe/entity"
+
 	// logrusx "app.yhyue.com/moapp/jylogx/logx"
 	"fmt"
 	"jyBXSubscribe/api/internal/handler"
@@ -50,6 +52,8 @@ func main() {
 	handler.RegisterHandlers(server, ctx)
 	//日志记录
 	// logx.SetWriter(logrusx.NewLogrusWriter())
+	IC.SubscribeUpdateLog = entity.NewSaveLog(IC.C.SubscribeUpdateLog.Name, IC.C.SubscribeUpdateLog.CollName, IC.C.SubscribeUpdateLog.MgoSaveCacheSize, IC.C.SubscribeUpdateLog.SPSize, IC.C.SubscribeUpdateLog.BulkSize, IC.C.SubscribeUpdateLog.TimeAfter, IC.C.SubscribeUpdateLog.Timeout, IC.MgoLog)
+	go IC.SubscribeUpdateLog.SaveMgo()
 	fmt.Printf("Starting server at %s:%d...\n", IC.C.Host, IC.C.Port)
 	server.Start()
 

+ 8 - 0
jyBXSubscribe/api/etc/bxsubscribe-api.yaml

@@ -16,3 +16,11 @@ Subscribe:
 AppId: 10000
 MgoLogsName: jybxsubscribe_logs
 MgoLogsCount: 500
+SubscribeUpdateLog:
+    Name: 关键词设置日志     # 日志名称
+    CollName: ovipjy_log   # 保存的coll
+    MgoSaveCacheSize: 10000  # 缓存通道大小
+    SPSize: 3            # 数据库并发数据
+    BulkSize: 500        # 每批的数量
+    TimeAfter: 5000      # 定时保存 毫秒
+    Timeout: 2000       # 缓存通道满时,超时丢弃

+ 7 - 0
jyBXSubscribe/api/init/logs.go

@@ -0,0 +1,7 @@
+package init
+
+import (
+	"jyBXSubscribe/entity"
+)
+
+var SubscribeUpdateLog *entity.SaveLogs

+ 13 - 3
jyBXSubscribe/api/internal/config/config.go

@@ -15,9 +15,10 @@ type Config struct {
 		ServerCode string
 		Etcd       []string
 	}
-	Subscribe    zrpc.RpcClientConf
-	MgoLogsName  string
-	MgoLogsCount int
+	Subscribe          zrpc.RpcClientConf
+	MgoLogsName        string
+	MgoLogsCount       int
+	SubscribeUpdateLog SaveLogConfig
 }
 
 type Db struct {
@@ -27,3 +28,12 @@ type Db struct {
 type Routes struct {
 	ExcludeRoute []string
 }
+type SaveLogConfig struct {
+	Name             string // 日志名称
+	CollName         string // 保存的coll
+	MgoSaveCacheSize int    // 缓存通道大小
+	SPSize           int    // 数据库并发数据
+	BulkSize         int    // 每批的数量
+	TimeAfter        int    // 定时保存 毫秒
+	Timeout          int    // 超时丢弃毫秒
+}

+ 5 - 5
jyBXSubscribe/api/internal/logic/subscribeupdatelogic.go

@@ -1,18 +1,17 @@
 package logic
 
 import (
+	"app.yhyue.com/moapp/jybase/common"
 	"context"
 	"encoding/json"
 	"fmt"
+	"github.com/zeromicro/go-zero/core/logx"
 	it "jyBXSubscribe/api/init"
 	"jyBXSubscribe/api/internal/svc"
 	"jyBXSubscribe/api/internal/types"
 	. "jyBXSubscribe/entity"
 	"jyBXSubscribe/rpc/bxsubscribe"
 	"time"
-
-	"app.yhyue.com/moapp/jybase/common"
-	"github.com/zeromicro/go-zero/core/logx"
 )
 
 type SubscribeUpdateLogic struct {
@@ -67,12 +66,13 @@ func (l *SubscribeUpdateLogic) SubscribeUpdate(req *types.SubscribeUpdateReq) (r
 		resp.Err_code, resp.Err_msg = -1, "修改失败"
 		l.Error(fmt.Sprintf("%+v", req), resp.Err_msg)
 	} else {
-		it.MgoLog.Save("ovipjy_log", map[string]interface{}{
+		saveData := map[string]interface{}{
 			"userid":     req.UserId,
 			"o_vipjy":    req,
 			"createtime": time.Now().Unix(),
 			"type":       "o_vipjy",
-		})
+		}
+		go it.SubscribeUpdateLog.SendLogs(saveData)
 		resp.Data = map[string]interface{}{
 			"status": rp.Status,
 		}

+ 89 - 0
jyBXSubscribe/entity/logs.go

@@ -0,0 +1,89 @@
+package entity
+
+import (
+	"app.yhyue.com/moapp/jybase/mongodb"
+	"fmt"
+	"log"
+	"time"
+)
+
+type SaveLogs struct {
+	Name             string // 日志名称
+	CollName         string // 保存的coll
+	MgoSaveCacheSize int    // 缓存通道大小
+	SPSize           int    // 数据库并发数据
+	MgoSaveCache     chan map[string]interface{}
+	SP               chan bool
+	MgoSave          mongodb.MongodbSim //数据保存库连接
+	BulkSize         int                // 每批的数量
+	TimeAfter        int                // 定时保存
+	Timeout          int                // 缓存通道满时 超时丢弃
+}
+
+func NewSaveLog(name, saveColl string, mgoSaveCacheSize, sPSize, bulkSize, timeAfter, timeout int, mgoSave mongodb.MongodbSim) *SaveLogs {
+	sl := SaveLogs{
+		Name:             name,
+		CollName:         saveColl,
+		MgoSaveCacheSize: mgoSaveCacheSize,
+		SPSize:           sPSize,
+		MgoSave:          mgoSave,
+		BulkSize:         bulkSize,
+		TimeAfter:        timeAfter,
+		Timeout:          timeout,
+	}
+	// 初始化
+	sl.SP = make(chan bool, sl.SPSize)
+	sl.MgoSaveCache = make(chan map[string]interface{}, sl.MgoSaveCacheSize)
+	return &sl
+}
+
+// SendLogs 往通道发送数据
+func (s *SaveLogs) SendLogs(data map[string]interface{}) {
+	timer := time.NewTicker(time.Duration(s.Timeout) * time.Millisecond)
+	defer timer.Stop()
+	select {
+	case s.MgoSaveCache <- data:
+	case <-timer.C:
+		log.Println("缓存通道已满,丢弃:", data)
+		return
+	}
+}
+
+// SaveMgo 数据存库
+func (s *SaveLogs) SaveMgo() {
+	log.Println(fmt.Sprintf("%s Save...", s.Name))
+	arr := make([]map[string]interface{}, s.BulkSize)
+	index := 0
+	timer := time.NewTicker(time.Duration(s.TimeAfter) * time.Millisecond)
+	defer timer.Stop()
+	for {
+		select {
+		case v := <-s.MgoSaveCache:
+			arr[index] = v
+			index++
+			if index == s.BulkSize {
+				s.SP <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-s.SP
+					}()
+					s.MgoSave.SaveBulk(s.CollName, arru...)
+				}(arr)
+				arr = make([]map[string]interface{}, s.BulkSize)
+				index = 0
+			}
+		case <-timer.C:
+			if index > 0 {
+				s.SP <- true
+				go func(arru []map[string]interface{}) {
+					defer func() {
+						<-s.SP
+					}()
+					s.MgoSave.SaveBulk(s.CollName, arru...)
+				}(arr[:index])
+				arr = make([]map[string]interface{}, s.BulkSize)
+				index = 0
+			}
+		}
+	}
+}

+ 4 - 4
jyBXSubscribe/go.mod

@@ -5,9 +5,9 @@ go 1.19
 require (
 	app.yhyue.com/moapp/jybase v0.0.0-20230718012114-37013054344b
 	app.yhyue.com/moapp/jylogx v0.0.0-20230522075659-ae6fbedb92bc
-	app.yhyue.com/moapp/jypkg v0.0.0-20231024062045-5c364be1561d
+	app.yhyue.com/moapp/jypkg v1.1.3
 	bp.jydev.jianyu360.cn/BaseService/gateway v1.3.4
-	bp.jydev.jianyu360.cn/BaseService/powerCheckCenter v0.0.0-20230225125145-431a4f70093a
+	bp.jydev.jianyu360.cn/BaseService/powerCheckCenter v0.0.0-20231115092908-cb4608f3a96d
 	github.com/go-sql-driver/mysql v1.7.1
 	github.com/gogf/gf/v2 v2.0.6
 	github.com/zeromicro/go-zero v1.5.3
@@ -22,8 +22,8 @@ require (
 	app.yhyue.com/moapp/jylog v0.0.0-20230522075550-05d7230ca545 // indirect
 	app.yhyue.com/moapp/message v0.0.0-20221223100203-6402e389d9ae // indirect
 	bp.jydev.jianyu360.cn/BaseService/entManageApplication v0.0.0-20230214091519-89a98c01ab0e // indirect
-	bp.jydev.jianyu360.cn/BaseService/resourceCenter v0.0.8-0.20230414052738-2cf914a80b5d // indirect
-	bp.jydev.jianyu360.cn/BaseService/userCenter v1.2.14 // indirect
+	bp.jydev.jianyu360.cn/BaseService/resourceCenter v0.0.8 // indirect
+	bp.jydev.jianyu360.cn/BaseService/userCenter v1.2.15-0.20230925060020-8e4db0f1e13e // indirect
 	github.com/BurntSushi/toml v0.4.1 // indirect
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/cenkalti/backoff/v4 v4.2.1 // indirect

+ 8 - 0
jyBXSubscribe/go.sum

@@ -17,6 +17,8 @@ app.yhyue.com/moapp/jylogx v0.0.0-20230522075659-ae6fbedb92bc h1:QEwc+V6ZTvk3VMF
 app.yhyue.com/moapp/jylogx v0.0.0-20230522075659-ae6fbedb92bc/go.mod h1:5xAagkwCYnqG5VEHnOV2AqD6DiR169qvOjYKaHMHecU=
 app.yhyue.com/moapp/jypkg v0.0.0-20231024062045-5c364be1561d h1:h8SnO8ONZlmtx8ZSYtpw36TdBhYPy8WgLeWHyMGZj0Q=
 app.yhyue.com/moapp/jypkg v0.0.0-20231024062045-5c364be1561d/go.mod h1:76Kz6+MuxcRJRyFad9W8R4AByiQlVGzuGFzklY+2m38=
+app.yhyue.com/moapp/jypkg v1.1.3 h1:py1imaq0SQvoU77zKhUJp4CC9oOg2Cb+EAHfY9Ylek0=
+app.yhyue.com/moapp/jypkg v1.1.3/go.mod h1:sMZxJOsD3STWGY04aDhUtRD+1u5nqhQpdDdpSW3JC1Y=
 app.yhyue.com/moapp/message v0.0.0-20221223100203-6402e389d9ae h1:6rDDaz6yxvE8viTSzEBwKYOFWq14TMfuBivSazUZMz4=
 app.yhyue.com/moapp/message v0.0.0-20221223100203-6402e389d9ae/go.mod h1:b0zZHev3gmJao1Fo+2Z2KPVjsuLOJVvVxf+kCnu9WkA=
 bp.jydev.jianyu360.cn/BP/jynsq v0.0.0-20220222052708-ebc43af90698/go.mod h1:ojo/AUH9Yr1wzarEjOaNMkj1Cet/9r8IgLyba64Z52E=
@@ -27,15 +29,21 @@ bp.jydev.jianyu360.cn/BaseService/gateway v1.3.4 h1:zl5eZrKDBENVVBUiPpzyQQ0/SBdG
 bp.jydev.jianyu360.cn/BaseService/gateway v1.3.4/go.mod h1:BMLd/5wb3BIEGhnEgF9y1sJN9P5/Dw9kYsoiE9V8I9g=
 bp.jydev.jianyu360.cn/BaseService/powerCheckCenter v0.0.0-20230225125145-431a4f70093a h1:JX2jEMrbdLzXfVC/nTUvdFOkqNj5DUxkJFjl3XE1gyg=
 bp.jydev.jianyu360.cn/BaseService/powerCheckCenter v0.0.0-20230225125145-431a4f70093a/go.mod h1:5nimT8GJh46AyfeeDeyRlDQygMlO7TRM8Pwm41Gxemc=
+bp.jydev.jianyu360.cn/BaseService/powerCheckCenter v0.0.0-20231115092908-cb4608f3a96d h1:x17+SAYxlBChNWn2IS2eDWZlhZrxtgEWjlb1JF0946E=
+bp.jydev.jianyu360.cn/BaseService/powerCheckCenter v0.0.0-20231115092908-cb4608f3a96d/go.mod h1:rCCaOSWBYfQabf/yIvSVheSPtN2THnHeTl2J5/RrcuU=
 bp.jydev.jianyu360.cn/BaseService/resourceCenter v0.0.0-20220418005748-8ba5d936dd53/go.mod h1:E5lcDI3k4FESLxiAetCfWQTq8qfpy9cv0yN1oKoEO34=
 bp.jydev.jianyu360.cn/BaseService/resourceCenter v0.0.0-20220419023723-0b32d4a41751/go.mod h1:6KL5LMEku83uRbre0W/bj5kXG2I6pJGBFtktmtp51yM=
 bp.jydev.jianyu360.cn/BaseService/resourceCenter v0.0.0-20220420075831-0b59892e9982/go.mod h1:wsHNO91h37H+xE4ZNny0yd7mtpODeDJxbVYhIRMR+qw=
 bp.jydev.jianyu360.cn/BaseService/resourceCenter v0.0.8-0.20230414052738-2cf914a80b5d h1:nhrpiUuZl5GyTcAt19ZIZBDgnEfeEQm77uxfhuK7JTI=
 bp.jydev.jianyu360.cn/BaseService/resourceCenter v0.0.8-0.20230414052738-2cf914a80b5d/go.mod h1:rRiGzKG4F/fmkNxXQCxrkxNWc8yf1SmW8qWCKfGIQSM=
+bp.jydev.jianyu360.cn/BaseService/resourceCenter v0.0.8 h1:14Yxzutsej7LQe3jnN61wuRX9qjAZ4FtdWMA27ewQ3w=
+bp.jydev.jianyu360.cn/BaseService/resourceCenter v0.0.8/go.mod h1:rRiGzKG4F/fmkNxXQCxrkxNWc8yf1SmW8qWCKfGIQSM=
 bp.jydev.jianyu360.cn/BaseService/userCenter v0.0.0-20220418072311-2062bed1e700/go.mod h1:KjcrxTzM96tBc6G4B8tlLBn1lrVy5UJYF8+eTdP4xAE=
 bp.jydev.jianyu360.cn/BaseService/userCenter v0.0.0-20220421015128-4a36f3eac5c5/go.mod h1:GT0QC4aaKDuXxAvaU4G02XjCc31TU1ctqBGqxQYOfC4=
 bp.jydev.jianyu360.cn/BaseService/userCenter v1.2.14 h1:L/0RrgU+l8nMS7YO2JizszaX3lAo0gqzg2Bqmu0So0s=
 bp.jydev.jianyu360.cn/BaseService/userCenter v1.2.14/go.mod h1:03M9BWIGHy7BbGxLvjt8V9zZzEZDWhN6BuHVfVcDFbQ=
+bp.jydev.jianyu360.cn/BaseService/userCenter v1.2.15-0.20230925060020-8e4db0f1e13e h1:bncHHkJLqiDW1ZN6MVe+3bj3i0LxWgFkfqge6Gk7lB4=
+bp.jydev.jianyu360.cn/BaseService/userCenter v1.2.15-0.20230925060020-8e4db0f1e13e/go.mod h1:+6ZjaqpCr+ih1GYOh6ZhoDri9ZjiuxxSHvR7ovvhLx0=
 cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
 cloud.google.com/go v0.37.4/go.mod h1:NHPJ89PdicEuT9hdPXMROBD91xc5uRDxsMtSB16k7hw=