123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- package sse
- import (
- "app.yhyue.com/moapp/jybase/common"
- "app.yhyue.com/moapp/jybase/date"
- "app.yhyue.com/moapp/jybase/go-logger/logger"
- "app.yhyue.com/moapp/jybase/go-xweb/xweb"
- "app.yhyue.com/moapp/message/model"
- "app.yhyue.com/moapp/message/util"
- "fmt"
- "net/http"
- "strings"
- "time"
- )
- type ServerSentRouter struct {
- *xweb.Action
- events xweb.Mapper `xweb:"/sse/events"`
- send xweb.Mapper `xweb:"/sse/send"`
- notify xweb.Mapper `xweb:"/sse/notify"`
- }
- func (s *ServerSentRouter) Notify() {
- defer common.Catch()
- s.ResponseWriter.Header().Set("Content-Type", "text/event-stream")
- s.ResponseWriter.Header().Set("Cache-Control", "no-cache")
- s.ResponseWriter.Header().Set("Connection", "keep-alive")
- s.ResponseWriter.Header().Set("Access-Control-Allow-Origin", "*")
- sessVal := s.Session().GetMultiple()
- userId := common.ObjToString(sessVal["userId"])
- if userId == "" {
- userId = common.InterfaceToStr(s.Session().Id())
- }
- if userId != "" {
- keys := fmt.Sprintf("%s#%d", userId, time.Now().UnixMilli())
- logger.Info("new userId:", keys)
- util.SetSseRedisCache(userId, keys)
- // 创建用户专属的消息通道
- messageChan := make(chan string)
- // 注册当前连接
- util.SseClientsMu.Lock()
- util.SseClients[keys] = messageChan
- util.SseClientsMu.Unlock()
- // 客户端断开时清理资源
- defer func() {
- util.SseClientsMu.Lock()
- delete(util.SseClients, keys)
- close(messageChan)
- util.SseClientsMu.Unlock()
- }()
- // 监听消息通道并发送给客户端
- flusher := s.ResponseWriter.(http.Flusher)
- ticker := time.NewTicker(20 * time.Second)
- defer ticker.Stop()
- for {
- select {
- case msg := <-messageChan:
- logger.Info("--msg--:", msg, "--userId--:", userId)
- fmt.Fprintf(s.ResponseWriter, "data: %s\n\n", msg)
- flusher.Flush()
- case <-ticker.C:
- //logger.Info("--heartbeat--:", userId)
- fmt.Fprintf(s.ResponseWriter, ": heartbeat\n\n")
- flusher.Flush()
- case <-s.Request.Context().Done():
- return // 客户端断开连接
- }
- }
- } else {
- s.ServeJson("not login")
- }
- }
- func (s *ServerSentRouter) Send() {
- msg := s.GetString("msg")
- phone := s.GetString("phone")
- state, _ := s.GetInteger("state")
- if msg == "" {
- msg = "Default notification at " + time.Now().Format(time.RFC3339)
- } else {
- switch state {
- case 1:
- util.SendNotificationToAll(model.SseMessage{
- Name: msg,
- User: phone,
- State: model.TaskTarget,
- Time: time.Now().Format(date.Date_Full_Layout),
- Remark: fmt.Sprintf("全部用户---测试信息:恭喜 %s 抽中 %s ", phone, msg),
- })
- default:
- userId := s.GetString("userId")
- util.SendNotification(userId, model.SseMessage{
- Name: msg,
- User: phone,
- State: model.TaskTarget,
- Time: time.Now().Format(date.Date_Full_Layout),
- Remark: fmt.Sprintf("单独用户---测试信息:恭喜 %s 抽中 %s ", phone, msg),
- })
- }
- }
- fmt.Fprintf(s.ResponseWriter, "Broadcasted: %s", msg)
- }
- func (s *ServerSentRouter) Events() {
- defer common.Catch()
- s.ResponseWriter.Header().Set("Content-Type", "text/event-stream")
- s.ResponseWriter.Header().Set("Cache-Control", "no-cache")
- s.ResponseWriter.Header().Set("Connection", "keep-alive")
- s.ResponseWriter.Header().Set("Access-Control-Allow-Origin", "*")
- sessVal := s.Session().GetMultiple()
- userId := common.ObjToString(sessVal["userId"])
- sessionId := s.Session().Id()
- if sessionId != "" {
- clientChan := make(chan string, 10)
- util.SseBroadcast.Mu.Lock()
- util.SseBroadcast.Clients[clientChan] = struct{}{}
- util.SseBroadcast.Mu.Unlock()
- defer func() {
- util.SseBroadcast.Mu.Lock()
- delete(util.SseBroadcast.Clients, clientChan)
- util.SseBroadcast.Mu.Unlock()
- close(clientChan)
- logger.Info("Client disconnected")
- }()
- flusher := s.ResponseWriter.(http.Flusher)
- for {
- select {
- case msg := <-clientChan:
- logger.Info("--msg--:", msg, "--userId--:", userId)
- var isSend bool
- if strings.Contains(msg, model.WinningTarget) {
- isSend = true
- } else if userId != "" {
- isSend = true
- }
- if isSend {
- fmt.Fprintf(s.ResponseWriter, "data: %s\n\n", msg)
- flusher.Flush()
- }
- case <-s.Request.Context().Done():
- return
- }
- }
- }
- }
|