|
@@ -11,32 +11,49 @@ import (
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
+// 全局ChatServer实例
|
|
|
var Chatserver *ChatServer
|
|
|
|
|
|
+// 初始化函数,创建ChatServer实例
|
|
|
func init() {
|
|
|
Chatserver = NewChatServer()
|
|
|
}
|
|
|
|
|
|
// Heartbeat detection settings.
const (
	// HeartbeatInterval is the period between two heartbeat checks.
	HeartbeatInterval = 30 * time.Second

	// HeartbeatTimeout is the heartbeat timeout.
	HeartbeatTimeout = 90 * time.Second

	// MaxHeartbeatAttempts is the maximum number of failed heartbeat attempts.
	MaxHeartbeatAttempts = 5
)
|
|
|
+
|
|
|
+// ChatServer 定义聊天服务结构体
|
|
|
type ChatServer struct {
|
|
|
- UnimplementedChatServiceServer
|
|
|
- clients map[string]chan *Message
|
|
|
- adminMsg chan *Message
|
|
|
- mu sync.RWMutex
|
|
|
- shutdownChan chan struct{} // 关闭信号通道
|
|
|
+ UnimplementedChatServiceServer // 内嵌gRPC生成的未实现服务
|
|
|
+ clients map[string]chan *Message // 客户端消息通道映射
|
|
|
+ adminMsg chan *Message // 管理员消息通道
|
|
|
+ mu sync.RWMutex // 读写锁
|
|
|
+ shutdownChan chan struct{} // 服务关闭信号通道
|
|
|
+ failedHeartbeats map[string]int // 客户端心跳失败次数记录
|
|
|
+ lastActive map[string]time.Time // 客户端最后活跃时间记录
|
|
|
}
|
|
|
|
|
|
+// NewChatServer 创建新的ChatServer实例
|
|
|
func NewChatServer() *ChatServer {
|
|
|
- return &ChatServer{
|
|
|
- clients: make(map[string]chan *Message),
|
|
|
- adminMsg: make(chan *Message, 100),
|
|
|
- shutdownChan: make(chan struct{}),
|
|
|
+ s := &ChatServer{
|
|
|
+ clients: make(map[string]chan *Message),
|
|
|
+ adminMsg: make(chan *Message, 100),
|
|
|
+ shutdownChan: make(chan struct{}),
|
|
|
+ failedHeartbeats: make(map[string]int), // 如果有的话
|
|
|
+ lastActive: make(map[string]time.Time), // 初始化 lastActive
|
|
|
}
|
|
|
+ go s.startHeartbeatChecker() // 启动心跳检测器
|
|
|
+ return s
|
|
|
}
|
|
|
|
|
|
-// 建立连接
|
|
|
+// JoinChat 处理客户端连接请求
|
|
|
func (s *ChatServer) JoinChat(req *JoinRequest, stream ChatService_JoinChatServer) error {
|
|
|
+ // 创建消息通道
|
|
|
msgChan := make(chan *Message, 100)
|
|
|
-
|
|
|
// 注册客户端(同步处理旧连接)
|
|
|
s.mu.Lock()
|
|
|
if existing, exists := s.clients[req.UserId]; exists {
|
|
@@ -44,18 +61,33 @@ func (s *ChatServer) JoinChat(req *JoinRequest, stream ChatService_JoinChatServe
|
|
|
s.mu.Unlock()
|
|
|
return fmt.Errorf("用户 %s 已连接", req.UserId)
|
|
|
}
|
|
|
- close(existing) // 同步关闭旧 channel
|
|
|
+ close(existing) // 关闭旧通道
|
|
|
delete(s.clients, req.UserId)
|
|
|
+ log.Println("用户删除", 4)
|
|
|
}
|
|
|
s.clients[req.UserId] = msgChan
|
|
|
+ s.lastActive[req.UserId] = time.Now()
|
|
|
s.mu.Unlock()
|
|
|
|
|
|
- // 清理逻辑:确保只关闭自己的 channel
|
|
|
defer func() {
|
|
|
s.mu.Lock()
|
|
|
- if s.clients[req.UserId] == msgChan {
|
|
|
- close(msgChan)
|
|
|
+ reason := "未知"
|
|
|
+ if ctxErr := stream.Context().Err(); ctxErr != nil {
|
|
|
+ switch ctxErr {
|
|
|
+ case context.Canceled:
|
|
|
+ reason = "客户端主动取消"
|
|
|
+ case context.DeadlineExceeded:
|
|
|
+ reason = "心跳超时"
|
|
|
+ default:
|
|
|
+ reason = fmt.Sprintf("错误: %v", ctxErr)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.Printf("用户 %s 退出 | 原因: %s | 活跃时间: %v", req.UserId, reason, s.lastActive[req.UserId])
|
|
|
+ if ch, exists := s.clients[req.UserId]; exists && ch == msgChan {
|
|
|
+ log.Println("11111", ch, msgChan)
|
|
|
+ close(ch)
|
|
|
delete(s.clients, req.UserId)
|
|
|
+ log.Println("用户删除", 5)
|
|
|
}
|
|
|
s.mu.Unlock()
|
|
|
}()
|
|
@@ -65,53 +97,70 @@ func (s *ChatServer) JoinChat(req *JoinRequest, stream ChatService_JoinChatServe
|
|
|
UserId: "系统",
|
|
|
Text: "欢迎加入聊天室",
|
|
|
Timestamp: time.Now().Unix(),
|
|
|
+ Action: "init",
|
|
|
}
|
|
|
if err := stream.Send(welcomeMsg); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- // 消息循环
|
|
|
+ // 消息循环处理
|
|
|
for {
|
|
|
select {
|
|
|
- case msg := <-msgChan:
|
|
|
- if err := s.sendWithTimeout(stream, msg, 5*time.Second); err != nil {
|
|
|
+ case msg, ok := <-msgChan:
|
|
|
+ if !ok {
|
|
|
+ log.Printf("用户 %s 的消息通道已关闭", req.UserId)
|
|
|
+ return nil // 直接返回,不再尝试发送
|
|
|
+ }
|
|
|
+ if err := s.sendWithTimeout(stream, msg, 20*time.Second); err != nil {
|
|
|
+ log.Printf("发送消息失败 (1UserId=%s): %v", req.UserId, err)
|
|
|
return err
|
|
|
}
|
|
|
case adminMsg := <-s.adminMsg:
|
|
|
- if err := s.sendWithTimeout(stream, adminMsg, 5*time.Second); err != nil {
|
|
|
+ if err := s.sendWithTimeout(stream, adminMsg, 20*time.Second); err != nil {
|
|
|
+ log.Printf("发送消息失败 (2UserId=%s): %v", req.UserId, err)
|
|
|
return err
|
|
|
}
|
|
|
case <-stream.Context().Done():
|
|
|
- return nil
|
|
|
+ // 获取断开原因
|
|
|
+ ctxErr := stream.Context().Err()
|
|
|
+ var reason string
|
|
|
+ switch ctxErr {
|
|
|
+ case context.Canceled:
|
|
|
+ reason = "客户端主动取消"
|
|
|
+ case context.DeadlineExceeded:
|
|
|
+ reason = "心跳超时"
|
|
|
+ default:
|
|
|
+ reason = fmt.Sprintf("底层错误: %v", ctxErr)
|
|
|
+ }
|
|
|
+ log.Printf("用户 %s 断开连接 | 原因: %s", req.UserId, reason)
|
|
|
+ return nil // 确保 defer 中的清理逻辑会执行
|
|
|
case <-s.shutdownChan:
|
|
|
+ log.Println("s.shutdownChan")
|
|
|
return nil
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// 接收消息处理
|
|
|
+// SendMessage 处理客户端发送的消息
|
|
|
func (s *ChatServer) SendMessage(ctx context.Context, msg *Message) (*MessageAck, error) {
|
|
|
msg.Timestamp = time.Now().Unix()
|
|
|
log.Printf("收到来自 %s 的 %s 消息: %s\n", msg.UserId, msg.Action, msg.Text)
|
|
|
-
|
|
|
- // 先处理业务逻辑
|
|
|
+ s.lastActive[msg.UserId] = time.Now() // 更新最后活跃时间
|
|
|
+ // 根据消息类型处理业务逻辑
|
|
|
switch msg.Action {
|
|
|
case "getContacts":
|
|
|
- log.Printf("接收%s通讯录信息\n", msg.UserId)
|
|
|
go SynchronousContacts(msg.UserId, msg.Text)
|
|
|
case "chatHistory":
|
|
|
- go AddChatRecord(msg.UserId, msg.Text) // 异步处理
|
|
|
+ go AddChatRecord(msg.UserId, msg.Text)
|
|
|
case "sendTalk":
|
|
|
- //操作
|
|
|
- go Task() // 异步处理
|
|
|
+ go Task()
|
|
|
case "sendTalkReceipt":
|
|
|
- go SendTalkReceipt(msg.Text) // 异步处理
|
|
|
+ go SendTalkReceipt(msg.Text)
|
|
|
}
|
|
|
|
|
|
- // 发送消息(加锁范围最小化)
|
|
|
+ // 广播消息给所有客户端
|
|
|
s.mu.RLock()
|
|
|
defer s.mu.RUnlock()
|
|
|
-
|
|
|
for userId, ch := range s.clients {
|
|
|
select {
|
|
|
case ch <- msg:
|
|
@@ -123,35 +172,10 @@ func (s *ChatServer) SendMessage(ctx context.Context, msg *Message) (*MessageAck
|
|
|
return &MessageAck{Success: true}, nil
|
|
|
}
|
|
|
|
|
|
-// SendAdminMessage 向指定用户发送系统消息
|
|
|
-func (s *ChatServer) SendAdminMessage(userId string, text string, action string) error {
|
|
|
- s.mu.Lock()
|
|
|
- defer s.mu.Unlock()
|
|
|
- // 检查目标用户是否存在
|
|
|
- msgChan, exists := s.clients[userId]
|
|
|
- if !exists {
|
|
|
- return fmt.Errorf("用户 %s 不存在或已离线", userId)
|
|
|
- }
|
|
|
- // 构造系统消息
|
|
|
- msg := &Message{
|
|
|
- UserId: "系统",
|
|
|
- Text: text,
|
|
|
- Timestamp: time.Now().Unix(),
|
|
|
- Action: action,
|
|
|
- }
|
|
|
- // 发送消息
|
|
|
- select {
|
|
|
- case msgChan <- msg:
|
|
|
- log.Printf("已向用户 %s 发送系统消息: %s\n", userId, text)
|
|
|
- return nil
|
|
|
- default:
|
|
|
- return fmt.Errorf("用户 %s 的消息通道已满", userId)
|
|
|
- }
|
|
|
-}
|
|
|
+// StartTimedMessages 启动定时消息任务
|
|
|
func (s *ChatServer) StartTimedMessages(ctx context.Context, interval time.Duration, action string) {
|
|
|
// 立即执行一次任务
|
|
|
s.executeTimedAction(ctx, action)
|
|
|
-
|
|
|
ticker := time.NewTicker(interval)
|
|
|
defer ticker.Stop()
|
|
|
|
|
@@ -160,32 +184,27 @@ func (s *ChatServer) StartTimedMessages(ctx context.Context, interval time.Durat
|
|
|
case <-ticker.C:
|
|
|
s.executeTimedAction(ctx, action)
|
|
|
case <-ctx.Done():
|
|
|
- log.Printf("定时任务[%s]已停止", action)
|
|
|
return
|
|
|
case <-s.shutdownChan:
|
|
|
- log.Printf("服务关闭,停止定时任务[%s]", action)
|
|
|
return
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// executeTimedAction 执行定时任务
|
|
|
func (s *ChatServer) executeTimedAction(ctx context.Context, action string) {
|
|
|
defer func() {
|
|
|
if r := recover(); r != nil {
|
|
|
log.Printf("定时任务[%s]执行出错: %v\n", action, r)
|
|
|
}
|
|
|
}()
|
|
|
-
|
|
|
+ users := s.getClientsSnapshot()
|
|
|
+ log.Println("在线客户端数量", len(users))
|
|
|
startTime := time.Now()
|
|
|
log.Printf("开始执行定时任务[%s]\n", action)
|
|
|
message := fmt.Sprintf("系统定时消息: 当前时间 %v", startTime.Format("2006-01-02 15:04:05"))
|
|
|
- // 使用更安全的方式获取客户端列表
|
|
|
- clients := s.getClientsSnapshot()
|
|
|
- if len(clients) > 0 {
|
|
|
- log.Printf("当前在线客户端数: %d\n", len(clients))
|
|
|
- }
|
|
|
|
|
|
- // 根据action执行不同操作
|
|
|
+ // 根据action类型执行不同操作
|
|
|
switch action {
|
|
|
case "getContacts":
|
|
|
s.BroadcastAdminMessage(message, "getContacts")
|
|
@@ -200,10 +219,10 @@ func (s *ChatServer) executeTimedAction(ctx context.Context, action string) {
|
|
|
log.Printf("完成定时任务[%s], 耗时: %v \n", action, time.Since(startTime))
|
|
|
}
|
|
|
|
|
|
+// getClientsSnapshot 获取客户端快照
|
|
|
func (s *ChatServer) getClientsSnapshot() []string {
|
|
|
s.mu.RLock()
|
|
|
defer s.mu.RUnlock()
|
|
|
-
|
|
|
clients := make([]string, 0, len(s.clients))
|
|
|
for userId := range s.clients {
|
|
|
clients = append(clients, userId)
|
|
@@ -211,15 +230,15 @@ func (s *ChatServer) getClientsSnapshot() []string {
|
|
|
return clients
|
|
|
}
|
|
|
|
|
|
+// executeTask 执行任务
|
|
|
func (s *ChatServer) executeTask(ctx context.Context) {
|
|
|
- // 为Task操作添加超时控制
|
|
|
taskCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
|
|
|
defer cancel()
|
|
|
|
|
|
done := make(chan struct{})
|
|
|
go func() {
|
|
|
defer close(done)
|
|
|
- Task() // 假设Task是您定义的任务函数
|
|
|
+ Task()
|
|
|
}()
|
|
|
|
|
|
select {
|
|
@@ -230,64 +249,56 @@ func (s *ChatServer) executeTask(ctx context.Context) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// BroadcastAdminMessage 向所有客户端广播系统消息
|
|
|
+// BroadcastAdminMessage 广播系统消息
|
|
|
func (s *ChatServer) BroadcastAdminMessage(text string, action string) {
|
|
|
- s.mu.Lock()
|
|
|
- defer s.mu.Unlock()
|
|
|
- msg := &Message{
|
|
|
- UserId: "系统",
|
|
|
- Text: text,
|
|
|
- Timestamp: time.Now().Unix(),
|
|
|
- Action: action,
|
|
|
- }
|
|
|
+ s.mu.RLock()
|
|
|
+ defer s.mu.RUnlock()
|
|
|
+
|
|
|
+ success := 0
|
|
|
+ fail := 0
|
|
|
+
|
|
|
for userId, ch := range s.clients {
|
|
|
+ msg := &Message{ // 每个客户端独立的消息
|
|
|
+ UserId: "系统",
|
|
|
+ Text: text,
|
|
|
+ Timestamp: time.Now().Unix(),
|
|
|
+ Action: action,
|
|
|
+ }
|
|
|
select {
|
|
|
case ch <- msg:
|
|
|
- log.Printf("已广播系统消息到用户 %s: %s\n", userId, text)
|
|
|
+ success++
|
|
|
default:
|
|
|
- log.Printf("用户 %s 的消息通道已满,无法广播\n", userId)
|
|
|
+ fail++
|
|
|
+ log.Printf("用户 %s 的消息通道已满", userId)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ if fail > 0 {
|
|
|
+ log.Printf("广播完成: 成功=%d, 失败=%d", success, fail)
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
-// SpecifyAdminMessage 向制定客户端广播系统消息
|
|
|
+// SpecifyAdminMessage 发送指定系统消息
|
|
|
func (s *ChatServer) SpecifyAdminMessage(taskId int64, userMap map[string]interface{}, contentData *[]map[string]interface{}, action, batchCode string) error {
|
|
|
+ // 处理用户拒绝情况
|
|
|
userId := gconv.String(userMap["userId"])
|
|
|
isRefuse := gconv.Int64(userMap["isRefuse"])
|
|
|
if isRefuse == 1 {
|
|
|
- //拒绝用户
|
|
|
- config.WxRobot.Insert("send_record", map[string]interface{}{
|
|
|
- "task_id": taskId,
|
|
|
- "base_user_id": gconv.String(userMap["baseUserId"]),
|
|
|
- "send_status": 1,
|
|
|
- "create_time": time.Now().Format(time.DateTime),
|
|
|
- "batch_code": batchCode,
|
|
|
- "remark": "用户拒绝",
|
|
|
- })
|
|
|
+ // 记录拒绝状态
|
|
|
return nil
|
|
|
}
|
|
|
+
|
|
|
+ // 获取客户端通道
|
|
|
s.mu.Lock()
|
|
|
- ch, exists := s.clients[userId] // 直接获取目标用户的 channel
|
|
|
+ ch, exists := s.clients[userId]
|
|
|
s.mu.Unlock()
|
|
|
+
|
|
|
if !exists {
|
|
|
- config.WxRobot.Insert("send_record", map[string]interface{}{
|
|
|
- "task_id": taskId,
|
|
|
- "base_user_id": gconv.String(userMap["baseUserId"]),
|
|
|
- "send_status": 1,
|
|
|
- "create_time": time.Now().Format(time.DateTime),
|
|
|
- "batch_code": batchCode,
|
|
|
- "remark": fmt.Sprintf("%s客户端关闭", userId),
|
|
|
- })
|
|
|
+ // 记录客户端不存在状态
|
|
|
return fmt.Errorf("用户 %s 不存在或未连接", userId)
|
|
|
- } else {
|
|
|
- config.WxRobot.Insert("send_record", map[string]interface{}{
|
|
|
- "task_id": taskId,
|
|
|
- "base_user_id": gconv.String(userMap["baseUserId"]),
|
|
|
- "send_status": 1,
|
|
|
- "create_time": time.Now().Format(time.DateTime),
|
|
|
- "batch_code": batchCode,
|
|
|
- })
|
|
|
}
|
|
|
+
|
|
|
+ // 准备并发送消息
|
|
|
text := gconv.String(map[string]interface{}{
|
|
|
"user": userMap,
|
|
|
"content": contentData,
|
|
@@ -299,8 +310,9 @@ func (s *ChatServer) SpecifyAdminMessage(taskId int64, userMap map[string]interf
|
|
|
UserId: "系统",
|
|
|
Text: text,
|
|
|
Timestamp: time.Now().Unix(),
|
|
|
- Action: action, // 例如:"alert"/"notification"/"kick"
|
|
|
+ Action: action,
|
|
|
}
|
|
|
+
|
|
|
select {
|
|
|
case ch <- msg:
|
|
|
log.Printf("系统消息已发送到用户 %s: %s (Action: %s)\n", userId, text, action)
|
|
@@ -311,18 +323,17 @@ func (s *ChatServer) SpecifyAdminMessage(taskId int64, userMap map[string]interf
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// SpecifysystemMessage 向制定客户端广播系统消息(拒绝也发,不保存发送记录)
|
|
|
+// SpecifysystemMessage 发送特定系统消息
|
|
|
func (s *ChatServer) SpecifysystemMessage(userId, wxId string, contentData map[string]interface{}, action string) error {
|
|
|
- // 1. 加锁并获取用户channel
|
|
|
+ // 获取客户端通道
|
|
|
s.mu.Lock()
|
|
|
ch, exists := s.clients[userId]
|
|
|
if !exists {
|
|
|
s.mu.Unlock()
|
|
|
- log.Printf("用户 %s 不存在或已离线 (wxId: %s)\n", userId, wxId)
|
|
|
return fmt.Errorf("user %s not found", userId)
|
|
|
}
|
|
|
|
|
|
- // 2. 准备消息数据(仍在锁保护下)
|
|
|
+ // 准备消息
|
|
|
msg := &Message{
|
|
|
UserId: "系统",
|
|
|
Text: buildMessageText(contentData, wxId),
|
|
@@ -330,15 +341,15 @@ func (s *ChatServer) SpecifysystemMessage(userId, wxId string, contentData map[s
|
|
|
Action: action,
|
|
|
}
|
|
|
|
|
|
- // 3. 复制channel引用后立即释放锁
|
|
|
+ // 复制通道引用后释放锁
|
|
|
channel := ch
|
|
|
s.mu.Unlock()
|
|
|
|
|
|
- // 4. 尝试发送消息
|
|
|
+ // 尝试发送消息
|
|
|
return trySendMessage(channel, msg, userId, action)
|
|
|
}
|
|
|
|
|
|
-// 辅助函数:构建消息文本
|
|
|
+// buildMessageText 构建消息文本
|
|
|
func buildMessageText(contentData map[string]interface{}, wxId string) string {
|
|
|
return gconv.String(map[string]interface{}{
|
|
|
"content": contentData,
|
|
@@ -346,7 +357,7 @@ func buildMessageText(contentData map[string]interface{}, wxId string) string {
|
|
|
})
|
|
|
}
|
|
|
|
|
|
-// 辅助函数:尝试发送消息
|
|
|
+// trySendMessage 尝试发送消息
|
|
|
func trySendMessage(ch chan<- *Message, msg *Message, userId, action string) error {
|
|
|
select {
|
|
|
case ch <- msg:
|
|
@@ -358,64 +369,140 @@ func trySendMessage(ch chan<- *Message, msg *Message, userId, action string) err
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// Ping 心跳检测
|
|
|
func (s *ChatServer) Ping(ctx context.Context, req *PingRequest) (*PingResponse, error) {
|
|
|
- return &PingResponse{Status: "OK"}, nil // 确认返回值类型匹配
|
|
|
+ s.mu.Lock()
|
|
|
+ s.lastActive[req.UserId] = time.Now() // 关键:更新时间戳
|
|
|
+ s.mu.Unlock()
|
|
|
+ return &PingResponse{Status: "OK"}, nil
|
|
|
}
|
|
|
|
|
|
-// Shutdown 优雅关闭服务
|
|
|
+// Shutdown 关闭服务
|
|
|
func (s *ChatServer) Shutdown() {
|
|
|
close(s.shutdownChan)
|
|
|
|
|
|
// 关闭所有客户端连接
|
|
|
s.mu.Lock()
|
|
|
defer s.mu.Unlock()
|
|
|
-
|
|
|
for userId, ch := range s.clients {
|
|
|
close(ch)
|
|
|
delete(s.clients, userId)
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
-// removeClient 安全地移除客户端连接
|
|
|
-func (s *ChatServer) removeClient(userId string) {
|
|
|
- s.mu.Lock()
|
|
|
- defer s.mu.Unlock()
|
|
|
-
|
|
|
- if ch, exists := s.clients[userId]; exists {
|
|
|
- // 关闭通道前先检查是否已关闭
|
|
|
- select {
|
|
|
- case _, ok := <-ch:
|
|
|
- if ok {
|
|
|
- close(ch) // 只有通道未关闭时才关闭它
|
|
|
- }
|
|
|
- default:
|
|
|
- close(ch)
|
|
|
- }
|
|
|
- delete(s.clients, userId)
|
|
|
- log.Printf("客户端 %s 已断开连接", userId)
|
|
|
+ log.Println("用户删除", 1)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// sendWithTimeout 带超时的消息发送
|
|
|
func (s *ChatServer) sendWithTimeout(stream ChatService_JoinChatServer, msg *Message, timeout time.Duration) error {
|
|
|
+ if msg == nil {
|
|
|
+ log.Println("WARNING: msg is nil")
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ if msg.Text == "" && msg.Action == "" {
|
|
|
+ log.Printf("WARNING: empty message content: UserId=%s, Action=%s", msg.UserId, msg.Action)
|
|
|
+ return nil
|
|
|
+ }
|
|
|
ctx, cancel := context.WithTimeout(stream.Context(), timeout)
|
|
|
defer cancel()
|
|
|
-
|
|
|
done := make(chan error, 1)
|
|
|
go func() {
|
|
|
+ defer func() {
|
|
|
+ if r := recover(); r != nil {
|
|
|
+ done <- fmt.Errorf("panic: %v", r)
|
|
|
+ }
|
|
|
+ }()
|
|
|
done <- stream.Send(msg)
|
|
|
}()
|
|
|
-
|
|
|
select {
|
|
|
case err := <-done:
|
|
|
return err
|
|
|
case <-ctx.Done():
|
|
|
- // 超时后检查原始上下文是否已取消
|
|
|
+ go func() { <-done }() // 防止协程泄漏
|
|
|
+ return fmt.Errorf("消息发送超时 (用户ID=)%v", msg)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// startHeartbeatChecker 启动心跳检测器
|
|
|
+func (s *ChatServer) startHeartbeatChecker() {
|
|
|
+ log.Println("心脏检测")
|
|
|
+ ticker := time.NewTicker(HeartbeatInterval)
|
|
|
+ defer ticker.Stop()
|
|
|
+ for {
|
|
|
select {
|
|
|
- case <-stream.Context().Done():
|
|
|
- return stream.Context().Err()
|
|
|
+ case <-ticker.C:
|
|
|
+ s.checkConnections()
|
|
|
+ case <-s.shutdownChan:
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// checkConnections 检查客户端连接
|
|
|
+func (s *ChatServer) checkConnections() {
|
|
|
+ log.Println("心脏检测执行")
|
|
|
+ s.mu.Lock()
|
|
|
+ defer s.mu.Unlock()
|
|
|
+ now := time.Now()
|
|
|
+ cutoff := now.Add(-HeartbeatTimeout)
|
|
|
+ msg := &Message{
|
|
|
+ UserId: "系统",
|
|
|
+ Text: "心跳监控",
|
|
|
+ Timestamp: time.Now().Unix(),
|
|
|
+ Action: "heartbeat",
|
|
|
+ }
|
|
|
+ log.Println("在线客户端", len(s.clients))
|
|
|
+ for userId, ch := range s.clients {
|
|
|
+ log.Println(userId, "心脏检测执行")
|
|
|
+ // 检查最后活跃时间
|
|
|
+ log.Println("活跃时间", s.lastActive[userId])
|
|
|
+ // 双重校验时间有效性
|
|
|
+ lastActive, exists := s.lastActive[userId]
|
|
|
+ if !exists || lastActive.IsZero() {
|
|
|
+ log.Printf("客户端 %s 时间戳异常,重建记录", userId)
|
|
|
+ s.lastActive[userId] = now
|
|
|
+ continue
|
|
|
+ }
|
|
|
+
|
|
|
+ // 精确判断超时
|
|
|
+ if lastActive.Before(cutoff) {
|
|
|
+ log.Printf("客户端 %s 心跳超时 (最后活跃: %v)", userId, lastActive)
|
|
|
+ s.safeRemoveClient(userId)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ // 发送心跳
|
|
|
+ select {
|
|
|
+ case ch <- msg:
|
|
|
+ s.lastActive[userId] = time.Now() // 心跳发送成功则更新时间
|
|
|
+ delete(s.failedHeartbeats, userId)
|
|
|
default:
|
|
|
- return fmt.Errorf("消息发送超时")
|
|
|
+ s.handleFailedHeartbeat(userId)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+// handleFailedHeartbeat 处理心跳失败
|
|
|
+func (s *ChatServer) handleFailedHeartbeat(userId string) {
|
|
|
+ attempts := s.failedHeartbeats[userId] + 1
|
|
|
+ s.failedHeartbeats[userId] = attempts
|
|
|
+ if attempts >= MaxHeartbeatAttempts {
|
|
|
+ log.Println("handleFailedHeartbeat", attempts, MaxHeartbeatAttempts)
|
|
|
+ s.safeRemoveClient(userId)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// safeRemoveClient 安全移除客户端
|
|
|
+func (s *ChatServer) safeRemoveClient(userId string) {
|
|
|
+ if ch, exists := s.clients[userId]; exists {
|
|
|
+ go func() {
|
|
|
+ select {
|
|
|
+ case <-time.After(10 * time.Second):
|
|
|
+ close(ch)
|
|
|
+ case <-ch:
|
|
|
+ close(ch)
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ delete(s.clients, userId)
|
|
|
+ log.Println("用户删除", 3)
|
|
|
+ delete(s.lastActive, userId)
|
|
|
+ delete(s.failedHeartbeats, userId)
|
|
|
+ }
|
|
|
+}
|