Bläddra i källkod

时间间隔修改

WH01243 1 vecka sedan
förälder
incheckning
58b401d18c
2 ändrade filer med 85 tillägg och 34 borttagningar
  1. 7 5
      client/service/chatClient.go
  2. 78 29
      rpc/service/chatServer.go

+ 7 - 5
client/service/chatClient.go

@@ -148,7 +148,7 @@ func (c *ChatClient) connect(password string) error {
 }
 
 // disconnect 断开与服务器的连接
-func (c *ChatClient) disconnect() {
+func (c *ChatClient) disconnect(fool bool) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
@@ -177,6 +177,9 @@ func (c *ChatClient) disconnect() {
 	// 更新连接状态
 	c.isConnected = false
 	log.Printf("[连接][用户:%s] 已断开连接", c.userID)
+	if fool {
+		go c.reconnect()
+	}
 }
 
 // closeStream 关闭gRPC流
@@ -228,7 +231,7 @@ func (c *ChatClient) reconnect() {
 			return
 		default:
 			// 1. 清理旧连接
-			c.disconnect()
+			c.disconnect(false)
 
 			// 2. 尝试新连接
 			log.Printf("[重连] 尝试第 %d/%d 次重连", currentRetry+1, maxRetries)
@@ -509,8 +512,7 @@ func (c *ChatClient) startHealthCheck() {
 
 				if err != nil {
 					log.Printf("健康检查失败: %v", err)
-					c.disconnect()
-					go c.reconnect()
+					c.disconnect(true)
 				} else {
 					c.mu.Lock()
 					c.lastPingTime = time.Now() // 更新最后心跳时间
@@ -545,7 +547,7 @@ func (c *ChatClient) Shutdown() {
 	c.cancel()
 
 	// 2. 断开连接
-	c.disconnect()
+	c.disconnect(false)
 
 	// 3. 等待所有goroutine结束
 	c.wg.Wait()

+ 78 - 29
rpc/service/chatServer.go

@@ -2,6 +2,7 @@ package service
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"github.com/gogf/gf/v2/util/gconv"
 	"log"
@@ -278,48 +279,89 @@ func (s *ChatServer) BroadcastAdminMessage(text string, action string) {
 	}
 }
 
-// SpecifyAdminMessage 发送指定系统消息
-func (s *ChatServer) SpecifyAdminMessage(taskId int64, userMap map[string]interface{}, contentData *[]map[string]interface{}, action, batchCode string) error {
-	// 处理用户拒绝情况
+// SpecifyAdminMessage 指定用户发送系统消息
+func (s *ChatServer) SpecifyAdminMessage(
+	taskId int64,
+	userMap map[string]interface{},
+	contentData *[]map[string]interface{},
+	action, batchCode string,
+) error {
+	// 1. 检查用户是否拒绝
 	userId := gconv.String(userMap["userId"])
-	isRefuse := gconv.Int64(userMap["isRefuse"])
-	if isRefuse == 1 {
-		// 记录拒绝状态
+	if gconv.Int64(userMap["isRefuse"]) == 1 {
+		//拒绝用户
+		config.WxRobot.Insert("send_record", map[string]interface{}{
+			"task_id":      taskId,
+			"base_user_id": gconv.String(userMap["baseUserId"]),
+			"send_status":  3,
+			"create_time":  time.Now().Format(time.DateTime),
+			"batch_code":   batchCode,
+			"remark":       "用户拒绝",
+		})
+		log.Printf("用户 %s 拒绝接收消息 (TaskID: %d, Batch: %s)", userId, taskId, batchCode)
 		return nil
 	}
 
-	// 获取客户端通道
+	// 2. 序列化消息内容(提前失败)
+	msgData := map[string]interface{}{
+		"user":          userMap,
+		"content":       contentData,
+		"taskId":        taskId,
+		"batchCode":     batchCode,
+		"replyLanguage": config.DbConf.ReplyLanguage,
+	}
+	text, err := json.Marshal(msgData) // 使用标准库替代 gconv
+	if err != nil {
+		return fmt.Errorf("序列化消息失败: %v (UserID: %s)", err, userId)
+	}
+
+	// 3. 获取客户端通道(最小化锁范围)
 	s.mu.Lock()
 	ch, exists := s.clients[userId]
 	s.mu.Unlock()
 
 	if !exists {
-		// 记录客户端不存在状态
-		return fmt.Errorf("用户 %s 不存在或未连接", userId)
+		config.WxRobot.Insert("send_record", map[string]interface{}{
+			"task_id":      taskId,
+			"base_user_id": gconv.String(userMap["baseUserId"]),
+			"send_status":  1,
+			"create_time":  time.Now().Format(time.DateTime),
+			"batch_code":   batchCode,
+			"remark":       fmt.Sprintf("%s客户端关闭", userId),
+		})
+		return fmt.Errorf("用户 %s 未连接 (TaskID: %d)", userId, taskId)
+	} else {
+		config.WxRobot.Insert("send_record", map[string]interface{}{
+			"task_id":      taskId,
+			"base_user_id": gconv.String(userMap["baseUserId"]),
+			"send_status":  1,
+			"create_time":  time.Now().Format(time.DateTime),
+			"batch_code":   batchCode,
+			"remark":       "等待客户端回调",
+		})
 	}
 
-	// 准备并发送消息
-	text := gconv.String(map[string]interface{}{
-		"user":          userMap,
-		"content":       contentData,
-		"taskId":        taskId,
-		"batchCode":     batchCode,
-		"replyLanguage": config.DbConf.ReplyLanguage,
-	})
+	// 4. 构造消息
 	msg := &Message{
 		UserId:    "系统",
-		Text:      text,
+		Text:      string(text),
 		Timestamp: time.Now().Unix(),
 		Action:    action,
 	}
 
+	// 5. 尝试发送(带超时和重试)
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+	defer cancel()
+
 	select {
 	case ch <- msg:
-		log.Printf("系统消息已发送到用户 %s: %s (Action: %s)\n", userId, text, action)
+		log.Printf("系统消息发送成功 | User: %s | Task: %d | Batch: %s | Action: %s",
+			userId, taskId, batchCode, action)
 		return nil
-	default:
-		log.Printf("用户 %s 的消息通道已满,丢弃消息\n", userId)
-		return fmt.Errorf("用户 %s 的消息通道阻塞", userId)
+	case <-ctx.Done():
+		log.Printf("系统消息发送超时 | User: %s | Task: %d | 通道可能已满或处理阻塞",
+			userId, taskId)
+		return fmt.Errorf("消息发送超时 (UserID: %s)", userId)
 	}
 }
 
@@ -328,9 +370,10 @@ func (s *ChatServer) SpecifysystemMessage(userId, wxId string, contentData map[s
 	// 获取客户端通道
 	s.mu.Lock()
 	ch, exists := s.clients[userId]
+	s.mu.Unlock() // 尽早释放锁
+
 	if !exists {
-		s.mu.Unlock()
-		return fmt.Errorf("user %s not found", userId)
+		return fmt.Errorf("user %s not connected (wxId: %s, action: %s)", userId, wxId, action)
 	}
 
 	// 准备消息
@@ -341,12 +384,18 @@ func (s *ChatServer) SpecifysystemMessage(userId, wxId string, contentData map[s
 		Action:    action,
 	}
 
-	// 复制通道引用后释放锁
-	channel := ch
-	s.mu.Unlock()
+	// 尝试发送消息(带超时)
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+	defer cancel()
 
-	// 尝试发送消息
-	return trySendMessage(channel, msg, userId, action)
+	select {
+	case ch <- msg:
+		log.Printf("system message sent | user: %s | wxId: %s | action: %s", userId, wxId, action)
+		return nil
+	case <-ctx.Done():
+		log.Printf("system message send timeout | user: %s | wxId: %s | action: %s", userId, wxId, action)
+		return fmt.Errorf("send timeout to user %s", userId)
+	}
 }
 
 // buildMessageText 构建消息文本