Sfoglia il codice sorgente

增加日志,消息未读数量存redis

renjiaojiao 2 anni fa
parent
commit
768b20ed72

+ 9 - 0
entity/logx.go

@@ -0,0 +1,9 @@
+package entity
+
+//日志信息
+type Logc struct {
+	Mode     string
+	Path     string
+	Level    []string
+	KeepDays int
+}

+ 6 - 0
rpc/etc/logs.yaml

@@ -0,0 +1,6 @@
+Mode: file
+Path: ./logs
+Level:
+  - info
+  - error
+KeepDays: 10

+ 8 - 4
rpc/etc/message.yaml

@@ -9,13 +9,17 @@ Timeout: 10000
 Mysql: jianyu:topnet@123@tcp(am-2ze6crwd6bb0283jn167320o.ads.aliyuncs.com:3306)/message?timeout=10s&interpolateParams=true
 DataSource:
     DbName: messagetest
-    Address: 192.168.3.109:4000
-    UserName: jianyu
-    PassWord: top@123
+    Address: 192.168.3.217:4000
+    UserName: root
+    PassWord: =PDT49#80Z!RVv52_z
     MaxOpenConns: 10
     MaxIdleConns: 10
+Redis:
+  Host: 192.168.3.206
+  Addr: 192.168.3.206:1712
+  Modules: msgCount
 FileSystemConf:
   Etcd:
     Hosts:
-      - 192.168.3.240:2379
+      - 127.0.0.1:2379
     Key: message.rpc

+ 7 - 0
rpc/internal/config/config.go

