// sse.go

  1. package util
  2. import (
  3. "app.yhyue.com/moapp/jybase/go-logger/logger"
  4. "app.yhyue.com/moapp/message/model"
  5. "encoding/json"
  6. "log"
  7. "sync"
  8. )
  9. type Broadcast struct {
  10. Clients map[chan string]struct{}
  11. Mu sync.Mutex
  12. }
  13. var (
  14. SseBroadcast = &Broadcast{
  15. Clients: make(map[chan string]struct{}),
  16. }
  17. )
  18. // SendToUsers 消息通知
  19. func (s *Broadcast) SendToUsers(msg model.SseMessage) {
  20. s.Mu.Lock()
  21. defer s.Mu.Unlock()
  22. msgData, _ := json.Marshal(msg)
  23. for clientChan := range s.Clients {
  24. select {
  25. case clientChan <- string(msgData):
  26. logger.Info("send to all user, msg:", msg)
  27. default:
  28. logger.Info("Client channel full, skipping")
  29. }
  30. }
  31. }
  32. // 全局变量:存储所有用户的 SSE 连接
  33. var (
  34. SseClients = make(map[string]chan string) // 用户ID -> 消息通道
  35. SseClientsMu sync.Mutex // 保护 clients 的互斥锁
  36. )
  37. // 向指定用户发送通知
  38. func SendNotification(userId string, msg model.SseMessage) {
  39. SseClientsMu.Lock()
  40. defer SseClientsMu.Unlock()
  41. msgData, _ := json.Marshal(msg)
  42. if ch, ok := SseClients[userId]; ok {
  43. ch <- string(msgData)
  44. }
  45. }
  46. // 向所有用户发送通知
  47. func SendNotificationToAll(msg model.SseMessage) {
  48. SseClientsMu.Lock()
  49. defer SseClientsMu.Unlock()
  50. msgData, _ := json.Marshal(msg)
  51. for userId, ch := range SseClients {
  52. log.Println("userId:", userId)
  53. ch <- string(msgData)
  54. }
  55. }