|
- package service
- import (
- "client/config"
- "context"
- "fmt"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/credentials/insecure"
- "log"
- "math"
- "os"
- "os/signal"
- "sync"
- "syscall"
- "time"
- . "client/chat"
- "google.golang.org/grpc"
- "google.golang.org/grpc/connectivity"
- "google.golang.org/grpc/keepalive"
- "google.golang.org/grpc/status"
- )
// Connection tuning parameters.
const (
	// Reconnect backoff bounds used by calculateBackoff.
	initialReconnectInterval = 2 * time.Second
	maxReconnectInterval     = 2 * time.Minute

	// gRPC keepalive: the client pings every keepaliveTime and waits
	// keepaliveTimeout for the acknowledgement.
	keepaliveTime    = 20 * time.Second
	keepaliveTimeout = 10 * time.Second

	connectionTimeout   = 10 * time.Second // bound on a single dial attempt
	healthCheckInterval = 30 * time.Second // period of the Ping-based health check
)
- // 全局客户端实例
- var client = &ChatClient{}
- // ChatClient 定义gRPC客户端结构体
- type ChatClient struct {
- conn *grpc.ClientConn // gRPC连接对象
- client ChatServiceClient // gRPC服务客户端
- ctx context.Context // 上下文对象
- cancel context.CancelFunc // 取消函数
- userID string // 用户ID
- mu sync.RWMutex // 读写锁(保护并发访问)
- retryCount int // 当前重试次数
- isConnected bool // 连接状态标志
- wg sync.WaitGroup // 等待组(用于goroutine同步)
- reconnecting bool // 重连状态标志
- serviceAddress string // 服务端地址
- stream ChatService_JoinChatClient // gRPC流对象
- streamMutex sync.Mutex // 流操作互斥锁
- healthCheckTicker *time.Ticker // 健康检查定时器
- lastPingTime time.Time // 最后心跳时间
- }
- // NewChatClient 构造函数,创建新的ChatClient实例
- func NewChatClient(userID, address string) *ChatClient {
- // 创建可取消的上下文
- ctx, cancel := context.WithCancel(context.Background())
- return &ChatClient{
- userID: userID, // 设置用户ID
- ctx: ctx, // 设置上下文
- cancel: cancel, // 设置取消函数
- serviceAddress: address, // 设置服务端地址
- }
- }
- // connect 连接到gRPC服务器
- func (c *ChatClient) connect(password string) error {
- c.mu.Lock()
- defer c.mu.Unlock()
- // 检查现有连接是否可用
- if c.isConnected && c.conn.GetState() == connectivity.Ready {
- return nil
- }
- // 打印连接日志
- log.Println("[连接] 尝试连接服务器...", c.serviceAddress)
- // 创建带超时的上下文
- ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
- defer cancel()
- // 建立gRPC连接
- conn, err := grpc.DialContext(ctx, c.serviceAddress,
- grpc.WithTransportCredentials(insecure.NewCredentials()), // 使用非安全连接
- grpc.WithBlock(), // 阻塞式连接
- grpc.WithPerRPCCredentials(&authCreds{password: password}), // 设置认证凭证
- grpc.WithDefaultCallOptions( // 设置默认调用选项
- grpc.MaxCallRecvMsgSize(20*1024*1024), // 最大接收消息大小
- grpc.MaxCallSendMsgSize(20*1024*1024), // 最大发送消息大小
- ),
- grpc.WithKeepaliveParams(keepalive.ClientParameters{ // 设置keepalive参数
- Time: keepaliveTime,
- Timeout: keepaliveTimeout,
- PermitWithoutStream: true, // 允许无活动流时也发送心跳
- }),
- )
- if err != nil {
- return fmt.Errorf("连接失败: %v", err)
- }
- // 检查连接状态
- state := conn.GetState()
- if state != connectivity.Ready {
- if conn.WaitForStateChange(ctx, connectivity.Connecting) {
- state := conn.GetState()
- if state != connectivity.Ready {
- _ = conn.Close()
- return fmt.Errorf("连接未就绪,状态: %v", state)
- }
- }
- }
- // 创建客户端并测试连接
- client := NewChatServiceClient(conn)
- _, err = client.JoinChat(context.Background(), &JoinRequest{UserId: c.userID, Force: true})
- if err != nil {
- _ = conn.Close()
- return fmt.Errorf("连接测试失败: %v", err)
- }
- // 关闭旧连接(如果存在)
- if c.conn != nil {
- _ = c.conn.Close()
- }
- // 更新连接状态
- c.conn = conn
- c.client = client
- c.isConnected = true
- c.retryCount = 0
- c.lastPingTime = time.Now()
- // 启动健康检查(延迟10秒)
- if c.healthCheckTicker != nil {
- c.healthCheckTicker.Stop()
- c.healthCheckTicker = nil
- }
- go func() {
- time.Sleep(10 * time.Second) // 给连接稳定时间
- c.startHealthCheck()
- }()
- log.Printf("[连接][用户:%s] 服务器连接成功", c.userID)
- return nil
- }
- // disconnect 断开与服务器的连接
- func (c *ChatClient) disconnect() {
- c.mu.Lock()
- defer c.mu.Unlock()
- // 如果已经断开连接,直接返回
- if !c.isConnected {
- return
- }
- // 停止健康检查定时器
- if c.healthCheckTicker != nil {
- c.healthCheckTicker.Stop()
- c.healthCheckTicker = nil
- }
- // 关闭流
- c.closeStream()
- // 关闭连接
- if c.conn != nil {
- if err := c.conn.Close(); err != nil {
- log.Printf("[连接][用户:%s] 关闭连接出错: %v", c.userID, err)
- }
- c.conn = nil
- }
- // 更新连接状态
- c.isConnected = false
- log.Printf("[连接][用户:%s] 已断开连接", c.userID)
- }
- // closeStream 关闭gRPC流
- func (c *ChatClient) closeStream() {
- c.streamMutex.Lock()
- defer c.streamMutex.Unlock()
- // 如果流存在,则关闭
- if c.stream != nil {
- if err := c.stream.CloseSend(); err != nil {
- log.Printf("[流] 关闭流错误: %v", err)
- }
- c.stream = nil
- }
- }
- // reconnect 执行重连逻辑
- func (c *ChatClient) reconnect() {
- const maxRetries = 500 // 最大重试次数
- // 加锁检查重连状态
- c.mu.Lock()
- if c.reconnecting {
- c.mu.Unlock()
- log.Printf("[重连] 已在重连中,跳过")
- return
- }
- c.reconnecting = true
- currentRetry := c.retryCount
- c.mu.Unlock()
- // 确保结束时重置重连状态
- defer func() {
- c.mu.Lock()
- c.reconnecting = false
- c.mu.Unlock()
- log.Printf("[重连] 重连流程结束")
- }()
- // 等待组管理
- c.wg.Add(1)
- defer c.wg.Done()
- // 重连循环
- for currentRetry < maxRetries {
- select {
- case <-c.ctx.Done():
- log.Printf("[重连] 上下文取消,终止重连")
- return
- default:
- // 1. 清理旧连接
- c.disconnect()
- // 2. 尝试新连接
- log.Printf("[重连] 尝试第 %d/%d 次重连", currentRetry+1, maxRetries)
- err := c.connect(config.Cfg.Password)
- if err == nil {
- log.Printf("[重连] 成功建立新连接")
- go c.establishStream()
- return
- }
- // 3. 错误处理
- log.Printf("[重连] 第 %d 次重连失败: %v", currentRetry+1, err)
- if isFatalError(err) {
- log.Printf("[重连] 遇到致命错误,停止重连: %v", err)
- return
- }
- // 4. 退避等待
- backoff := calculateBackoff(currentRetry)
- log.Printf("[重连] 等待 %v 后重试", backoff)
- select {
- case <-time.After(backoff):
- currentRetry++
- case <-c.ctx.Done():
- return
- }
- }
- }
- log.Printf("[重连] 已达到最大重试次数 (%d),停止重连", maxRetries)
- }
- // 辅助函数:判断是否为致命错误(如认证失败、无效地址)
- func isFatalError(err error) bool {
- if err == nil {
- return false
- }
- // 示例:gRPC 的不可恢复错误码
- if status, ok := status.FromError(err); ok {
- switch status.Code() {
- case codes.Unauthenticated, codes.PermissionDenied, codes.NotFound:
- return true
- }
- }
- return false
- }
- // calculateBackoff 计算退避时间(指数退避算法)
- func calculateBackoff(retryCount int) time.Duration {
- base := float64(initialReconnectInterval)
- max := float64(maxReconnectInterval)
- backoff := base * math.Pow(1.5, float64(retryCount))
- return time.Duration(math.Min(backoff, max))
- }
- // establishStream 建立gRPC流
- func (c *ChatClient) establishStream() {
- // 添加等待组计数
- c.wg.Add(1)
- defer c.wg.Done()
- retryDelay := time.Second
- for {
- select {
- case <-c.ctx.Done(): // 检查是否被取消
- return
- default:
- // 检查连接是否就绪
- if !c.isReady() {
- time.Sleep(retryDelay)
- retryDelay = time.Duration(math.Min(float64(retryDelay*2), float64(maxReconnectInterval)))
- continue
- }
- // 尝试建立流
- stream, err := func() (ChatService_JoinChatClient, error) {
- c.streamMutex.Lock()
- defer c.streamMutex.Unlock()
- return c.client.JoinChat(c.ctx, &JoinRequest{UserId: c.userID, Force: true})
- }()
- if err != nil {
- log.Printf("[流] 建立流失败: %v,等待重试...", err)
- time.Sleep(retryDelay)
- retryDelay = time.Duration(math.Min(float64(retryDelay*2), float64(maxReconnectInterval)))
- continue
- }
- // 重置重试延迟
- retryDelay = time.Second
- // 处理接收到的消息
- if err := c.receiveMessages(stream); err != nil {
- log.Printf("[流] 接收消息错误: %v", err)
- return
- }
- }
- }
- }
- // receiveMessages 接收并处理消息
- func (c *ChatClient) receiveMessages(stream ChatService_JoinChatClient) error {
- for {
- // 接收消息
- msg, err := stream.Recv()
- if msg == nil {
- continue
- }
- // 处理错误
- if err != nil {
- // 区分错误类型
- st, ok := status.FromError(err)
- if ok {
- switch st.Code() {
- case codes.Canceled:
- log.Println("流正常关闭")
- return nil
- case codes.Unavailable, codes.DeadlineExceeded:
- log.Printf("流异常断开,触发重连: %v", err)
- go c.reconnect() // 触发重连
- return err
- default:
- log.Printf("不可恢复错误: %v", err)
- return err
- }
- }
- // 非gRPC错误(如网络问题)
- log.Printf("[流] 接收错误: %v,触发重连", err)
- go c.reconnect()
- return err
- }
- // 跳过空消息
- if msg == nil || msg.Text == "" || msg.UserId == "" {
- continue
- }
- // 再次检查错误(冗余检查)
- if err != nil {
- if status.Code(err) == codes.Canceled {
- return nil
- }
- return fmt.Errorf("接收消息错误: %v", err)
- }
- // 打印收到的消息
- log.Printf("[接收] 收到消息: %+v", msg)
- // 处理系统消息
- if msg.UserId == "系统" {
- switch msg.Action {
- case "sendTalk":
- go SendTalk(msg.Text)
- case "getContacts":
- go GetContacts()
- case "reject":
- go Reject(msg.Text)
- }
- if msg.Text == "欢迎加入聊天室" {
- go GetContacts()
- }
- }
- }
- }
- // SendMessage 发送消息
- func (c *ChatClient) SendMessage(text, action string) error {
- c.mu.RLock()
- defer c.mu.RUnlock()
- // 检查连接是否就绪
- if !c.isReady() {
- go c.reconnect() // 触发重连
- return fmt.Errorf("未连接服务器,正在尝试重连...")
- }
- // 构造消息
- msg := &Message{
- UserId: c.userID,
- Text: text,
- Action: action,
- }
- log.Printf("[发送] 发送消息: %+v", msg)
- // 创建带超时的上下文
- ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
- defer cancel()
- // 发送消息
- aaa, err := c.client.SendMessage(ctx, msg)
- log.Println(aaa)
- if err != nil {
- // 处理不同类型的错误
- st, ok := status.FromError(err)
- if ok {
- switch st.Code() {
- case codes.Unavailable, codes.DeadlineExceeded:
- go c.reconnect() // 网络问题触发重连
- case codes.Unauthenticated, codes.PermissionDenied:
- // 认证错误不重连
- }
- }
- return fmt.Errorf("发送失败: %v", err)
- }
- return nil
- }
- // ConnectGRPC 启动gRPC客户端
- func ConnectGRPC(userId, address string) {
- log.Println("[主程序] 启动GRPC连接")
- // 创建新客户端
- client = NewChatClient(userId, address)
- defer client.Shutdown() // 确保退出时关闭
- // 启动连接监控
- go client.startConnectionMonitor()
- // 初始连接
- if err := client.connect(config.Cfg.Password); err != nil {
- log.Printf("[主程序] 初始连接失败: %v", err)
- go client.reconnect()
- } else {
- go client.establishStream()
- }
- // 等待退出信号
- quit := make(chan os.Signal, 1)
- signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
- <-quit
- }
- // isReady 检查连接是否就绪
- func (c *ChatClient) isReady() bool {
- c.mu.RLock()
- defer c.mu.RUnlock()
- // 基本检查
- if !c.isConnected || c.conn == nil {
- return false
- }
- // 检查连接状态
- state := c.conn.GetState()
- if state != connectivity.Ready {
- return false
- }
- // 检查心跳时间
- log.Println(time.Since(c.lastPingTime), 3*keepaliveTime)
- if time.Since(c.lastPingTime) > 3*keepaliveTime {
- return false
- }
- return true
- }
- // startHealthCheck 启动健康检查
- func (c *ChatClient) startHealthCheck() {
- c.healthCheckTicker = time.NewTicker(healthCheckInterval)
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
- for {
- select {
- case <-c.healthCheckTicker.C: // 定时触发
- if !c.isReady() {
- log.Printf("连接不可用,触发重连")
- go c.reconnect()
- continue
- }
- // 执行健康检查
- ctx, cancel := context.WithTimeout(c.ctx, 6*time.Second)
- _, err := c.client.Ping(ctx, &PingRequest{UserId: c.userID})
- cancel()
- if err != nil {
- log.Printf("健康检查失败: %v", err)
- c.disconnect()
- go c.reconnect()
- } else {
- c.mu.Lock()
- c.lastPingTime = time.Now() // 更新最后心跳时间
- c.mu.Unlock()
- }
- case <-c.ctx.Done(): // 上下文取消
- return
- }
- }
- }()
- }
- // checkHealth 执行健康检查
- func (c *ChatClient) checkHealth() error {
- ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
- defer cancel()
- // 使用JoinChat方法检查健康状态
- _, err := c.client.JoinChat(ctx, &JoinRequest{UserId: c.userID, Force: false})
- if err != nil {
- return fmt.Errorf("健康检查失败: %w", err)
- }
- return nil
- }
- // Shutdown 关闭客户端
- func (c *ChatClient) Shutdown() {
- log.Println("客户端服务关闭")
- // 1. 取消上下文
- c.cancel()
- // 2. 断开连接
- c.disconnect()
- // 3. 等待所有goroutine结束
- c.wg.Wait()
- }
// authCreds supplies per-RPC credentials: every call carries the configured
// password as "password" request metadata.
type authCreds struct {
	password string
}

