sse.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package sse
  2. import (
  3. "app.yhyue.com/moapp/jybase/common"
  4. "app.yhyue.com/moapp/jybase/date"
  5. "app.yhyue.com/moapp/jybase/go-logger/logger"
  6. "app.yhyue.com/moapp/jybase/go-xweb/xweb"
  7. "app.yhyue.com/moapp/message/model"
  8. "app.yhyue.com/moapp/message/util"
  9. "fmt"
  10. "net/http"
  11. "strings"
  12. "time"
  13. )
  14. type ServerSentRouter struct {
  15. *xweb.Action
  16. events xweb.Mapper `xweb:"/sse/events"`
  17. send xweb.Mapper `xweb:"/sse/send"`
  18. }
  19. func (s *ServerSentRouter) Send() {
  20. msg := s.GetString("msg")
  21. phone := s.GetString("phone")
  22. if msg == "" {
  23. msg = "Default notification at " + time.Now().Format(time.RFC3339)
  24. } else {
  25. util.SseBroadcast.SendToUsers(model.SseMessage{
  26. Prize: msg,
  27. User: phone,
  28. State: model.AllTarget,
  29. Time: time.Now().Format(date.Date_Full_Layout),
  30. Remark: fmt.Sprintf("测试信息:恭喜 %s 抽中 %s ", phone, msg),
  31. })
  32. }
  33. fmt.Fprintf(s.ResponseWriter, "Broadcasted: %s", msg)
  34. }
  35. func (s *ServerSentRouter) Events() {
  36. defer common.Catch()
  37. s.ResponseWriter.Header().Set("Content-Type", "text/event-stream")
  38. s.ResponseWriter.Header().Set("Cache-Control", "no-cache")
  39. s.ResponseWriter.Header().Set("Connection", "keep-alive")
  40. s.ResponseWriter.Header().Set("Access-Control-Allow-Origin", "*")
  41. sessVal := s.Session().GetMultiple()
  42. userId := common.ObjToString(sessVal["userId"])
  43. sessionId := s.Session().Id()
  44. if sessionId != "" {
  45. clientChan := make(chan string, 10)
  46. util.SseBroadcast.Mu.Lock()
  47. util.SseBroadcast.Clients[clientChan] = struct{}{}
  48. util.SseBroadcast.Mu.Unlock()
  49. defer func() {
  50. util.SseBroadcast.Mu.Lock()
  51. delete(util.SseBroadcast.Clients, clientChan)
  52. util.SseBroadcast.Mu.Unlock()
  53. close(clientChan)
  54. logger.Info("Client disconnected")
  55. }()
  56. flusher := s.ResponseWriter.(http.Flusher)
  57. for {
  58. select {
  59. case msg := <-clientChan:
  60. logger.Info("--msg--:", msg, "--userId--:", userId)
  61. var isSend bool
  62. if strings.Contains(msg, model.AllTarget) {
  63. isSend = true
  64. } else if userId != "" {
  65. isSend = true
  66. }
  67. if isSend {
  68. fmt.Fprintf(s.ResponseWriter, "data: %s\n\n", msg)
  69. flusher.Flush()
  70. }
  71. case <-s.Request.Context().Done():
  72. return
  73. }
  74. }
  75. }
  76. }