@@ -8,6 +8,13 @@ type Config struct {
 	zrpc.RpcServerConf
 	DataSource *mysqlConfig // 手动代码
 	Mysql      string
+	Redis      *RedisConfig
+}
+
+type RedisConfig struct {
+	Host    string
+	Addr    string
+	Modules string
 }
 
 type mysqlConfig struct {

+ 24 - 31
rpc/message.go

@@ -10,17 +10,20 @@ import (
 	"app.yhyue.com/moapp/MessageCenter/rpc/internal/svc"
 	"app.yhyue.com/moapp/MessageCenter/rpc/message"
 	"app.yhyue.com/moapp/jybase/mysql"
+	"app.yhyue.com/moapp/jybase/redis"
 	"flag"
 	"fmt"
 	"github.com/tal-tech/go-zero/core/conf"
+	"github.com/tal-tech/go-zero/core/logx"
 	"github.com/tal-tech/go-zero/zrpc"
 	clientv3 "go.etcd.io/etcd/client/v3"
 	"google.golang.org/grpc"
 	"log"
-	"time"
 )
 
 var configFile = flag.String("f", "etc/message.yaml", "the config file")
+var logFile = flag.String("lf", "etc/logs.yaml", "the logs file")
+var logc entity.Logc
 var EtcdCli *clientv3.Client
 var Mysql *mysql.Mysql
 
@@ -44,18 +47,6 @@ func main() {
 //创建orm引擎
 func init() {
 	conf.MustLoad(*configFile, &config.ConfigJson)
-	var err error
-
-	//连接etcd
-	entity.EtcdCli, err = clientv3.New(clientv3.Config{
-		Endpoints:   config.ConfigJson.Etcd.Hosts,
-		DialTimeout: 5 * time.Second,
-	})
-	if err != nil {
-		log.Printf("connect to etcd failed, err:%v\n", err)
-		return
-	}
-	log.Println("connect to etcd success")
 
 	log.Println("开始初始化数据库。。。。。")
 	//初始化mysql
@@ -69,24 +60,26 @@ func init() {
 	}
 	entity.Mysql.Init()
 
-	//连接mysql
-	//log.Println(config.ConfigJson.DataSource)
-	//entity.Engine, err = xorm.NewEngine("mysql", config.ConfigJson.Mysql)
-	//log.Println(err)
-	//entity.Engine.ShowSQL(true)
-	//if err != nil {
-	//	log.Fatal("数据库连接失败:", err)
-	//}
-	//fmt.Println(config.ConfigJson.Mysql + "链接成功")
+	//初始化 redis
+	if config.ConfigJson.Redis.Addr != "" {
+		if config.ConfigJson.Redis.Modules == "" {
+			config.ConfigJson.Redis.Modules = "other"
+		}
+		log.Println("--初始化 redis--")
+		redis.InitRedisBySize(fmt.Sprintf("%s=%s", config.ConfigJson.Redis.Modules, config.ConfigJson.Redis.Addr), 20, 30, 300)
+	}
 
-	/*entity.Mysql11, err = sql.Open("mysql", config.ConfigJson.Mysql)
-	if err != nil {
-		panic(err.Error())
+	//初始化日志信息
+	conf.MustLoad(*logFile, &logc)
+	if len(logc.Level) > 0 {
+		for _, v := range logc.Level {
+			logx.MustSetup(logx.LogConf{
+				Mode:     logc.Mode,
+				Path:     logc.Path,
+				Level:    v,
+				KeepDays: logc.KeepDays,
+			})
+			logx.Info(v, "--日志记录")
+		}
 	}
-	// 设置最大打开的连接数,默认值为0,表示不限制。
-	entity.Mysql11.SetMaxOpenConns(20)
-	// 设置最大闲置的连接数
-	entity.Mysql11.SetMaxIdleConns(20)
-	// 设置连接的最大生命周期,默认连接总是可重用。
-	entity.Mysql11.SetConnMaxLifetime(time.Hour)*/
 }

+ 3 - 3
rpc/test/send_test.go

@@ -16,7 +16,7 @@ import (
 
 func Test_SendMsg(t *testing.T) {
 	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
-	std := messageclient.NewMessage(zrpc.MustNewClient(zrpc.RpcClientConf{Etcd: discov.EtcdConf{Hosts: []string{"192.168.3.240:2379"}, Key: "message.rpc"}}))
+	std := messageclient.NewMessage(zrpc.MustNewClient(zrpc.RpcClientConf{Etcd: discov.EtcdConf{Hosts: []string{"127.0.0.1:2379"}, Key: "message.rpc"}}))
 	req := &messageclient.SendMsgRequest{
 		Appid:         "10000",
 		ReceiveUserId: "6042120adca8410f1ef2ec84",
@@ -105,7 +105,7 @@ func Test_SaveMsg(t *testing.T) {
 
 	req := &messageclient.MultipleSaveMsgReq{
 		Appid:      "10000",
-		MsgType:    4,
+		MsgType:    2,
 		Title:      "11",
 		Content:    "22",
 		Link:       "",
@@ -113,7 +113,7 @@ func Test_SaveMsg(t *testing.T) {
 		SendUserId: "111",
 		SendName:   "222",
 		UserNames:  ",",
-		UserIds:    "111,2222",
+		UserIds:    "5fa3bb6d059e75bcdf8dab6a",
 	}
 	res, err := std.MultipleSaveMsg(ctx, req)
 	log.Println("err ", err, res)

+ 6 - 5
service/messageService.go

@@ -6,6 +6,7 @@ import (
 	"app.yhyue.com/moapp/MessageCenter/util"
 	qutil "app.yhyue.com/moapp/jybase/common"
 	"errors"
+	"github.com/tal-tech/go-zero/core/logx"
 	"log"
 	"strconv"
 )
@@ -16,7 +17,7 @@ type MessageService struct {
 // 修改消息阅读状态
 func (service *MessageService) ChangeReadStatus(data *message.ChangeReadStatusRequest) (int64, string) {
 	msg := entity.Mysql.FindOne("message", map[string]interface{}{"id": data.Id, "isdel": 1, "appid": data.Appid}, "", "")
-	log.Println("查询到消息:", msg)
+	//log.Println("查询到消息:", msg)
 	if msg == nil {
 		return 0, "该消息不存在"
 	}
@@ -24,7 +25,7 @@ func (service *MessageService) ChangeReadStatus(data *message.ChangeReadStatusRe
 	if !b {
 		return 0, "修改消息阅读状态失败"
 	}
-	EtcdCountMinusOne(qutil.ObjToString((*msg)["receive_userid"]), strconv.Itoa(int((*msg)["msg_type"].(int64))))
+	MsgCountMinusOne(qutil.ObjToString((*msg)["receive_userid"]), strconv.Itoa(int((*msg)["msg_type"].(int64))))
 	return 1, "修改消息阅读状态成功"
 }
 
@@ -56,7 +57,7 @@ func (service *MessageService) CountUnread(userId string, appId string) (int64,
 		"appid":          appId,
 	}
 	count := entity.Mysql.Count("message", query)
-	log.Println(count)
+	logx.Info("未读消息数量:", count)
 	return 1, "查询未读消息成功", count
 }
 
@@ -93,7 +94,7 @@ func (service *MessageService) LastMessage(userId string, appId string, msgType
 			CiteId:        qutil.Int64All((*lastMsg)["cite_id"]),
 			Content:       qutil.ObjToString((*lastMsg)["content"]),
 			IsRead:        qutil.Int64All((*lastMsg)["isRead"]),
-			MsgLogId: qutil.Int64All((*lastMsg)["msg_log_id"]),
+			MsgLogId:      qutil.Int64All((*lastMsg)["msg_log_id"]),
 		}
 		return &msg, nil
 	} else {
@@ -156,6 +157,6 @@ func (service *MessageService) UpdateMessageReadStatus(msgType int, receiveUseri
 	if !b {
 		return 0, errors.New("修改消息已读出错")
 	}
-	EtcdSetCountZero(receiveUserid, strconv.Itoa(msgType))
+	MsgCountZero(receiveUserid, strconv.Itoa(msgType))
 	return 1, nil
 }

+ 34 - 184
service/sendMsg.go

@@ -1,44 +1,41 @@
 package service
 
 import (
-	"context"
+	"app.yhyue.com/moapp/jybase/redis"
 	"database/sql"
-	"encoding/json"
 	"fmt"
+	"github.com/tal-tech/go-zero/core/logx"
 	"log"
 	"strconv"
 	"strings"
 	"time"
 
-	"app.yhyue.com/moapp/MessageCenter/util"
-	"go.etcd.io/etcd/client/v3/concurrency"
-
 	"app.yhyue.com/moapp/MessageCenter/entity"
 	"app.yhyue.com/moapp/MessageCenter/rpc/message"
+	"app.yhyue.com/moapp/MessageCenter/util"
 )
 
 // 类型的顺序
 const order = "1,4"
-const EtcdCount = "Count.%s.%s" //etcd 消息未读数量 Count.用户id.消息类型=数量
+const MsgCountKey = "count_%s_%s" //redis 消息未读数量 Count.用户id.消息类型=数量
+const redisModule = "msgCount"
+
+/*var (
+	UserLockMap = map[string]*sync.Mutex{}
+	//MainLock    = sync.Mutex{}
+)*/
 
 func SendMsg(this message.SendMsgRequest) (int64, string) {
-	//orm := entity.Engine.NewSession()
-	//defer orm.Close()
-	//err := orm.Begin()
-	//fmt.Println(err)
-	//count := entity.Mysql11.Query("conversation", map[string]interface{}{"receive_id": this.ReceiveUserId, "send_id": this.SendUserId})
 
 	r, err := entity.Mysql11.Query("select count(*) as c from conversation where receive_id = ? and send_id = ? ", this.ReceiveUserId, this.SendUserId)
 	c := 0
-	log.Println("查询结果", r)
 	for r.Next() {
 		err := r.Scan(&c)
 		if err != nil {
 			panic(err.Error())
 		}
 	}
-
-	log.Println("查询数量:", c)
+	logx.Info("查询数量:", c)
 
 	sql3 := `INSERT INTO message(appid,receive_userid,receive_name,send_userid,send_name,title,content,msg_type,link,cite_id,createtime,isRead,isdel)
 		values ("%s",'%s','%s','%s','%s','%s','%s','%d','%s',0,'%s',0,1);`
@@ -68,7 +65,7 @@ func SendMsg(this message.SendMsgRequest) (int64, string) {
 	}
 	_, err = entity.Mysql11.Exec(sql3)
 	if err == nil {
-		EtcdCountAdd(this.ReceiveUserId, strconv.Itoa(int(this.MsgType)))
+		MsgCountAdd(this.ReceiveUserId, strconv.Itoa(int(this.MsgType)))
 		return 1, "消息发送成功"
 	}
 	return 0, "消息发送失败"
@@ -94,12 +91,8 @@ func FindUserMsg(this message.FindUserMsgReq) message.FindUserMsgRes {
 	//count, err = orm.Table("message").Where("((receive_userid = ? and send_userid = ?) or (receive_userid = ? and send_userid = ?)) and isdel = ? and appid = ?"+q, this.UserId, this.ReceiveUserId, this.ReceiveUserId, this.UserId, 1, this.Appid).Count()
 	data := message.FindUserMsgRes{}
 	if count > 0 {
-		/*err = orm.Table("message").Select("*").Where("((receive_userid = ? and send_userid = ?) or (receive_userid = ? and send_userid = ?)) and isdel = ? and appid = ?"+q, this.UserId, this.ReceiveUserId, this.ReceiveUserId, this.UserId, 1, this.Appid).
-		OrderBy("createtime desc").
-		Limit(int(this.PageSize), (int(this.OffSet)-1)*int(this.PageSize)).
-		Find(&messages)*/
 		res := entity.Mysql.Find("message", cquery, "", "createtime desc", (int(this.OffSet)-1)*int(this.PageSize), int(this.PageSize))
-		log.Println("数据:", res)
+		//log.Println("数据:", res)
 		if res != nil && len(*res) > 0 {
 			for _, v := range *res {
 				_id := util.Int64All(v["id"])
@@ -136,8 +129,6 @@ func FindUserMsg(this message.FindUserMsgReq) message.FindUserMsgRes {
 
 // 指定分类未读消息合计
 func ClassCountUnread(msgType int, userId string, appId string) (int64, string, int64) {
-	//orm := entity.Engine
-	//count, err := orm.Table("message").Where("msg_type=? and receive_userid=? and isdel=1 and appid=? and isRead=0", msgType, userId, appId).Count()
 	query := map[string]interface{}{
 		"msg_type":       msgType,
 		"receive_userid": userId,
@@ -149,162 +140,29 @@ func ClassCountUnread(msgType int, userId string, appId string) (int64, string,
 	return 1, "查询指定分类未读消息成功", count
 }
 
-// etcd数量加1
-func EtcdCountAdd(userId, msgType string) {
-	s1, err := concurrency.NewSession(entity.EtcdCli)
-	if err != nil {
-		log.Println(err)
-	}
-	defer s1.Close()
-	keyString := fmt.Sprintf(EtcdCount, userId, msgType)
-	m1 := concurrency.NewMutex(s1, keyString)
-	// 会话s1获取锁
-	if err := m1.Lock(context.TODO()); err != nil {
-		log.Println("EtcdCountAdd获取锁失败", err)
-	}
-	// 操作数量
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
-	resp, err := s1.Client().Get(ctx, keyString)
-	cancel()
-	if err != nil {
-		fmt.Printf("get from etcd failed, err:%v\n", err)
-
-		// 释放锁
-		if err := m1.Unlock(context.TODO()); err != nil {
-			log.Println("EtcdCountAdd释放锁失败", err)
-		}
-		return
-	}
-	var count int
-	for _, ev := range resp.Kvs {
-		if ev.Value != nil {
-			err := json.Unmarshal([]byte(ev.Value), &count)
-			if err != nil {
-				log.Println("etcd get err:", err)
-			} else {
-				break
-			}
-		}
-
-	}
-	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
-	_, err = s1.Client().Put(ctx, keyString, strconv.Itoa(count+1))
-	cancel()
-	if err != nil {
-		fmt.Printf("put to etcd failed, err:%v\n", err)
-		// 释放锁
-		if err := m1.Unlock(context.TODO()); err != nil {
-			log.Println("EtcdCountAdd释放锁失败2", err)
-		}
-		return
-	}
-
-	// 释放锁
-	if err := m1.Unlock(context.TODO()); err != nil {
-		log.Println("EtcdCountAdd释放锁失败3", err)
-	}
+// MsgCountAdd 消息未读数量加1
+func MsgCountAdd(userId, msgType string) bool {
+	keyString := fmt.Sprintf(MsgCountKey, userId, msgType)
+	in := redis.Incr(redisModule, keyString)
+	return in > 0
 }
 
-//单条消息,-1
-func EtcdCountMinusOne(userId, msgType string) {
-	s1, err := concurrency.NewSession(entity.EtcdCli)
-	if err != nil {
-		log.Println(err)
-	}
-	defer s1.Close()
-	keyString := fmt.Sprintf(EtcdCount, userId, msgType)
-	m1 := concurrency.NewMutex(s1, keyString)
-	// 会话s1获取锁
-	if err := m1.Lock(context.TODO()); err != nil {
-		log.Println("EtcdCountMinusOne获取锁失败", err)
-	}
-	// 操作数量
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
-	resp, err := s1.Client().Get(ctx, keyString)
-	cancel()
-	if err != nil {
-		fmt.Printf("get from etcd failed, err:%v\n", err)
-
-		// 释放锁
-		if err := m1.Unlock(context.TODO()); err != nil {
-			log.Println("EtcdCountMinusOne释放锁失败", err)
-		}
-		return
-	}
-	var count int
-	for _, ev := range resp.Kvs {
-		if ev.Value != nil {
-			err := json.Unmarshal([]byte(ev.Value), &count)
-			if err != nil {
-				log.Println("etcd get err:", err)
-			} else {
-				break
-			}
-		}
-
-	}
-	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
-	var fin string
-	if count > 0 {
-		fin = strconv.Itoa(count - 1)
-	} else {
-		fin = "0"
-	}
-	_, err = s1.Client().Put(ctx, keyString, fin)
-	cancel()
-	if err != nil {
-		fmt.Printf("put to etcd failed, err:%v\n", err)
-		// 释放锁
-		if err := m1.Unlock(context.TODO()); err != nil {
-			log.Println("EtcdCountMinusOne释放锁失败2", err)
-		}
-		return
-	}
-
-	// 释放锁
-	if err := m1.Unlock(context.TODO()); err != nil {
-		log.Println("EtcdCountMinusOne释放锁失败3", err)
-	}
+// MsgCountMinusOne 根据消息类型未读消息数量减1
+func MsgCountMinusOne(userId, msgType string) bool {
+	keyString := fmt.Sprintf(MsgCountKey, userId, msgType)
+	in := redis.Decrby(redisModule, keyString, 1)
+	return in > 0
 }
 
-// 消息类别置0
-func EtcdSetCountZero(userId, msgType string) {
-	log.Println(" 消息类别置0", userId, msgType)
-	s1, err := concurrency.NewSession(entity.EtcdCli)
-	if err != nil {
-		log.Println(err)
-	}
-	defer s1.Close()
-	keyString := fmt.Sprintf(EtcdCount, userId, msgType)
-	m1 := concurrency.NewMutex(s1, keyString)
-	// 会话s1获取锁
-	if err := m1.Lock(context.TODO()); err != nil {
-		log.Println("EtcdSetCountZero获取锁失败", err)
-	}
-	// 操作数量
-
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
-	_, err = s1.Client().Put(ctx, keyString, "0")
-	cancel()
-	if err != nil {
-		fmt.Printf("put to etcd failed, err:%v\n", err)
-		// 释放锁
-		if err := m1.Unlock(context.TODO()); err != nil {
-			log.Println("EtcdSetCountZero释放锁失败", err)
-		}
-		return
-	}
-
-	// 释放锁
-	if err := m1.Unlock(context.TODO()); err != nil {
-		log.Println("EtcdSetCountZero释放锁失败", err)
-	}
+// MsgCountZero 把该消息类型未读数量置0
+func MsgCountZero(userId, msgType string) bool {
+	keyString := fmt.Sprintf(MsgCountKey, userId, msgType)
+	return redis.Put(redisModule, keyString, 0, -1)
 }
 
 func MultSave(this message.MultipleSaveMsgReq) (int64, string) {
 	userIdArr := strings.Split(this.UserIds, ",")
 	userNameArr := strings.Split(this.UserNames, ",")
-	log.Println("参数:", len(userIdArr), len(userNameArr))
 	if len(userIdArr) > 0 {
 		var errCount int64
 		for k, v := range userIdArr {
@@ -313,45 +171,37 @@ func MultSave(this message.MultipleSaveMsgReq) (int64, string) {
 			}
 			userName := userNameArr[k]
 			//消息数组
-			bT := time.Now()
 			c := entity.Mysql.Count("conversation", map[string]interface{}{"receive_id": v, "send_id": this.SendUserId})
-			// log.Println("count", c)
-			//m := entity.Mysql.SelectBySql("select count(id) from conversation where receive_id=? and send_id=?", v, this.SendUserId)
 			sql3 := `INSERT INTO message(appid,receive_userid,receive_name,send_userid,send_name,title,content,msg_type,link,cite_id,createtime,isRead,isdel,msg_log_id) values ("%s",'%s','%s','%s','%s','%s','%s',%d,'%s',0,'%s',0,1,%d);`
 			sql3 = fmt.Sprintf(sql3, this.Appid, v, userName, this.SendUserId, this.SendName, this.Title, this.Content, this.MsgType, this.Link, time.Now().Format("2006-01-02 15:04:05"), this.MsgLogId)
-			log.Println("插入消息表sql3:", sql3)
 			if c <= 0 {
 				sql1 := `INSERT INTO conversation(appid,secret_key,user_id,receive_id,receive_name,send_id,send_name,sort,createtime) values ('%s','','%s','%s','%s','%s','%s',0,'%s');`
 				sql1 = fmt.Sprintf(sql1, this.Appid, this.SendUserId, v, userName, this.SendUserId, this.SendName, time.Now().Format("2006-01-02 15:04:05"))
-				log.Println("插入会话表sql1:", sql1)
 				ok := entity.Mysql.ExecTx("发送消息事务", func(tx *sql.Tx) bool {
 					//插入会话表
-					//开始时间
 					in1 := entity.Mysql.InsertBySqlByTx(tx, sql1)
 					sql2 := `INSERT INTO conversation(appid,secret_key,user_id,receive_id,receive_name,send_id,send_name,sort,createtime) values ('%s','','%s','%s','%s','%s','%s',0,'%s');`
 					sql2 = fmt.Sprintf(sql2, this.Appid, v, this.SendUserId, this.SendName, v, userName, time.Now().Format("2006-01-02 15:04:05"))
-					log.Println("插入会话表sql2:", sql2)
 					in2 := entity.Mysql.InsertBySqlByTx(tx, sql2)
 					//插入消息表
 					in3 := entity.Mysql.InsertBySqlByTx(tx, sql3)
-					log.Println("插入消息返回 in3 id:", in3)
-					log.Println("存储耗时:", time.Since(bT))
-					log.Println(in1, in2, in3)
+					logx.Info(in1, in2, in3)
 
 					return in1 > -1 && in2 > -1 && in3 > -1
 				})
-				log.Println("执行事务是否成功:", ok)
+				logx.Info("执行事务是否成功:", ok)
 				if !ok {
 					errCount++
 					continue
 				}
-				EtcdCountAdd(v, strconv.Itoa(int(this.MsgType)))
-
+				ok1 := MsgCountAdd(v, strconv.Itoa(int(this.MsgType)))
+				log.Println("存redis:", ok1)
 			} else {
 				in := entity.Mysql.InsertBySql(sql3)
-				log.Println("插入消息返回 in1 id:", in)
+				logx.Info("插入消息返回 in1 id:", in)
 				if in > -1 {
-					EtcdCountAdd(v, strconv.Itoa(int(this.MsgType)))
+					ok := MsgCountAdd(v, strconv.Itoa(int(this.MsgType)))
+					log.Println("存redis:", ok)
 				} else {
 					errCount++
 				}