wangshan 2 meses atrás
pai
commit
13c222f47e
3 arquivos alterados com 45 adições e 15 exclusões
  1. 13 9
      handler/activity/lotteryDrawTask.go
  2. 5 6
      services/sse/sse.go
  3. 27 0
      util/sse.go

+ 13 - 9
handler/activity/lotteryDrawTask.go

@@ -216,15 +216,19 @@ func LotteryDrawTask(msg *model.Message) {
 				userName = string(phone[0:3]) + "****" + string(phone[(len(phone)-4):])
 			}
 		}
-		//util.SendNotification(msgBody.UserId, model.SseMessage{
-		util.SendNotificationToAll(model.SseMessage{
-			Name:     taskInfo.Name,
-			User:     encrypt.SE.EncodeString(msgBody.UserId),
-			State:    model.TaskTarget,
-			Time:     time.Now().Format(date.Date_Full_Layout),
-			Remark:   fmt.Sprintf("%s 完成 %s 任务。", userName, taskInfo.Name),
-			ActiveId: encrypt.SE.EncodeString(strconv.FormatInt(msgBody.ActiveId, 10)),
-		})
+		if sendData := util.GetSseRedisCache(msgBody.UserId); len(sendData) > 0 {
+			for _, sv := range sendData {
+				util.SendNotification(sv, model.SseMessage{
+					//util.SendNotificationToAll(model.SseMessage{
+					Name:     taskInfo.Name,
+					User:     encrypt.SE.EncodeString(msgBody.UserId),
+					State:    model.TaskTarget,
+					Time:     time.Now().Format(date.Date_Full_Layout),
+					Remark:   fmt.Sprintf("%s 完成 %s 任务。", userName, taskInfo.Name),
+					ActiveId: encrypt.SE.EncodeString(strconv.FormatInt(msgBody.ActiveId, 10)),
+				})
+			}
+		}
 	}
 }
 

+ 5 - 6
services/sse/sse.go

@@ -29,24 +29,23 @@ func (s *ServerSentRouter) Notify() {
 
 	sessVal := s.Session().GetMultiple()
 	userId := common.ObjToString(sessVal["userId"])
-	logger.Info("message  userId:", userId)
 	if userId == "" {
 		userId = common.InterfaceToStr(s.Session().Id())
-		logger.Info("params  userId:", userId)
 	}
 	if userId != "" {
-		userId = fmt.Sprintf("%s#%d", userId, time.Now().UnixMilli())
-		logger.Info("new  userId:", userId)
+		keys := fmt.Sprintf("%s#%d", userId, time.Now().UnixMilli())
+		logger.Info("new  userId:", keys)
+		util.SetSseRedisCache(userId, keys)
 		// 创建用户专属的消息通道
 		messageChan := make(chan string)
 		// 注册当前连接
 		util.SseClientsMu.Lock()
-		util.SseClients[userId] = messageChan
+		util.SseClients[keys] = messageChan
 		util.SseClientsMu.Unlock()
 		// 客户端断开时清理资源
 		defer func() {
 			util.SseClientsMu.Lock()
-			delete(util.SseClients, userId)
+			delete(util.SseClients, keys)
 			close(messageChan)
 			util.SseClientsMu.Unlock()
 		}()

+ 27 - 0
util/sse.go

@@ -2,8 +2,11 @@ package util
 
 import (
 	"app.yhyue.com/moapp/jybase/go-logger/logger"
+	"app.yhyue.com/moapp/jybase/redis"
 	"app.yhyue.com/moapp/message/model"
 	"encoding/json"
+	"fmt"
+	"strings"
 	"sync"
 )
 
@@ -61,3 +64,27 @@ func SendNotificationToAll(msg model.SseMessage) {
 		ch <- string(msgData)
 	}
 }
+
+const (
+	sseRedisKey  = "sse_cache_%s"
+	powerCacheDb = "newother"
+	expiresTime  = 24 * 60 * 60
+)
+
+func SetSseRedisCache(userId, keys string) {
+	cacheKey := fmt.Sprintf(sseRedisKey, userId)
+	cacheData := redis.GetStr(powerCacheDb, cacheKey)
+	var caches []string
+	if cacheData == "" {
+		caches = append(caches, userId, keys)
+	} else {
+		caches = append(strings.Split(cacheData, ","), keys)
+	}
+	redis.Put(powerCacheDb, cacheKey, strings.Join(caches, ","), expiresTime)
+}
+
+func GetSseRedisCache(userId string) (keys []string) {
+	cacheKey := fmt.Sprintf(sseRedisKey, userId)
+	cacheData := redis.GetStr(powerCacheDb, cacheKey)
+	return strings.Split(cacheData, ",")
+}