package util import ( "app.yhyue.com/moapp/jybase/go-logger/logger" "app.yhyue.com/moapp/jybase/redis" "app.yhyue.com/moapp/message/model" "encoding/json" "fmt" "strings" "sync" ) type Broadcast struct { Clients map[chan string]struct{} Mu sync.Mutex } var ( SseBroadcast = &Broadcast{ Clients: make(map[chan string]struct{}), } ) // SendToUsers 消息通知 func (s *Broadcast) SendToUsers(msg model.SseMessage) { s.Mu.Lock() defer s.Mu.Unlock() msgData, _ := json.Marshal(msg) for clientChan := range s.Clients { select { case clientChan <- string(msgData): logger.Info("send to all user, msg:", msg) default: logger.Info("Client channel full, skipping") } } } // 全局变量:存储所有用户的 SSE 连接 var ( SseClients = make(map[string]chan string) // 用户ID -> 消息通道 SseClientsMu sync.Mutex // 保护 clients 的互斥锁 ) // 向指定用户发送通知 func SendNotification(userId string, msg model.SseMessage) { //logger.Info(userId, "----SendNotification-----SseClients:", SseClients) SseClientsMu.Lock() defer SseClientsMu.Unlock() msgData, _ := json.Marshal(msg) if ch, ok := SseClients[userId]; ok { ch <- string(msgData) } } // 向所有用户发送通知 func SendNotificationToAll(msg model.SseMessage) { //logger.Info("SendNotificationToAll ----- SseClients:", SseClients) SseClientsMu.Lock() defer SseClientsMu.Unlock() msgData, _ := json.Marshal(msg) for userId, ch := range SseClients { logger.Info("to all ; userId:", userId) ch <- string(msgData) } } const ( sseRedisKey = "sse_cache_%s" powerCacheDb = "newother" expiresTime = 24 * 60 * 60 ) func SetSseRedisCache(userId, keys string) { cacheKey := fmt.Sprintf(sseRedisKey, userId) cacheData := redis.GetStr(powerCacheDb, cacheKey) var caches []string if cacheData == "" { caches = append(caches, userId, keys) } else { caches = append(strings.Split(cacheData, ","), keys) } redis.Put(powerCacheDb, cacheKey, strings.Join(caches, ","), expiresTime) } func GetSseRedisCache(userId string) (keys []string) { cacheKey := fmt.Sprintf(sseRedisKey, userId) cacheData := redis.GetStr(powerCacheDb, cacheKey) return strings.Split(cacheData, ",") }