package util import ( "app.yhyue.com/moapp/jybase/go-logger/logger" "app.yhyue.com/moapp/message/model" "encoding/json" "sync" ) type Broadcast struct { Clients map[chan string]struct{} Mu sync.Mutex } var ( SseBroadcast = &Broadcast{ Clients: make(map[chan string]struct{}), } ) // SendToUsers 消息通知 func (s *Broadcast) SendToUsers(msg model.SseMessage) { s.Mu.Lock() defer s.Mu.Unlock() msgData, _ := json.Marshal(msg) for clientChan := range s.Clients { select { case clientChan <- string(msgData): logger.Info("send to all user, msg:", msg) default: logger.Info("Client channel full, skipping") } } } // 全局变量:存储所有用户的 SSE 连接 var ( SseClients = make(map[string]chan string) // 用户ID -> 消息通道 SseClientsMu sync.Mutex // 保护 clients 的互斥锁 ) // 向指定用户发送通知 func SendNotification(userId string, msg model.SseMessage) { logger.Info(userId, "----SendNotification-----SseClients:", SseClients) SseClientsMu.Lock() defer SseClientsMu.Unlock() msgData, _ := json.Marshal(msg) if ch, ok := SseClients[userId]; ok { ch <- string(msgData) } } // 向所有用户发送通知 func SendNotificationToAll(msg model.SseMessage) { logger.Info("SendNotificationToAll ----- SseClients:", SseClients) SseClientsMu.Lock() defer SseClientsMu.Unlock() msgData, _ := json.Marshal(msg) for userId, ch := range SseClients { logger.Info("to all ; userId:", userId) ch <- string(msgData) } }