sse.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. package util
  2. import (
  3. "app.yhyue.com/moapp/jybase/go-logger/logger"
  4. "app.yhyue.com/moapp/message/model"
  5. "encoding/json"
  6. "log"
  7. "sync"
  8. )
  9. type Broadcast struct {
  10. Clients map[chan string]struct{}
  11. Mu sync.Mutex
  12. }
  13. var (
  14. SseBroadcast = &Broadcast{
  15. Clients: make(map[chan string]struct{}),
  16. }
  17. )
  18. // SendToUsers 消息通知
  19. func (s *Broadcast) SendToUsers(msg model.SseMessage) {
  20. s.Mu.Lock()
  21. defer s.Mu.Unlock()
  22. msgData, _ := json.Marshal(msg)
  23. for clientChan := range s.Clients {
  24. select {
  25. case clientChan <- string(msgData):
  26. logger.Info("send to all user, msg:", msg)
  27. default:
  28. logger.Info("Client channel full, skipping")
  29. }
  30. }
  31. }
  32. // 全局变量:存储所有用户的 SSE 连接
  33. var (
  34. SseClients = make(map[string]chan string) // 用户ID -> 消息通道
  35. SseClientsMu sync.Mutex // 保护 clients 的互斥锁
  36. )
  37. // 向指定用户发送通知
  38. func SendNotification(userId string, msg model.SseMessage) {
  39. logger.Info("SendNotification-----SseClients:", SseClients)
  40. SseClientsMu.Lock()
  41. defer SseClientsMu.Unlock()
  42. msgData, _ := json.Marshal(msg)
  43. if ch, ok := SseClients[userId]; ok {
  44. ch <- string(msgData)
  45. }
  46. }
  47. // 向所有用户发送通知
  48. func SendNotificationToAll(msg model.SseMessage) {
  49. logger.Info("SendNotificationToAll ----- SseClients:", SseClients)
  50. SseClientsMu.Lock()
  51. defer SseClientsMu.Unlock()
  52. msgData, _ := json.Marshal(msg)
  53. for userId, ch := range SseClients {
  54. log.Println("userId:", userId)
  55. ch <- string(msgData)
  56. }
  57. }