wangshan 3 tháng trước cách đây
mục cha
commit
56917cd955
7 tập tin đã thay đổi với 135 bổ sung42 xóa
  1. 0 5
      entity/entity.go
  2. 1 0
      etc/config.yaml
  3. 44 0
      handler/activity/lotteryDrawTask.go
  4. 2 0
      handler/handler.go
  5. 10 5
      model/sse.go
  6. 44 32
      services/sse/sse.go
  7. 34 0
      util/sse.go

+ 0 - 5
entity/entity.go

@@ -1,5 +0,0 @@
-package entity
-
-var (
-	SseClients = map[int64]chan string{}
-)

+ 1 - 0
etc/config.yaml

@@ -171,4 +171,5 @@ newRegister: #新注册送7天超级订阅
     pcUrl: /page_workDesktop/work-bench/app/big/big_subscribe?vt=v
     mobileUrl: /jyapp/vipsubscribe/toSubVipSetPage
     wxUrl: /front/vipsubscribe/toSubVipSetPage
+cookieDomain: .jydev.jianyu360.com
 webhookURL: ["https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=24b0ac60-3a02-441f-842e-9cd3f75d1208"]

+ 44 - 0
handler/activity/lotteryDrawTask.go

@@ -10,6 +10,7 @@ import (
 	"fmt"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/os/gctx"
+	"regexp"
 	"time"
 )
 
@@ -174,3 +175,46 @@ func LotteryDrawTask(msg *model.Message) {
 		}
 	}
 }
+
+type lotteryBody struct {
+	NickName   string
+	Phone      string
+	MgoUserId  string
+	PositionId int64
+	ActiveId   int64
+	ActiveName string
+	PrizeId    int64
+	PrizeName  string
+}
+
+// WinningLottery 中奖信息
+func WinningLottery(msg *model.Message) {
+	//TODO  sse 通知用户 中奖信息
+	var lb lotteryBody
+	if msg.E_body != nil {
+		b, err := json.Marshal(msg.E_body)
+		if err == nil {
+			err = json.Unmarshal(b, &lb)
+		}
+		if err != nil {
+			logger.Info(fmt.Sprintf("中奖参数信息有误:%s", msg.E_userId))
+			return
+		}
+	}
+	var userName = lb.NickName
+	if lb.Phone != "" {
+		var PhoneReg = regexp.MustCompile(`^(100\d{8}|1[3-9]\d{9})$`)
+		if PhoneReg.MatchString(lb.Phone) {
+			phone := []rune(lb.Phone)
+			userName = string(phone[0:3]) + "****" + string(phone[(len(phone)-4):])
+		}
+	}
+	logger.Info("  sse  中奖信息通知:", msg)
+	util.SseBroadcast.SendToUsers(model.SseMessage{
+		Prize:  lb.PrizeName,
+		User:   userName,
+		State:  model.AllTarget,
+		Time:   time.Now().Format(date.Date_Full_Layout),
+		Remark: fmt.Sprintf("恭喜 %s 抽中 %s ", userName, lb.PrizeName),
+	})
+}

+ 2 - 0
handler/handler.go

@@ -35,6 +35,8 @@ var (
 		"task": activity.Task,
 		//抽奖 任务
 		"lottery_draw_task": activity.LotteryDrawTask,
+		//中奖信息
+		"winning_lottery": activity.WinningLottery,
 	}
 )
 

+ 10 - 5
model/sse.go

@@ -1,9 +1,14 @@
 package model
 
-// 消息结构体,用于传递 sender、message 和时间
+const (
+	AllTarget = "JYAllVisitor"
+)
+
+// 消息结构体,用于传递 中奖信息
 type SseMessage struct {
-	Prize string `json:"prize"` //奖品信息 例如:腾讯视频会员周卡
-	User  string `json:"user"`  //中奖人信息 例如:157****0152
-	State int    `json:"state"` //0:当前用户中奖;1:其他用户中奖
-	Time  string `json:"time"`
+	Prize  string `json:"prize"`  //奖品信息 例如:腾讯视频会员周卡
+	User   string `json:"user"`   //中奖人信息 例如:157****0152
+	State  string `json:"state"`  // AllTarget
+	Time   string `json:"time"`   //
+	Remark string `json:"remark"` //
 }

+ 44 - 32
services/sse/sse.go