// GetRequestMetadata returns the metadata attached to each RPC. It ignores the
// context and URIs and never fails.
func (a *authCreds) GetRequestMetadata(_ context.Context, _ ...string) (map[string]string, error) {
	md := make(map[string]string, 1)
	md["password"] = a.password
	return md, nil
}

// RequireTransportSecurity reports false, so these credentials may be used over
// a plaintext (non-TLS) connection.
// NOTE(review): this sends the password in cleartext; consider TLS.
func (a *authCreds) RequireTransportSecurity() bool {
	return false
}
- // startConnectionMonitor 启动连接监控
- func (c *ChatClient) startConnectionMonitor() {
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
- ticker := time.NewTicker(1 * time.Minute) // 每5分钟检查一次
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C: // 定时触发
- c.mu.RLock()
- conn := c.conn
- c.mu.RUnlock()
- if conn == nil {
- log.Printf("[监控] 连接不存在,触发重连")
- go c.reconnect()
- continue
- }
- // 检查连接状态
- state := conn.GetState()
- log.Printf("[监控] 当前连接状态: %v", state)
- // 判断是否需要重连
- if state == connectivity.TransientFailure ||
- state == connectivity.Shutdown ||
- (state == connectivity.Ready && time.Since(c.lastPingTime) > 3*keepaliveTime) {
- log.Printf("[监控] 连接异常,触发重连")
- go c.reconnect()
- }
- case <-c.ctx.Done(): // 上下文取消
- log.Printf("[监控] 监控停止")
- return
- }
- }
- }()
- }
|