瀏覽代碼

代码合并

renjiaojiao 1 年之前
父節點
當前提交
8489e6ef23

+ 4 - 0
rpc/etc/message.yaml

@@ -92,3 +92,7 @@ PayPushNumber: 5
 EquityInfoMsgType: 13 #营销权益消息需要特殊处理
 NewUserMsgTitle: 做任务赚好礼
 IsFilterActive: true
+RedisFailureTime: 120
+ThreadCount: 15
+MsgLogLimit: 3000
+

+ 3 - 0
rpc/internal/common/getBuoyMsg.go

@@ -87,6 +87,9 @@ func ClearUnreadMsg(in *message.ClearUnreadMsgReq) error {
 				redis.Put(redisModule, classKeyString, 0, -1)
 			}
 		}
+		redis.Del(redisModule, fmt.Sprintf(UserWorkDeskKey, in.Userid))
+		redis.Del(redisModule, fmt.Sprintf(UserMsgSummery, in.Userid))
+		redis.Del(redisModule, fmt.Sprintf(UserClassMapKey, in.Userid))
 	}
 	//更新私信未读数
 	if in.PositionId > 0 {

+ 5 - 0
rpc/internal/common/messageService.go

@@ -39,6 +39,11 @@ func (service *MessageService) ChangeReadStatus(data *message.ChangeReadStatusRe
 		if redis.GetInt(redisModule, keyString) > 0 {
 			redis.Decrby(redisModule, keyString, 1)
 		}
+		if groupId == 5 || groupId == 11 {
+			redis.Del(redisModule, fmt.Sprintf(UserWorkDeskKey, data.UserId))
+		}
+		redis.Del(redisModule, fmt.Sprintf(UserMsgSummery, data.UserId))
+		redis.Del(redisModule, fmt.Sprintf(UserClassMapKey, data.UserId))
 	} else {
 		return errors.New(fmt.Sprintf("消息不存在:%d", data.Id))
 	}

+ 96 - 35
rpc/internal/common/msglistService.go

@@ -9,7 +9,7 @@ import (
 	m "app.yhyue.com/moapp/jybase/mongodb"
 	"app.yhyue.com/moapp/jybase/redis"
 	"context"
-	"errors"
+	"encoding/json"
 	"fmt"
 	"log"
 	"strconv"
@@ -27,9 +27,13 @@ func BitmapUserMsgList(this *message.UserMsgListReq) (*message.UserMsgList, *mes
 		fmt.Printf("此用户暂无消息 : %s", err.Error())
 	}
 	//用户分类消息
-	_, userClassMsgMap := FindUserClassMsg(this.UserId)
-	//用户分类未读消息
-	classUnreadCountMap, classUnreadMsgMap := FindUserClassUnread(this.UserId)
+	//_, userClassMsgMap := FindUserClassMsg(this.UserId)
+	////用户分类未读消息
+	//classUnreadCountMap, classUnreadMsgMap := FindUserClassUnread(this.UserId)
+	classUnreadCountMap, classUnreadMsgMap, userClassMsgMap, err := FindUserClassUnreadAndClassAllMsg(this.UserId)
+	if err != nil {
+		return nil, nil
+	}
 	//fmt.Println("用户所有消息数:", userAllMsgArr, "已读消息数:", userReadArr, "未读消息数:", userUnreadArr)
 	//导航未读消息总数
 	if !this.IsMsgList && !this.IsColumnNewMsg && !this.IsColumn { //消息未读数统计
@@ -297,9 +301,10 @@ func BitmapMessageGetLast(this *message.UserMsgListReq, msgId int) *message.Mess
 	return nil
 }
 
-type UserClassUnread struct {
-	GroupId   int32    `ch:"group_id"`
-	UnreadArr []uint32 `ch:"unreadArr"`
+type UserClassSummary struct {
+	GroupId     int32    `ch:"group_id"`
+	ClassUnread []uint32 `ch:"unreadArr"`
+	ClassAll    []uint32 `ch:"classMsgArr"`
 }
 
 // BitmapCountUnread 未读消息合计 isRedis 是否需要初始化redis
@@ -378,35 +383,53 @@ func BitmapCountClassUnread(userId string, groupId int64, classUnreadCountMap ma
 	return data, count
 }
 
-//FindUserClassUnread 查询用户分类未读消息id
-func FindUserClassUnread(userId string) (map[int]int, map[int][]int) {
-	sql := fmt.Sprintf(`select a2.group_id,bitmapToArray(bitmapAnd(a1.unreadArr,a2.msg_bitmap)) as unreadArr from 
-							(SELECT bitmapAndnot(mus.allMsg,mus.readMsg) unreadArr FROM message_user_summary mus WHERE mus.userId = '%s') a1,
-						(select msg_bitmap,group_id from message_summary ms  where 1=1 limit 10) a2`, userId)
-	fmt.Println("FindUserClassUnread", sql)
+//FindUserClassUnreadAndClassAllMsg 查询用户分类未读消息id
+func FindUserClassUnreadAndClassAllMsg(userId string) (classUnreadCountMap map[int]int, classUnreadMsgMap, classAllMsgMap map[int][]int, err error) {
+	redisKey := fmt.Sprintf(UserClassMapKey, userId)
+	redisData := redis.Get(redisModule, redisKey)
+	findRes := []UserClassSummary{}
+	if redisData != nil {
+		if data, err1 := json.Marshal(redisData); err1 == nil {
+			json.Unmarshal(data, &findRes)
+			for _, v := range findRes {
+				classUnreadCountMap[int(v.GroupId)] = len(v.ClassUnread)
+				classUnreadMsgMap[int(v.GroupId)] = Uint32ArrToIntArr(v.ClassUnread)
+				//classAllCountMap[int(v.GroupId)] = len(v.ClassAll)
+				classAllMsgMap[int(v.GroupId)] = Uint32ArrToIntArr(v.ClassAll)
+			}
+		}
+	}
+	sql := fmt.Sprintf(`select a2.group_id,bitmapToArray(bitmapAnd(a1.unreadArr,a2.msg_bitmap)) as unreadArr,bitmapToArray(bitmapAnd(a1.allMsg,a2.msg_bitmap)) as classMsgArr from 
+			(SELECT bitmapAndnot(mus.allMsg,mus.readMsg) unreadArr,mus.allMsg FROM message_user_summary mus WHERE mus.userId = '%s') a1,
+			(select msg_bitmap,group_id from message_summary ms  where 1=1 limit 10) a2`, userId)
+	log.Println("FindUserClassUnreadAndClassAllMsg", sql)
 	rows, err := entity.ClickhouseConn.Query(context.Background(), sql)
 	if err != nil {
 		log.Println("获取各分类未读消息数组出错:", err)
-		return nil, nil
+		return
 	}
-	classUnreadCountMap := map[int]int{}
-	classUnreadMsgMap := map[int][]int{}
 	for rows.Next() {
-		group := UserClassUnread{}
+		group := UserClassSummary{}
 		err = rows.ScanStruct(&group)
 		if err != nil {
 			log.Println("获取各分类读取分类数据出错:", err)
-			return nil, nil
+			return
 		}
-		classUnreadCountMap[int(group.GroupId)] = len(group.UnreadArr)
-		classUnreadMsgMap[int(group.GroupId)] = Uint32ArrToIntArr(group.UnreadArr)
+		findRes = append(findRes, group)
+		classUnreadCountMap[int(group.GroupId)] = len(group.ClassUnread)
+		classUnreadMsgMap[int(group.GroupId)] = Uint32ArrToIntArr(group.ClassUnread)
+		//classAllCountMap[int(group.GroupId)] = len(group.ClassAll)
+		classAllMsgMap[int(group.GroupId)] = Uint32ArrToIntArr(group.ClassAll)
 	}
-	return classUnreadCountMap, classUnreadMsgMap
+	if findRes != nil && len(findRes) > 0 { //缓存
+		redis.Put(redisModule, redisKey, findRes, config.ConfigJson.RedisFailureTime)
+	}
+	return
 }
 
 //FindUserClassMsg 查询用户分类所有消息id
-func FindUserClassMsg(userId string) (map[int]int, map[int][]int) {
-	sql := fmt.Sprintf(`select a2.group_id,bitmapToArray(bitmapAnd(a1.allMsg,a2.msg_bitmap)) as unreadArr from 
+/*func FindUserClassMsg(userId string) (map[int]int, map[int][]int) {
+	sql := fmt.Sprintf(`select a2.group_id,bitmapToArray(bitmapAnd(a1.allMsg,a2.msg_bitmap)) as unreadArr from
 				(SELECT mus.allMsg as allMsg FROM message_user_summary mus WHERE mus.userId = '%s') a1,
 			(select msg_bitmap,group_id from message_summary ms  where 1=1 limit 10) a2`, userId)
 	fmt.Println("FindUserClassMsg", sql)
@@ -418,7 +441,7 @@ func FindUserClassMsg(userId string) (map[int]int, map[int][]int) {
 	classCountMap := map[int]int{}
 	classMsgMap := map[int][]int{}
 	for rows.Next() {
-		group := UserClassUnread{}
+		group := UserClassSummary{}
 		err = rows.ScanStruct(&group)
 		if err != nil {
 			log.Println("获取各分类读取分类数据出错:", err)
@@ -429,19 +452,21 @@ func FindUserClassMsg(userId string) (map[int]int, map[int][]int) {
 		classMsgMap[int(group.GroupId)] = Uint32ArrToIntArr(group.UnreadArr)
 	}
 	return classCountMap, classMsgMap
-}
+}*/
 
 // GetUserMsgSummary 从用户消息汇总表取数据
 func GetUserMsgSummary(userId string, isfilterActive bool) (userAllMsg, userReadMsg, userUnreadMsg []uint32, err error) {
-	var count uint64
-	sqlc := ""
-	sqlc = fmt.Sprintf("SELECT COUNT(*) as count from message_user_summary WHERE userId = '%s'", userId)
-	log.Println("GetUserMsgSummary selcet ", sqlc)
-	row1 := entity.ClickhouseConn.QueryRow(context.Background(), sqlc)
-	err = row1.Scan(&count)
-	if count == 0 {
-		err = errors.New("用户暂无数据")
-		return
+	key := fmt.Sprintf(UserMsgSummery, userId)
+	redisData := redis.Get(redisModule, key)
+	if redisData != nil {
+		if d, err1 := json.Marshal(redisData); err1 == nil {
+			var userData map[string][]uint32
+			json.Unmarshal(d, &userData)
+			userAllMsg = userData["userAllMsg"]
+			userReadMsg = userData["userReadMsg"]
+			userUnreadMsg = userData["userUnreadMsg"]
+			return
+		}
 	}
 	var sql string
 	isfilterActive = config.ConfigJson.IsFilterActive
@@ -450,13 +475,23 @@ func GetUserMsgSummary(userId string, isfilterActive bool) (userAllMsg, userRead
 	} else {
 		sql = fmt.Sprintf("SELECT bitmapToArray(allMsg) as userAllMsg,bitmapToArray(readMsg) as userReadMsg,bitmapToArray(bitmapAndnot(allMsg,readMsg)) as userunRead from message_user_summary where userId ='%s'", userId)
 	}
-	//sql := fmt.Sprintf("SELECT bitmapToArray(allMsg) as userAllMsg,bitmapToArray(readMsg) as userReadMsg,bitmapToArray(bitmapAndnot(allMsg,readMsg)) as userunRead from message_user_summary where userId ='%s'", userId)
+	// 在这里可以使用 lock 对共享资源进行保护
+	lock := UserLocksMap.GetLock(userId)
+	lock.Lock()
+	defer lock.Unlock()
 	log.Println("GetUserMsgSummary selcet2 ", sql)
 	row := entity.ClickhouseConn.QueryRow(context.Background(), sql)
 	err = row.Scan(&userAllMsg, &userReadMsg, &userUnreadMsg)
 	if err != nil {
 		log.Println("此用户暂无数据:", err)
+		return
 	}
+	//缓存
+	redis.Put(redisModule, key, map[string][]uint32{
+		"userAllMsg":    userAllMsg,
+		"userReadMsg":   userReadMsg,
+		"userUnreadMsg": userUnreadMsg,
+	}, config.ConfigJson.RedisFailureTime)
 	return
 }
 
@@ -499,6 +534,18 @@ type UserMsg struct {
 }
 
 func WorkDeskList(in *message.WorkingDesktopReq) (res1, res2 []*message.Messages, err error) {
+	redisKey := fmt.Sprintf(UserWorkDeskKey, in.UserId)
+	redisData := redis.Get(redisModule, redisKey)
+	if redisData != nil {
+		if d, err1 := json.Marshal(redisData); err1 == nil {
+			var userData map[string][]*message.Messages
+			json.Unmarshal(d, &userData)
+			res1 = userData["res1"]
+			res2 = userData["res2"]
+			//fmt.Println("=====", userData)
+			return
+		}
+	}
 	//待办查询
 	sqlStr := ""
 	needDo := map[int][]uint32{} //msgType对应的消息id
@@ -510,6 +557,9 @@ func WorkDeskList(in *message.WorkingDesktopReq) (res1, res2 []*message.Messages
 			(select msg_bitmap,group_id from message_summary ms  where group_id = %d %s) a2`, in.UserId, in.MsgType, sqlStr)
 
 	log.Println("WorkDeskList", sql)
+	lock := UserLocksMap.GetLock(in.UserId)
+	lock.Lock()
+	defer lock.Unlock()
 	rows, err := entity.ClickhouseConn.Query(context.Background(), sql)
 	if err != nil {
 		return nil, nil, err
@@ -540,6 +590,17 @@ func WorkDeskList(in *message.WorkingDesktopReq) (res1, res2 []*message.Messages
 		needMsgArr := IntArrSort(Uint32ArrToIntArr(totalMsg))
 		res2 = GetMsgList(needMsgArr, int(in.PageSize), in.UserId, readMsgMap)
 	}
+	//缓存
+	res := map[string][]*message.Messages{}
+	if res1 != nil {
+		res["res1"] = res1
+	}
+	if res2 != nil {
+		res["res2"] = res2
+	}
+	if res != nil {
+		redis.Put(redisModule, redisKey, res, config.ConfigJson.RedisFailureTime)
+	}
 	return res1, res2, nil
 }
 

+ 20 - 4
rpc/internal/common/newSendMsgService.go

@@ -87,12 +87,13 @@ func NewUserSendMsg(in *message.NewUserInsertMsgReq) error {
 			Row4:        in.Row4,
 		}
 		SentWxTmplAndAppPush(pushData, userIdArr[i], group_id, "", "")
-		key := fmt.Sprintf(MsgCountKey, userIdArr[i], group_id)
+		/*key := fmt.Sprintf(MsgCountKey, userIdArr[i], group_id)
 		redis.Del(redisModule, key)
 		if in.MsgType == 11 || in.MsgType == 12 {
 			key1 := fmt.Sprintf(MsgClassCountKey, userIdArr[i], in.MsgType)
 			redis.Del(redisModule, key1)
-		}
+		}*/
+		DelRedis(userIdArr[i], in.MsgType, group_id)
 	}
 	//wg.Wait()
 	return nil
@@ -159,18 +160,33 @@ func UpdateUserMsgSummary(in *message.MultipleSaveMsgReq) error {
 				Row4:        in.Row4,
 			}
 			SentWxTmplAndAppPush(pushData, v, group_id, equityName, equityAddr)
-			key := fmt.Sprintf(MsgCountKey, v, group_id)
+			/*key := fmt.Sprintf(MsgCountKey, v, group_id)
 			redis.Del(redisModule, key)
 			if in.MsgType == 11 || in.MsgType == 12 {
 				key1 := fmt.Sprintf(MsgClassCountKey, v, in.MsgType)
 				redis.Del(redisModule, key1)
-			}
+			}*/
+			DelRedis(v, in.MsgType, group_id)
 		}(userIdArr[i])
 	}
 	wg.Wait()
 	return nil
 }
 
+func DelRedis(userId string, msgType int64, groupId int) {
+	key := fmt.Sprintf(MsgCountKey, userId, groupId)
+	redis.Del(redisModule, key)
+	if msgType == 11 || msgType == 12 {
+		key1 := fmt.Sprintf(MsgClassCountKey, userId, msgType)
+		redis.Del(redisModule, key1)
+	}
+	if groupId == 5 || groupId == 11 {
+		redis.Del(redisModule, fmt.Sprintf(UserWorkDeskKey, userId))
+	}
+	redis.Del(redisModule, fmt.Sprintf(UserMsgSummery, userId))
+	redis.Del(redisModule, fmt.Sprintf(UserClassMapKey, userId))
+}
+
 func Update(str string, msgLogId int64) {
 	log.Println(fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId in (%s)`, msgLogId, str))
 	err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([toUInt64(%d)])) where userId in (%s)`, msgLogId, str))

+ 3 - 0
rpc/internal/common/sendMsg.go

@@ -18,6 +18,9 @@ const order = "1,4"
 const MsgCountKey = "count_%s_%d"                //redis 消息未读数量 Count.用户id.消息类型=数量
 const MsgClassCountKey = "msg_class_count_%s_%d" //redis 用户消息class分类消息数量
 const redisModule = "msgCount"
+const UserMsgSummery = "user_msg_summery_%s"
+const UserWorkDeskKey = "user_work_desk_%s"
+const UserClassMapKey = "user_class_msg_%s"
 
 func FindUserMsg(this message.FindUserMsgReq, isClean bool) message.FindUserMsgRes {
 	var err error

+ 1 - 1
rpc/internal/common/task.go

@@ -31,7 +31,7 @@ func LoadTask() {
 func LoadMsgOnTime() {
 	fmt.Println("开始执行")
 	msgMap := make(map[int]map[string]interface{})
-	m := entity.Mysql.SelectBySql("SELECT id,msg_type,title,content,send_time,menu_name,link,group_id,sign FROM message_send_log WHERE send_status = 4 AND isdel = 1 ORDER BY send_time DESC limit 2000")
+	m := entity.Mysql.SelectBySql("SELECT id,msg_type,title,content,send_time,menu_name,link,group_id,sign FROM message_send_log WHERE send_status = 4 AND isdel = 1 ORDER BY send_time DESC limit ?", config.ConfigJson.MsgLogLimit)
 	if m != nil && len(*m) > 0 {
 		for _, val := range *m {
 			msgMap[common.IntAll(val["id"])] = val

+ 42 - 0
rpc/internal/common/userSyncMap.go

@@ -0,0 +1,42 @@
+package common
+
+import (
+	"app.yhyue.com/moapp/MessageCenter/rpc/internal/config"
+	"fmt"
+	"hash/fnv"
+	"sync"
+)
+
+var UserLocksMap *UserLocks
+
+type UserLocks struct {
+	locks []*sync.Mutex
+	mutex sync.Mutex
+}
+
+func NewUserLocks() *UserLocks {
+	locks := make([]*sync.Mutex, config.ConfigJson.ThreadCount)
+	for i := range locks {
+		locks[i] = &sync.Mutex{}
+	}
+	return &UserLocks{
+		locks: locks,
+	}
+}
+
+func (ul *UserLocks) GetLock(userID string) *sync.Mutex {
+	hash := hashUserID(userID)
+
+	ul.mutex.Lock()
+	defer ul.mutex.Unlock()
+
+	index := hash % uint32(len(ul.locks))
+	fmt.Println("index", index)
+	return ul.locks[index]
+}
+
+func hashUserID(userID string) uint32 {
+	h := fnv.New32a()
+	h.Write([]byte(userID))
+	return h.Sum32()
+}

+ 3 - 0
rpc/internal/config/config.go

@@ -36,6 +36,9 @@ type Config struct {
 	EquityInfoMsgType int64  `json:"EquityInfoMsgType"` // 营销权益消息需要特殊处理的消息类型
 	NewUserMsgTitle   string `json:"NewUserMsgTitle"`
 	IsFilterActive    bool   `json:"IsFilterActive"`
+	RedisFailureTime  int    `json:"RedisFailureTime"` //用户消息缓存失效时间
+	ThreadCount       int    `json:"ThreadCount"`      //线程锁,配置15个,用户hash取锁
+	MsgLogLimit       int    `json:"MsgLogLimit"`      //加载消息记录条数
 }
 
 type CHouseConfig struct {

+ 1 - 0
rpc/message.go

@@ -148,6 +148,7 @@ func init() {
 	//entity.Engine, _ = xorm.NewEngine("mysql", config.ConfigJson.TidbEng)
 	//entity.Engine.ShowSQL(true)
 	common.LoadTask()
+	common.UserLocksMap = common.NewUserLocks()
 }
 
 // 创建clickhouse连接