sse.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. package util
  2. import (
  3. "app.yhyue.com/moapp/jybase/go-logger/logger"
  4. "app.yhyue.com/moapp/message/model"
  5. "encoding/json"
  6. "sync"
  7. )
  8. type Broadcast struct {
  9. Clients map[chan string]struct{}
  10. Mu sync.Mutex
  11. }
  12. var (
  13. SseBroadcast = &Broadcast{
  14. Clients: make(map[chan string]struct{}),
  15. }
  16. )
  17. // SendToUsers 消息通知
  18. func (s *Broadcast) SendToUsers(msg model.SseMessage) {
  19. s.Mu.Lock()
  20. defer s.Mu.Unlock()
  21. msgData, _ := json.Marshal(msg)
  22. for clientChan := range s.Clients {
  23. select {
  24. case clientChan <- string(msgData):
  25. logger.Info("send to all user, msg:", msg)
  26. default:
  27. logger.Info("Client channel full, skipping")
  28. }
  29. }
  30. }
  31. // 全局变量:存储所有用户的 SSE 连接
  32. var (
  33. SseClients = make(map[string]chan string) // 用户ID -> 消息通道
  34. SseClientsMu sync.Mutex // 保护 clients 的互斥锁
  35. )
  36. // 向指定用户发送通知
  37. func SendNotification(userId string, msg model.SseMessage) {
  38. logger.Info(userId, "----SendNotification-----SseClients:", SseClients)
  39. SseClientsMu.Lock()
  40. defer SseClientsMu.Unlock()
  41. msgData, _ := json.Marshal(msg)
  42. if ch, ok := SseClients[userId]; ok {
  43. ch <- string(msgData)
  44. }
  45. }
  46. // 向所有用户发送通知
  47. func SendNotificationToAll(msg model.SseMessage) {
  48. logger.Info("SendNotificationToAll ----- SseClients:", SseClients)
  49. SseClientsMu.Lock()
  50. defer SseClientsMu.Unlock()
  51. msgData, _ := json.Marshal(msg)
  52. for userId, ch := range SseClients {
  53. logger.Info("to all ; userId:", userId)
  54. ch <- string(msgData)
  55. }
  56. }