@@ -2,19 +2,38 @@ package sse
 
 import (
 	"app.yhyue.com/moapp/jybase/common"
+	"app.yhyue.com/moapp/jybase/date"
 	"app.yhyue.com/moapp/jybase/go-logger/logger"
 	"app.yhyue.com/moapp/jybase/go-xweb/xweb"
-	"app.yhyue.com/moapp/message/entity"
 	"app.yhyue.com/moapp/message/model"
-	"encoding/json"
+	"app.yhyue.com/moapp/message/util"
 	"fmt"
 	"net/http"
+	"strings"
 	"time"
 )
 
 type ServerSentRouter struct {
 	*xweb.Action
 	events xweb.Mapper `xweb:"/sse/events"`
+	send   xweb.Mapper `xweb:"/sse/send"`
+}
+
+func (s *ServerSentRouter) Send() {
+	msg := s.GetString("msg")
+	phone := s.GetString("phone")
+	if msg == "" {
+		msg = "Default notification at " + time.Now().Format(time.RFC3339)
+	} else {
+		util.SseBroadcast.SendToUsers(model.SseMessage{
+			Prize:  msg,
+			User:   phone,
+			State:  model.AllTarget,
+			Time:   time.Now().Format(date.Date_Full_Layout),
+			Remark: fmt.Sprintf("测试信息:恭喜 %s 抽中 %s ", phone, msg),
+		})
+	}
+	fmt.Fprintf(s.ResponseWriter, "Broadcasted: %s", msg)
 }
 
 func (s *ServerSentRouter) Events() {
@@ -25,45 +44,38 @@ func (s *ServerSentRouter) Events() {
 	s.ResponseWriter.Header().Set("Access-Control-Allow-Origin", "*")
 
 	sessVal := s.Session().GetMultiple()
-	if userId := common.ObjToString(sessVal["userId"]); userId != "" {
-		positionId := common.Int64All(sessVal["positionId"])
-		clientChan := make(chan string, 1)
-		entity.SseClients[positionId] = clientChan
+	userId := common.ObjToString(sessVal["userId"])
+	sessionId := s.Session().Id()
+	if sessionId != "" {
+		clientChan := make(chan string, 10)
+		util.SseBroadcast.Mu.Lock()
+		util.SseBroadcast.Clients[clientChan] = struct{}{}
+		util.SseBroadcast.Mu.Unlock()
 		defer func() {
-			delete(entity.SseClients, positionId)
+			util.SseBroadcast.Mu.Lock()
+			delete(util.SseBroadcast.Clients, clientChan)
+			util.SseBroadcast.Mu.Unlock()
 			close(clientChan)
+			logger.Info("Client disconnected")
 		}()
-		//go SendToClient(positionId)
+		flusher := s.ResponseWriter.(http.Flusher)
 		for {
 			select {
 			case msg := <-clientChan:
-				logger.Info("msg:", msg, "---------------------------")
-				fmt.Fprintf(s.ResponseWriter, "data: %s\n\n", msg)
-				s.ResponseWriter.(http.Flusher).Flush()
+				logger.Info("--msg--:", msg, "--userId--:", userId)
+				var isSend bool
+				if strings.Contains(msg, model.AllTarget) {
+					isSend = true
+				} else if userId != "" {
+					isSend = true
+				}
+				if isSend {
+					fmt.Fprintf(s.ResponseWriter, "data: %s\n\n", msg)
+					flusher.Flush()
+				}
 			case <-s.Request.Context().Done():
 				return
 			}
 		}
 	}
 }
-
-func SendToClient(positionId int64) {
-	clientChan, exists := entity.SseClients[positionId]
-	if !exists {
-		logger.Info("Client not found: %s", positionId)
-		return
-	}
-	for i := 0; i < 10; i++ {
-		message := fmt.Sprintf("这是发送内容 第 %d 条", i)
-		msg := model.SseMessage{User: "剑鱼", Prize: message, Time: time.Now().Format("15:04:05")}
-		msgData, _ := json.Marshal(msg)
-		select {
-		case clientChan <- string(msgData):
-			logger.Info(fmt.Sprintf("Sent to client %s: %s", positionId, message))
-		default:
-			logger.Info(fmt.Sprintf("Channel full for client %s, message dropped: %s", positionId, message))
-			return
-		}
-		time.Sleep(2 * time.Second)
-	}
-}

+ 34 - 0
util/sse.go

@@ -0,0 +1,34 @@
+package util
+
+import (
+	"app.yhyue.com/moapp/jybase/go-logger/logger"
+	"app.yhyue.com/moapp/message/model"
+	"encoding/json"
+	"sync"
+)
+
+type Broadcast struct {
+	Clients map[chan string]struct{}
+	Mu      sync.Mutex
+}
+
+var (
+	SseBroadcast = &Broadcast{
+		Clients: make(map[chan string]struct{}),
+	}
+)
+
+// SendToUsers 消息通知
+func (s *Broadcast) SendToUsers(msg model.SseMessage) {
+	s.Mu.Lock()
+	defer s.Mu.Unlock()
+	msgData, _ := json.Marshal(msg)
+	for clientChan := range s.Clients {
+		select {
+		case clientChan <- string(msgData):
+			logger.Info("send to all user, msg:", msg)
+		default:
+			logger.Info("Client channel full, skipping")
+		}
+	}
+}