wangshan 3 månader sedan
förälder
incheckning
ed28166e18
4 ändrade filer med 72 tillägg och 7 borttagningar
  1. 3 3
      handler/activity/lotteryDrawTask.go
  2. 2 4
      services/filter/sessionfilter.go
  3. 39 0
      services/sse/sse.go
  4. 28 0
      util/sse.go

+ 3 - 3
handler/activity/lotteryDrawTask.go

@@ -196,7 +196,7 @@ func LotteryDrawTask(msg *model.Message) {
 		}
 	}
 	if tf {
-		logger.Info("  sse  中奖信息通知:", msg)
+		logger.Info("  sse  任务信息通知:", msg)
 		var userName = msgBody.NickName
 		if msgBody.Phone != "" {
 			var PhoneReg = regexp.MustCompile(`^(100\d{8}|1[3-9]\d{9})$`)
@@ -205,7 +205,7 @@ func LotteryDrawTask(msg *model.Message) {
 				userName = string(phone[0:3]) + "****" + string(phone[(len(phone)-4):])
 			}
 		}
-		util.SseBroadcast.SendToUsers(model.SseMessage{
+		util.SendNotification(msgBody.UserId, model.SseMessage{
 			Name:     taskInfo.Name,
 			User:     encrypt.SE.EncodeString(msgBody.UserId),
 			State:    model.TaskTarget,
@@ -250,7 +250,7 @@ func LotteryWinning(msg *model.Message) {
 		}
 	}
 	logger.Info("  sse  中奖信息通知:", msg)
-	util.SseBroadcast.SendToUsers(model.SseMessage{
+	util.SendNotificationToAll(model.SseMessage{
 		Name:     lb.PrizeName,
 		User:     userName,
 		State:    model.WinningTarget,

+ 2 - 4
services/filter/sessionfilter.go

@@ -1,10 +1,8 @@
 package filter
 
 import (
-	"net/http"
-	"strings"
-
 	"app.yhyue.com/moapp/jybase/go-xweb/xweb"
+	"net/http"
 )
 
 // 登录限制
@@ -15,7 +13,7 @@ type sessionfilter struct {
 // 继承过滤器方法
 func (l *sessionfilter) Do(w http.ResponseWriter, req *http.Request) bool {
 	session := l.App.SessionManager.Session(req, w)
-	if req.RequestURI == "/jyActivity/getConfig/info" || strings.Contains(req.RequestURI, "/sse/") {
+	if req.RequestURI == "/jyActivity/getConfig/info" {
 		return true
 	}
 	if session.Get("userId") == nil {

+ 39 - 0
services/sse/sse.go

@@ -17,6 +17,45 @@ type ServerSentRouter struct {
 	*xweb.Action
 	events xweb.Mapper `xweb:"/sse/events"`
 	send   xweb.Mapper `xweb:"/sse/send"`
+	notify xweb.Mapper `xweb:"/sse/notify"`
+}
+
+func (s *ServerSentRouter) Notify() {
+	defer common.Catch()
+	s.ResponseWriter.Header().Set("Content-Type", "text/event-stream")
+	s.ResponseWriter.Header().Set("Cache-Control", "no-cache")
+	s.ResponseWriter.Header().Set("Connection", "keep-alive")
+	s.ResponseWriter.Header().Set("Access-Control-Allow-Origin", "*")
+
+	sessVal := s.Session().GetMultiple()
+	userId := common.ObjToString(sessVal["userId"])
+	if userId != "" {
+		// 创建用户专属的消息通道
+		messageChan := make(chan string)
+		// 注册当前连接
+		util.SseClientsMu.Lock()
+		util.SseClients[userId] = messageChan
+		util.SseClientsMu.Unlock()
+
+		// 客户端断开时清理资源
+		defer func() {
+			util.SseClientsMu.Lock()
+			delete(util.SseClients, userId)
+			close(messageChan)
+			util.SseClientsMu.Unlock()
+		}()
+		// 监听消息通道并发送给客户端
+		flusher := s.ResponseWriter.(http.Flusher)
+		for {
+			select {
+			case msg := <-messageChan:
+				fmt.Fprintf(s.ResponseWriter, "data: %s\n\n", msg)
+				flusher.Flush()
+			case <-s.Request.Context().Done():
+				return // 客户端断开连接
+			}
+		}
+	}
 }
 
 func (s *ServerSentRouter) Send() {

+ 28 - 0
util/sse.go

@@ -4,6 +4,7 @@ import (
 	"app.yhyue.com/moapp/jybase/go-logger/logger"
 	"app.yhyue.com/moapp/message/model"
 	"encoding/json"
+	"log"
 	"sync"
 )
 
@@ -32,3 +33,30 @@ func (s *Broadcast) SendToUsers(msg model.SseMessage) {
 		}
 	}
 }
+
+// 全局变量:存储所有用户的 SSE 连接
+var (
+	SseClients   = make(map[string]chan string) // 用户ID -> 消息通道
+	SseClientsMu sync.Mutex                     // 保护 clients 的互斥锁
+)
+
+// 向指定用户发送通知
+func SendNotification(userId string, msg model.SseMessage) {
+	SseClientsMu.Lock()
+	defer SseClientsMu.Unlock()
+	msgData, _ := json.Marshal(msg)
+	if ch, ok := SseClients[userId]; ok {
+		ch <- string(msgData)
+	}
+}
+
+// 向所有用户发送通知
+func SendNotificationToAll(msg model.SseMessage) {
+	SseClientsMu.Lock()
+	defer SseClientsMu.Unlock()
+	msgData, _ := json.Marshal(msg)
+	for userId, ch := range SseClients {
+		log.Println("userId:", userId)
+		ch <- string(msgData)
+	}
+}