Browse Source

fix:消息保存优化

fuwencai 2 năm trước cách đây
mục cha
commit
0a26f31d52
6 tập tin đã thay đổi với 38 bổ sung38 xóa
  1. 1 0
      entity/message.go
  2. 0 15
      rpc/config.json
  3. 3 2
      rpc/etc/message.yaml
  4. 5 4
      rpc/internal/config/config.go
  5. 2 0
      rpc/message.go
  6. 27 17
      service/sendMsg.go

+ 1 - 0
entity/message.go

@@ -15,6 +15,7 @@ var EtcdCli *clientv3.Client
 var Mysql *mysql.Mysql
 var Mysql11 *sql.DB
 var SurvivalTime int
+var SaveConcurrencyChan chan int //  定义保存消息并发
 
 type Message struct {
 	Id            string    `xorm:"id" form:"id" json:"id"`

+ 0 - 15
rpc/config.json

@@ -1,15 +0,0 @@
-{
-  "mysql": {
-    "dbName": "messageCentertest",
-    "address": "am-2ze6crwd6bb0283jn167320o.ads.aliyuncs.com",
-    "userName": "jianyu",
-    "passWord": "topnet@123",
-    "maxOpenConns": 5,
-    "maxIdleConns": 5
-  },
-  "etcd": {
-    "address": "127.0.0.1:2379",
-    "jyUserCenterKey": "usercenter.rpc",
-    "jyResourceCenterKey": "resourcescenter.rpc"
-  }
-}

+ 3 - 2
rpc/etc/message.yaml

@@ -12,8 +12,8 @@ DataSource:
     Address: 192.168.3.217:4000
     UserName: root
     PassWord: =PDT49#80Z!RVv52_z
-    MaxOpenConns: 10
-    MaxIdleConns: 10
+    MaxOpenConns: 20
+    MaxIdleConns: 20
 Redis:
   Host: 192.168.3.206
   Addr: 192.168.3.206:1712
@@ -24,3 +24,4 @@ FileSystemConf:
       - 127.0.0.1:2379
     Key: message.rpc
 SurvivalTime:  86400
+SaveConcurrency: 5

+ 5 - 4
rpc/internal/config/config.go

@@ -6,10 +6,11 @@ import (
 
 type Config struct {
 	zrpc.RpcServerConf
-	DataSource   *mysqlConfig // 手动代码
-	Mysql        string
-	Redis        *RedisConfig
-	SurvivalTime int
+	DataSource      *mysqlConfig // 手动代码
+	Mysql           string
+	Redis           *RedisConfig
+	SurvivalTime    int
+	SaveConcurrency int // 消息保存并发数
 }
 
 type RedisConfig struct {

+ 2 - 0
rpc/message.go

@@ -65,6 +65,8 @@ func init() {
 		log.Println("--初始化 redis--")
 		redis.InitRedisBySize(fmt.Sprintf("%s=%s", config.ConfigJson.Redis.Modules, config.ConfigJson.Redis.Addr), 20, 30, 300)
 	}
+	// 初始化消息保存并发通道
+	entity.SaveConcurrencyChan = make(chan int, config.ConfigJson.SaveConcurrency)
 	//初始化日志信息
 	conf.MustLoad(*logFile, &logc)
 	if len(logc.Level) > 0 {

+ 27 - 17
service/sendMsg.go

@@ -11,6 +11,7 @@ import (
 	"log"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 )
 
@@ -207,13 +208,24 @@ func MsgCountZero(userId, msgType, appId string) bool {
 func MultSave(this message.MultipleSaveMsgReq) (int64, string) {
 	userIdArr := strings.Split(this.UserIds, ",")
 	userNameArr := strings.Split(this.UserNames, ",")
-	if len(userIdArr) > 0 {
-		var errCount int64
-		for k, v := range userIdArr {
-			if v == "" {
-				return errCount, "调用结束"
-			}
-			userName := userNameArr[k]
+	if len(userIdArr) == 0 {
+		return 0, "无效的用户id"
+	}
+	wg := &sync.WaitGroup{}
+	for i := 0; i < len(userIdArr); i++ {
+
+		v := userIdArr[i]
+		if v == "" {
+			continue
+		}
+		userName := userNameArr[i]
+		wg.Add(1)
+		entity.SaveConcurrencyChan <- 1
+		go func() {
+			defer func() {
+				<-entity.SaveConcurrencyChan
+				wg.Done()
+			}()
 			//消息数组
 			c := entity.Mysql.Count("conversation", map[string]interface{}{"receive_id": v, "send_id": this.SendUserId})
 			sql3 := `INSERT INTO message(appid,receive_userid,receive_name,send_userid,send_name,title,content,msg_type,link,cite_id,createtime,isRead,isdel,msg_log_id,show_buoy,show_content) values ("%s",'%s','%s','%s','%s','%s','%s',%d,'%s',0,'%s',0,1,%d,%d,'%s');`
@@ -234,24 +246,22 @@ func MultSave(this message.MultipleSaveMsgReq) (int64, string) {
 					return in1 > -1 && in2 > -1 && in3 > -1
 				})
 				logx.Info("执行事务是否成功:", ok)
-				if !ok {
-					errCount++
-					continue
+				if ok {
+					ok1 := MsgCountAdd(v, strconv.Itoa(int(this.MsgType)), this.Appid)
+					log.Println("存redis:", ok1)
 				}
-				ok1 := MsgCountAdd(v, strconv.Itoa(int(this.MsgType)), this.Appid)
-				log.Println("存redis:", ok1)
 			} else {
 				in := entity.Mysql.InsertBySql(sql3)
 				logx.Info("插入消息返回 in1 id:", in)
 				if in > -1 {
 					ok := MsgCountAdd(v, strconv.Itoa(int(this.MsgType)), this.Appid)
 					log.Println("存redis:", ok)
-				} else {
-					errCount++
 				}
 			}
-		}
-		return errCount, "发送成功"
+		}()
+
 	}
-	return 0, "没有要发送的用户"
+	wg.Wait()
+	return 0, ""
+
 }