// Package sse exposes the HTTP endpoints used to publish and stream
// Server-Sent Events to connected browsers.
package sse

import (
	"fmt"
	"net/http"
	"strings"
	"time"

	"app.yhyue.com/moapp/jybase/common"
	"app.yhyue.com/moapp/jybase/date"
	"app.yhyue.com/moapp/jybase/go-logger/logger"
	"app.yhyue.com/moapp/jybase/go-xweb/xweb"
	"app.yhyue.com/moapp/message/model"
	"app.yhyue.com/moapp/message/util"
)

// ServerSentRouter is the xweb action that serves the SSE endpoints.
type ServerSentRouter struct {
	*xweb.Action
	events xweb.Mapper `xweb:"/sse/events"` // streaming endpoint, handled by Events
	send   xweb.Mapper `xweb:"/sse/send"`   // test publishing endpoint, handled by Send
}

// sseLineBreaks normalizes CRLF and bare CR to LF so a payload can be split
// into the individual lines of an SSE "data" field.
var sseLineBreaks = strings.NewReplacer("\r\n", "\n", "\r", "\n")

// formatSSEData renders msg as one complete SSE event made only of "data"
// fields. Every line of msg gets its own "data: " prefix, because a raw line
// break inside a single data field would otherwise terminate the field (or
// the whole event) early on the client. For a message without line breaks the
// result is exactly "data: <msg>\n\n".
func formatSSEData(msg string) string {
	var b strings.Builder
	for _, line := range strings.Split(sseLineBreaks.Replace(msg), "\n") {
		b.WriteString("data: ")
		b.WriteString(line)
		b.WriteByte('\n')
	}
	b.WriteByte('\n') // blank line dispatches the event
	return b.String()
}

// Send reads the "msg" and "phone" request parameters and, when msg is
// non-empty, hands a model.SseMessage with State model.AllTarget to
// util.SseBroadcast.SendToUsers. It always answers with a plain
// "Broadcasted: <msg>" line.
func (s *ServerSentRouter) Send() {
	msg := s.GetString("msg")
	phone := s.GetString("phone")

	if msg == "" {
		// NOTE(review): on this path nothing is handed to SendToUsers, yet
		// the response below still says "Broadcasted". Kept as-is because the
		// explicit else looks deliberate — confirm the intended behavior.
		msg = "Default notification at " + time.Now().Format(time.RFC3339)
	} else {
		util.SseBroadcast.SendToUsers(model.SseMessage{
			Prize:  msg,
			User:   phone,
			State:  model.AllTarget,
			Time:   time.Now().Format(date.Date_Full_Layout),
			Remark: fmt.Sprintf("测试信息:恭喜 %s 抽中 %s ", phone, msg),
		})
	}

	fmt.Fprintf(s.ResponseWriter, "Broadcasted: %s", msg)
}

// Events upgrades the response to an SSE stream and forwards messages
// received on a per-connection channel registered in
// util.SseBroadcast.Clients until the request context is cancelled or a
// write to the client fails.
//
// A message is forwarded when it contains model.AllTarget or when the
// session carries a non-empty "userId". Requests without a session id get the
// SSE headers but no stream.
func (s *ServerSentRouter) Events() {
	defer common.Catch()

	// Streaming needs explicit flushes; reject the request up front instead
	// of panicking on a failed type assertion halfway through the handler.
	flusher, ok := s.ResponseWriter.(http.Flusher)
	if !ok {
		http.Error(s.ResponseWriter, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	header := s.ResponseWriter.Header()
	header.Set("Content-Type", "text/event-stream")
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")
	header.Set("Access-Control-Allow-Origin", "*")

	sessVal := s.Session().GetMultiple()
	userID := common.ObjToString(sessVal["userId"])
	if s.Session().Id() == "" {
		return
	}

	// Register this connection with the broadcaster.
	clientChan := make(chan string, 10)
	util.SseBroadcast.Mu.Lock()
	util.SseBroadcast.Clients[clientChan] = struct{}{}
	util.SseBroadcast.Mu.Unlock()

	defer func() {
		util.SseBroadcast.Mu.Lock()
		delete(util.SseBroadcast.Clients, clientChan)
		util.SseBroadcast.Mu.Unlock()
		// NOTE(review): closing here is only safe if the broadcaster sends
		// while holding Mu (so it can no longer see this channel) — confirm
		// against util.SseBroadcast.
		close(clientChan)
		logger.Info("Client disconnected")
	}()

	done := s.Request.Context().Done()
	for {
		select {
		case msg := <-clientChan:
			logger.Info("--msg--:", msg, "--userId--:", userID)

			// NOTE(review): any session with a userId receives every
			// message, whatever its target — confirm this is intended.
			deliver := strings.Contains(msg, model.AllTarget) || userID != ""
			if !deliver {
				continue
			}
			if _, err := fmt.Fprint(s.ResponseWriter, formatSSEData(msg)); err != nil {
				// The client is gone; stop so the deferred cleanup runs now.
				return
			}
			flusher.Flush()
		case <-done:
			return
		}
	}
}