package service import ( "client/config" "context" "fmt" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "log" "math" "os" "os/signal" "sync" "syscall" "time" . "client/chat" "google.golang.org/grpc" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" ) // 定义连接相关的常量参数 const ( initialReconnectInterval = 2 * time.Second // 初始重连间隔时间 keepaliveTime = 20 * time.Second // 客户端keepalive心跳间隔 keepaliveTimeout = 10 * time.Second // keepalive心跳超时时间 connectionTimeout = 10 * time.Second // 连接超时时间 maxReconnectInterval = 120 * time.Second // 最大重连间隔时间 healthCheckInterval = 30 * time.Second // 健康检查间隔时间 ) // 全局客户端实例 var client = &ChatClient{} // ChatClient 定义gRPC客户端结构体 type ChatClient struct { conn *grpc.ClientConn // gRPC连接对象 client ChatServiceClient // gRPC服务客户端 ctx context.Context // 上下文对象 cancel context.CancelFunc // 取消函数 userID string // 用户ID mu sync.RWMutex // 读写锁(保护并发访问) retryCount int // 当前重试次数 isConnected bool // 连接状态标志 wg sync.WaitGroup // 等待组(用于goroutine同步) reconnecting bool // 重连状态标志 serviceAddress string // 服务端地址 stream ChatService_JoinChatClient // gRPC流对象 streamMutex sync.Mutex // 流操作互斥锁 healthCheckTicker *time.Ticker // 健康检查定时器 lastPingTime time.Time // 最后心跳时间 } // NewChatClient 构造函数,创建新的ChatClient实例 func NewChatClient(userID, address string) *ChatClient { // 创建可取消的上下文 ctx, cancel := context.WithCancel(context.Background()) return &ChatClient{ userID: userID, // 设置用户ID ctx: ctx, // 设置上下文 cancel: cancel, // 设置取消函数 serviceAddress: address, // 设置服务端地址 } } // connect 连接到gRPC服务器 func (c *ChatClient) connect(password string) error { c.mu.Lock() defer c.mu.Unlock() // 检查现有连接是否可用 if c.isConnected && c.conn.GetState() == connectivity.Ready { return nil } // 打印连接日志 log.Println("[连接] 尝试连接服务器...", c.serviceAddress) // 创建带超时的上下文 ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout) defer cancel() // 建立gRPC连接 conn, err := grpc.DialContext(ctx, c.serviceAddress, grpc.WithTransportCredentials(insecure.NewCredentials()), // 使用非安全连接 grpc.WithBlock(), // 阻塞式连接 grpc.WithPerRPCCredentials(&authCreds{password: password}), // 设置认证凭证 grpc.WithDefaultCallOptions( // 设置默认调用选项 grpc.MaxCallRecvMsgSize(20*1024*1024), // 最大接收消息大小 grpc.MaxCallSendMsgSize(20*1024*1024), // 最大发送消息大小 ), grpc.WithKeepaliveParams(keepalive.ClientParameters{ // 设置keepalive参数 Time: keepaliveTime, Timeout: keepaliveTimeout, PermitWithoutStream: true, // 允许无活动流时也发送心跳 }), ) if err != nil { return fmt.Errorf("连接失败: %v", err) } // 检查连接状态 state := conn.GetState() if state != connectivity.Ready { if conn.WaitForStateChange(ctx, connectivity.Connecting) { state := conn.GetState() if state != connectivity.Ready { _ = conn.Close() return fmt.Errorf("连接未就绪,状态: %v", state) } } } // 创建客户端并测试连接 client := NewChatServiceClient(conn) _, err = client.JoinChat(context.Background(), &JoinRequest{UserId: c.userID, Force: true}) if err != nil { _ = conn.Close() return fmt.Errorf("连接测试失败: %v", err) } // 关闭旧连接(如果存在) if c.conn != nil { _ = c.conn.Close() } // 更新连接状态 c.conn = conn c.client = client c.isConnected = true c.retryCount = 0 c.lastPingTime = time.Now() // 启动健康检查(延迟10秒) if c.healthCheckTicker != nil { c.healthCheckTicker.Stop() c.healthCheckTicker = nil } go func() { time.Sleep(10 * time.Second) // 给连接稳定时间 c.startHealthCheck() }() log.Printf("[连接][用户:%s] 服务器连接成功", c.userID) return nil } // disconnect 断开与服务器的连接 func (c *ChatClient) disconnect() { c.mu.Lock() defer c.mu.Unlock() // 如果已经断开连接,直接返回 if !c.isConnected { return } // 停止健康检查定时器 if c.healthCheckTicker != nil { c.healthCheckTicker.Stop() c.healthCheckTicker = nil } // 关闭流 c.closeStream() // 关闭连接 if c.conn != nil { if err := c.conn.Close(); err != nil { log.Printf("[连接][用户:%s] 关闭连接出错: %v", c.userID, err) } c.conn = nil } // 更新连接状态 c.isConnected = false log.Printf("[连接][用户:%s] 已断开连接", c.userID) } // closeStream 关闭gRPC流 func (c *ChatClient) closeStream() { c.streamMutex.Lock() defer c.streamMutex.Unlock() // 如果流存在,则关闭 if c.stream != nil { if err := c.stream.CloseSend(); err != nil { log.Printf("[流] 关闭流错误: %v", err) } c.stream = nil } } // reconnect 执行重连逻辑 func (c *ChatClient) reconnect() { const maxRetries = 500 // 最大重试次数 // 加锁检查重连状态 c.mu.Lock() if c.reconnecting { c.mu.Unlock() log.Printf("[重连] 已在重连中,跳过") return } c.reconnecting = true currentRetry := c.retryCount c.mu.Unlock() // 确保结束时重置重连状态 defer func() { c.mu.Lock() c.reconnecting = false c.mu.Unlock() log.Printf("[重连] 重连流程结束") }() // 等待组管理 c.wg.Add(1) defer c.wg.Done() // 重连循环 for currentRetry < maxRetries { select { case <-c.ctx.Done(): log.Printf("[重连] 上下文取消,终止重连") return default: // 1. 清理旧连接 c.disconnect() // 2. 尝试新连接 log.Printf("[重连] 尝试第 %d/%d 次重连", currentRetry+1, maxRetries) err := c.connect(config.Cfg.Password) if err == nil { log.Printf("[重连] 成功建立新连接") go c.establishStream() return } // 3. 错误处理 log.Printf("[重连] 第 %d 次重连失败: %v", currentRetry+1, err) if isFatalError(err) { log.Printf("[重连] 遇到致命错误,停止重连: %v", err) return } // 4. 退避等待 backoff := calculateBackoff(currentRetry) log.Printf("[重连] 等待 %v 后重试", backoff) select { case <-time.After(backoff): currentRetry++ case <-c.ctx.Done(): return } } } log.Printf("[重连] 已达到最大重试次数 (%d),停止重连", maxRetries) } // 辅助函数:判断是否为致命错误(如认证失败、无效地址) func isFatalError(err error) bool { if err == nil { return false } // 示例:gRPC 的不可恢复错误码 if status, ok := status.FromError(err); ok { switch status.Code() { case codes.Unauthenticated, codes.PermissionDenied, codes.NotFound: return true } } return false } // calculateBackoff 计算退避时间(指数退避算法) func calculateBackoff(retryCount int) time.Duration { base := float64(initialReconnectInterval) max := float64(maxReconnectInterval) backoff := base * math.Pow(1.5, float64(retryCount)) return time.Duration(math.Min(backoff, max)) } // establishStream 建立gRPC流 func (c *ChatClient) establishStream() { // 添加等待组计数 c.wg.Add(1) defer c.wg.Done() retryDelay := time.Second for { select { case <-c.ctx.Done(): // 检查是否被取消 return default: // 检查连接是否就绪 if !c.isReady() { time.Sleep(retryDelay) retryDelay = time.Duration(math.Min(float64(retryDelay*2), float64(maxReconnectInterval))) continue } // 尝试建立流 stream, err := func() (ChatService_JoinChatClient, error) { c.streamMutex.Lock() defer c.streamMutex.Unlock() return c.client.JoinChat(c.ctx, &JoinRequest{UserId: c.userID, Force: true}) }() if err != nil { log.Printf("[流] 建立流失败: %v,等待重试...", err) time.Sleep(retryDelay) retryDelay = time.Duration(math.Min(float64(retryDelay*2), float64(maxReconnectInterval))) continue } // 重置重试延迟 retryDelay = time.Second // 处理接收到的消息 if err := c.receiveMessages(stream); err != nil { log.Printf("[流] 接收消息错误: %v", err) return } } } } // receiveMessages 接收并处理消息 func (c *ChatClient) receiveMessages(stream ChatService_JoinChatClient) error { for { // 接收消息 msg, err := stream.Recv() if msg == nil { continue } // 处理错误 if err != nil { // 区分错误类型 st, ok := status.FromError(err) if ok { switch st.Code() { case codes.Canceled: log.Println("流正常关闭") return nil case codes.Unavailable, codes.DeadlineExceeded: log.Printf("流异常断开,触发重连: %v", err) go c.reconnect() // 触发重连 return err default: log.Printf("不可恢复错误: %v", err) return err } } // 非gRPC错误(如网络问题) log.Printf("[流] 接收错误: %v,触发重连", err) go c.reconnect() return err } // 跳过空消息 if msg == nil || msg.Text == "" || msg.UserId == "" { continue } // 再次检查错误(冗余检查) if err != nil { if status.Code(err) == codes.Canceled { return nil } return fmt.Errorf("接收消息错误: %v", err) } // 打印收到的消息 log.Printf("[接收] 收到消息: %+v", msg) // 处理系统消息 if msg.UserId == "系统" { switch msg.Action { case "sendTalk": go SendTalk(msg.Text) case "getContacts": go GetContacts() case "reject": go Reject(msg.Text) } if msg.Text == "欢迎加入聊天室" { go GetContacts() } } } } // SendMessage 发送消息 func (c *ChatClient) SendMessage(text, action string) error { c.mu.RLock() defer c.mu.RUnlock() // 检查连接是否就绪 if !c.isReady() { go c.reconnect() // 触发重连 return fmt.Errorf("未连接服务器,正在尝试重连...") } // 构造消息 msg := &Message{ UserId: c.userID, Text: text, Action: action, } log.Printf("[发送] 发送消息: %+v", msg) // 创建带超时的上下文 ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second) defer cancel() // 发送消息 aaa, err := c.client.SendMessage(ctx, msg) log.Println(aaa) if err != nil { // 处理不同类型的错误 st, ok := status.FromError(err) if ok { switch st.Code() { case codes.Unavailable, codes.DeadlineExceeded: go c.reconnect() // 网络问题触发重连 case codes.Unauthenticated, codes.PermissionDenied: // 认证错误不重连 } } return fmt.Errorf("发送失败: %v", err) } return nil } // ConnectGRPC 启动gRPC客户端 func ConnectGRPC(userId, address string) { log.Println("[主程序] 启动GRPC连接") // 创建新客户端 client = NewChatClient(userId, address) defer client.Shutdown() // 确保退出时关闭 // 启动连接监控 go client.startConnectionMonitor() // 初始连接 if err := client.connect(config.Cfg.Password); err != nil { log.Printf("[主程序] 初始连接失败: %v", err) go client.reconnect() } else { go client.establishStream() } // 等待退出信号 quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) <-quit } // isReady 检查连接是否就绪 func (c *ChatClient) isReady() bool { c.mu.RLock() defer c.mu.RUnlock() // 基本检查 if !c.isConnected || c.conn == nil { return false } // 检查连接状态 state := c.conn.GetState() if state != connectivity.Ready { return false } // 检查心跳时间 log.Println(time.Since(c.lastPingTime), 3*keepaliveTime) if time.Since(c.lastPingTime) > 3*keepaliveTime { return false } return true } // startHealthCheck 启动健康检查 func (c *ChatClient) startHealthCheck() { c.healthCheckTicker = time.NewTicker(healthCheckInterval) c.wg.Add(1) go func() { defer c.wg.Done() for { select { case <-c.healthCheckTicker.C: // 定时触发 if !c.isReady() { log.Printf("连接不可用,触发重连") go c.reconnect() continue } // 执行健康检查 ctx, cancel := context.WithTimeout(c.ctx, 6*time.Second) _, err := c.client.Ping(ctx, &PingRequest{UserId: c.userID}) cancel() if err != nil { log.Printf("健康检查失败: %v", err) c.disconnect() go c.reconnect() } else { c.mu.Lock() c.lastPingTime = time.Now() // 更新最后心跳时间 c.mu.Unlock() } case <-c.ctx.Done(): // 上下文取消 return } } }() } // checkHealth 执行健康检查 func (c *ChatClient) checkHealth() error { ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second) defer cancel() // 使用JoinChat方法检查健康状态 _, err := c.client.JoinChat(ctx, &JoinRequest{UserId: c.userID, Force: false}) if err != nil { return fmt.Errorf("健康检查失败: %w", err) } return nil } // Shutdown 关闭客户端 func (c *ChatClient) Shutdown() { log.Println("客户端服务关闭") // 1. 取消上下文 c.cancel() // 2. 断开连接 c.disconnect() // 3. 等待所有goroutine结束 c.wg.Wait() } // authCreds 实现gRPC认证接口 type authCreds struct { password string } // GetRequestMetadata 获取认证元数据 func (c *authCreds) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) { return map[string]string{ "password": c.password, }, nil } // RequireTransportSecurity 是否要求传输安全 func (c *authCreds) RequireTransportSecurity() bool { return false // 不要求TLS } // startConnectionMonitor 启动连接监控 func (c *ChatClient) startConnectionMonitor() { c.wg.Add(1) go func() { defer c.wg.Done() ticker := time.NewTicker(1 * time.Minute) // 每5分钟检查一次 defer ticker.Stop() for { select { case <-ticker.C: // 定时触发 c.mu.RLock() conn := c.conn c.mu.RUnlock() if conn == nil { log.Printf("[监控] 连接不存在,触发重连") go c.reconnect() continue } // 检查连接状态 state := conn.GetState() log.Printf("[监控] 当前连接状态: %v", state) // 判断是否需要重连 if state == connectivity.TransientFailure || state == connectivity.Shutdown || (state == connectivity.Ready && time.Since(c.lastPingTime) > 3*keepaliveTime) { log.Printf("[监控] 连接异常,触发重连") go c.reconnect() } case <-c.ctx.Done(): // 上下文取消 log.Printf("[监控] 监控停止") return } } }() }