package sse

import (
	"fmt"
	"net/http"
	"strings"
	"time"

	"app.yhyue.com/moapp/jybase/common"
	"app.yhyue.com/moapp/jybase/date"
	"app.yhyue.com/moapp/jybase/go-logger/logger"
	"app.yhyue.com/moapp/jybase/go-xweb/xweb"
	"app.yhyue.com/moapp/message/model"
	"app.yhyue.com/moapp/message/util"
)

// ServerSentRouter groups the Server-Sent Events (SSE) handlers. The mapper
// fields bind the routes /sse/events, /sse/send and /sse/notify through their
// xweb struct tags.
type ServerSentRouter struct {
	*xweb.Action
	events xweb.Mapper `xweb:"/sse/events"`
	send   xweb.Mapper `xweb:"/sse/send"`
	notify xweb.Mapper `xweb:"/sse/notify"`
}

// Notify opens a per-connection SSE stream.
//
// The connection is identified by "<userId>#<unix millis>", where userId comes
// from the session's "userId" value and falls back to the session id. The
// handler registers a channel in util.SseClients under that key, forwards
// every string received on it as an SSE "data:" event, and writes a comment
// line as heartbeat every 20 seconds. It returns when the request context is
// done or a write to the client fails, and unregisters the channel on exit.
//
// When no user identifier can be determined it answers "not login" via
// ServeJson instead of streaming.
func (s *ServerSentRouter) Notify() {
	// NOTE(review): common.Catch presumably recovers panics — confirm.
	defer common.Catch()

	header := s.ResponseWriter.Header()
	header.Set("Content-Type", "text/event-stream")
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")
	header.Set("Access-Control-Allow-Origin", "*")

	sessVal := s.Session().GetMultiple()
	userID := common.ObjToString(sessVal["userId"])
	if userID == "" {
		userID = common.InterfaceToStr(s.Session().Id())
	}
	if userID == "" {
		s.ServeJson("not login")
		return
	}

	// Verify streaming support before registering anything, so a writer that
	// cannot flush yields an error response instead of a panic.
	flusher, ok := s.ResponseWriter.(http.Flusher)
	if !ok {
		http.Error(s.ResponseWriter, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	keys := fmt.Sprintf("%s#%d", userID, time.Now().UnixMilli())
	logger.Info("new userId:", keys)
	util.SetSseRedisCache(userID, keys)

	// Dedicated message channel for this connection.
	// NOTE(review): unbuffered, unlike the buffered channel used by Events;
	// whether senders can block on it depends on util — confirm.
	messageChan := make(chan string)

	// Register the current connection.
	util.SseClientsMu.Lock()
	util.SseClients[keys] = messageChan
	util.SseClientsMu.Unlock()

	// Release the registration when the client goes away. The channel is
	// removed and closed under the same lock acquisition.
	defer func() {
		util.SseClientsMu.Lock()
		delete(util.SseClients, keys)
		close(messageChan)
		util.SseClientsMu.Unlock()
	}()

	ticker := time.NewTicker(20 * time.Second)
	defer ticker.Stop()

	// Pump messages and heartbeats to the client until it disconnects.
	done := s.Request.Context().Done()
	for {
		select {
		case msg := <-messageChan:
			logger.Info("--msg--:", msg, "--userId--:", userID)
			if _, err := fmt.Fprintf(s.ResponseWriter, "data: %s\n\n", msg); err != nil {
				return // the connection is no longer writable
			}
			flusher.Flush()
		case <-ticker.C:
			// A line starting with ':' is an SSE comment; it only keeps the
			// connection active.
			if _, err := fmt.Fprintf(s.ResponseWriter, ": heartbeat\n\n"); err != nil {
				return
			}
			flusher.Flush()
		case <-done:
			return // client disconnected
		}
	}
}

// Send is a test endpoint that pushes a notification built from the request
// parameters "msg", "phone", "state" and "userId".
//
// With state == 1 the message is handed to util.SendNotificationToAll; with
// any other state it is handed to util.SendNotification for the given userId.
// The response body is always "Broadcasted: <msg>".
//
// NOTE(review): when "msg" is empty a default text is generated for the
// response but nothing is dispatched. This mirrors the existing behavior;
// confirm whether the default message was meant to be sent as well.
func (s *ServerSentRouter) Send() {
	msg := s.GetString("msg")
	phone := s.GetString("phone")
	// A missing or malformed "state" is ignored and leaves state at the value
	// returned alongside the error, which then selects the single-user branch
	// unless it equals 1.
	state, _ := s.GetInteger("state")

	if msg == "" {
		msg = "Default notification at " + time.Now().Format(time.RFC3339)
	} else {
		switch state {
		case 1:
			util.SendNotificationToAll(model.SseMessage{
				Name:   msg,
				User:   phone,
				State:  model.TaskTarget,
				Time:   time.Now().Format(date.Date_Full_Layout),
				Remark: fmt.Sprintf("全部用户---测试信息:恭喜 %s 抽中 %s ", phone, msg),
			})
		default:
			userID := s.GetString("userId")
			util.SendNotification(userID, model.SseMessage{
				Name:   msg,
				User:   phone,
				State:  model.TaskTarget,
				Time:   time.Now().Format(date.Date_Full_Layout),
				Remark: fmt.Sprintf("单独用户---测试信息:恭喜 %s 抽中 %s ", phone, msg),
			})
		}
	}

	fmt.Fprintf(s.ResponseWriter, "Broadcasted: %s", msg)
}

// Events opens a broadcast SSE stream.
//
// The handler registers a buffered channel in util.SseBroadcast.Clients and
// forwards the strings received on it as SSE "data:" events. A message is
// forwarded when it contains model.WinningTarget, or when the session carries
// a non-empty "userId"; other messages are dropped for this client. The
// handler returns when the request context is done or a write fails, and
// unregisters the channel on exit. Requests without a session id get no
// stream.
func (s *ServerSentRouter) Events() {
	// NOTE(review): common.Catch presumably recovers panics — confirm.
	defer common.Catch()

	header := s.ResponseWriter.Header()
	header.Set("Content-Type", "text/event-stream")
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")
	header.Set("Access-Control-Allow-Origin", "*")

	sessVal := s.Session().GetMultiple()
	userID := common.ObjToString(sessVal["userId"])
	if s.Session().Id() == "" {
		return
	}

	// Verify streaming support before registering the client, so a writer
	// that cannot flush yields an error response instead of a panic.
	flusher, ok := s.ResponseWriter.(http.Flusher)
	if !ok {
		http.Error(s.ResponseWriter, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	clientChan := make(chan string, 10)
	util.SseBroadcast.Mu.Lock()
	util.SseBroadcast.Clients[clientChan] = struct{}{}
	util.SseBroadcast.Mu.Unlock()

	// Unregister and close under one lock acquisition, matching Notify, so
	// the channel is never closed while still listed as a client.
	defer func() {
		util.SseBroadcast.Mu.Lock()
		delete(util.SseBroadcast.Clients, clientChan)
		close(clientChan)
		util.SseBroadcast.Mu.Unlock()
		logger.Info("Client disconnected")
	}()

	done := s.Request.Context().Done()
	for {
		select {
		case msg := <-clientChan:
			logger.Info("--msg--:", msg, "--userId--:", userID)
			// Messages containing WinningTarget go to every client; anything
			// else only to sessions that carry a userId.
			if strings.Contains(msg, model.WinningTarget) || userID != "" {
				if _, err := fmt.Fprintf(s.ResponseWriter, "data: %s\n\n", msg); err != nil {
					return // the connection is no longer writable
				}
				flusher.Flush()
			}
		case <-done:
			return
		}
	}
}