123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- package util
- import (
- "app.yhyue.com/moapp/jybase/go-logger/logger"
- "app.yhyue.com/moapp/jybase/redis"
- "app.yhyue.com/moapp/message/model"
- "encoding/json"
- "fmt"
- "strings"
- "sync"
- )
- type Broadcast struct {
- Clients map[chan string]struct{}
- Mu sync.Mutex
- }
- var (
- SseBroadcast = &Broadcast{
- Clients: make(map[chan string]struct{}),
- }
- )
- // SendToUsers 消息通知
- func (s *Broadcast) SendToUsers(msg model.SseMessage) {
- s.Mu.Lock()
- defer s.Mu.Unlock()
- msgData, _ := json.Marshal(msg)
- for clientChan := range s.Clients {
- select {
- case clientChan <- string(msgData):
- logger.Info("send to all user, msg:", msg)
- default:
- logger.Info("Client channel full, skipping")
- }
- }
- }
- // 全局变量:存储所有用户的 SSE 连接
- var (
- SseClients = make(map[string]chan string) // 用户ID -> 消息通道
- SseClientsMu sync.Mutex // 保护 clients 的互斥锁
- )
- // 向指定用户发送通知
- func SendNotification(userId string, msg model.SseMessage) {
- //logger.Info(userId, "----SendNotification-----SseClients:", SseClients)
- SseClientsMu.Lock()
- defer SseClientsMu.Unlock()
- msgData, _ := json.Marshal(msg)
- if ch, ok := SseClients[userId]; ok {
- ch <- string(msgData)
- }
- }
- // 向所有用户发送通知
- func SendNotificationToAll(msg model.SseMessage) {
- //logger.Info("SendNotificationToAll ----- SseClients:", SseClients)
- SseClientsMu.Lock()
- defer SseClientsMu.Unlock()
- msgData, _ := json.Marshal(msg)
- for userId, ch := range SseClients {
- logger.Info("to all ; userId:", userId)
- ch <- string(msgData)
- }
- }
- const (
- sseRedisKey = "sse_cache_%s"
- powerCacheDb = "newother"
- expiresTime = 24 * 60 * 60
- )
- func SetSseRedisCache(userId, keys string) {
- cacheKey := fmt.Sprintf(sseRedisKey, userId)
- cacheData := redis.GetStr(powerCacheDb, cacheKey)
- var caches []string
- if cacheData == "" {
- caches = append(caches, userId, keys)
- } else {
- caches = append(strings.Split(cacheData, ","), keys)
- }
- redis.Put(powerCacheDb, cacheKey, strings.Join(caches, ","), expiresTime)
- }
- func GetSseRedisCache(userId string) (keys []string) {
- cacheKey := fmt.Sprintf(sseRedisKey, userId)
- cacheData := redis.GetStr(powerCacheDb, cacheKey)
- return strings.Split(cacheData, ",")
- }
|