// Package service implements a gRPC chat client that keeps a connection to
// the chat server alive, reconnecting with back-off when it drops.
package service

import (
	"context"
	"fmt"
	"log"
	"math"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	. "client/chat"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/status"
)

const (
	initialReconnectInterval = 1 * time.Second
	keepaliveTime            = 60 * time.Second
	keepaliveTimeout         = 20 * time.Second
	maxRetryCount            = 60
	connectionTimeout        = 3 * time.Second
	maxReconnectInterval     = 60 * time.Second
	healthCheckInterval      = 30 * time.Second
)

// client is the package-level instance replaced by ConnectGRPC.
var client = &ChatClient{}

// ChatClient wraps a gRPC connection to the chat service together with the
// bookkeeping needed for reconnection and periodic health checks.
type ChatClient struct {
	conn   *grpc.ClientConn
	client ChatServiceClient
	ctx    context.Context
	cancel context.CancelFunc
	userID string

	// mu guards conn, client, isConnected, retryCount, reconnecting,
	// lastPingTime, healthCheckTicker and healthCheckStop.
	mu           sync.RWMutex
	retryCount   int
	isConnected  bool
	wg           sync.WaitGroup
	reconnecting bool

	serviceAddress string

	stream      ChatService_JoinChatClient
	streamMutex sync.Mutex

	healthCheckTicker *time.Ticker
	// healthCheckStop is closed to terminate the running health-check
	// goroutine; stopping a Ticker alone does not wake a goroutine that is
	// blocked on its channel.
	healthCheckStop chan struct{}
	lastPingTime    time.Time
}

// NewChatClient returns a client for userID that will dial address. The
// client owns a cancellable context that Shutdown cancels.
func NewChatClient(userID, address string) *ChatClient {
	ctx, cancel := context.WithCancel(context.Background())
	return &ChatClient{
		userID:         userID,
		ctx:            ctx,
		cancel:         cancel,
		serviceAddress: address,
	}
}

// connect dials the server (blocking up to connectionTimeout), probes it with
// a JoinChat call, and on success replaces any previous connection and
// schedules the health check. It is a no-op when a ready connection already
// exists. c.mu is held for the whole call.
func (c *ChatClient) connect() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.isConnected && c.conn != nil && c.conn.GetState() == connectivity.Ready {
		return nil
	}

	log.Println("[连接] 尝试连接服务器...", c.serviceAddress)

	ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
	defer cancel()

	conn, err := grpc.DialContext(ctx, c.serviceAddress,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithBlock(),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallRecvMsgSize(20*1024*1024),
			grpc.MaxCallSendMsgSize(20*1024*1024),
		),
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                keepaliveTime,
			Timeout:             keepaliveTimeout,
			PermitWithoutStream: true,
		}),
	)
	if err != nil {
		return fmt.Errorf("连接失败: %w", err)
	}

	// Verify the connection state. If it is still connecting, wait for one
	// transition (bounded by ctx); anything other than Ready afterwards,
	// including a timeout, is treated as a failed attempt.
	if state := conn.GetState(); state != connectivity.Ready {
		conn.WaitForStateChange(ctx, connectivity.Connecting)
		if state = conn.GetState(); state != connectivity.Ready {
			_ = conn.Close() // best effort: the attempt is already failing
			return fmt.Errorf("连接未就绪,状态: %v", state)
		}
	}

	// Probe the service with a JoinChat call.
	// NOTE(review): the returned stream is discarded without being read or
	// cancelled; presumably it lives until conn is closed — confirm that the
	// server tolerates this extra stream.
	svc := NewChatServiceClient(conn)
	if _, err = svc.JoinChat(context.Background(), &JoinRequest{UserId: c.userID}); err != nil {
		_ = conn.Close() // best effort: the attempt is already failing
		return fmt.Errorf("连接测试失败: %w", err)
	}

	// Close the previous connection, if any.
	if c.conn != nil {
		_ = c.conn.Close() // best effort: it is being replaced regardless
	}

	c.conn = conn
	c.client = svc
	c.isConnected = true
	c.retryCount = 0
	c.lastPingTime = time.Now()

	// Restart the health check after a grace period that lets the connection
	// settle. The goroutine is tracked by wg and aborts on shutdown.
	c.stopHealthCheckLocked()
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		select {
		case <-time.After(10 * time.Second):
			c.startHealthCheck()
		case <-c.ctx.Done():
		}
	}()

	log.Printf("[连接][用户:%s] 服务器连接成功", c.userID)
	return nil
}

// stopHealthCheckLocked stops the ticker and signals the running health-check
// goroutine (if any) to exit. The caller must hold c.mu for writing.
func (c *ChatClient) stopHealthCheckLocked() {
	if c.healthCheckTicker != nil {
		c.healthCheckTicker.Stop()
		c.healthCheckTicker = nil
	}
	if c.healthCheckStop != nil {
		close(c.healthCheckStop)
		c.healthCheckStop = nil
	}
}

// disconnect stops the health check, closes the stream and the connection,
// and marks the client as disconnected. It does nothing when the client is
// not connected. It takes c.mu, so callers must not hold it.
func (c *ChatClient) disconnect() {
	c.mu.Lock()
	defer c.mu.Unlock()

	if !c.isConnected {
		return
	}

	c.stopHealthCheckLocked()
	c.closeStream()

	if c.conn != nil {
		if err := c.conn.Close(); err != nil {
			log.Printf("[连接][用户:%s] 关闭连接出错: %v", c.userID, err)
		}
		c.conn = nil
	}

	c.isConnected = false
	log.Printf("[连接][用户:%s] 已断开连接", c.userID)
}

// closeStream half-closes c.stream (CloseSend) and clears it.
// NOTE(review): nothing in this file assigns c.stream, so within this file
// the method has no effect — confirm whether another file sets it.
func (c *ChatClient) closeStream() {
	c.streamMutex.Lock()
	defer c.streamMutex.Unlock()

	if c.stream == nil {
		return
	}
	if err := c.stream.CloseSend(); err != nil {
		log.Printf("[流] 关闭流错误: %v", err)
	}
	c.stream = nil
}

// reconnect retries connect with exponential back-off (factor 1.5, capped at
// maxReconnectInterval) until it succeeds, the client context is cancelled,
// or retryCount reaches maxRetryCount. Only one reconnect loop runs at a
// time; concurrent calls return immediately. On success it starts
// establishStream on a new goroutine.
func (c *ChatClient) reconnect() {
	c.mu.Lock()
	if c.reconnecting {
		c.mu.Unlock()
		return
	}
	c.reconnecting = true
	currentRetry := c.retryCount
	c.mu.Unlock()

	defer func() {
		c.mu.Lock()
		c.reconnecting = false
		c.mu.Unlock()
	}()

	c.wg.Add(1)
	defer c.wg.Done()

	log.Printf("[重连] 开始重连流程,当前重试计数: %d", currentRetry)

	for {
		if c.ctx.Err() != nil {
			log.Println("[重连] 上下文取消,停止重连")
			return
		}

		c.mu.RLock()
		exhausted := c.retryCount >= maxRetryCount
		c.mu.RUnlock()
		if exhausted {
			log.Printf("[重连] 达到最大重试次数(%d),停止重连", maxRetryCount)
			return
		}

		log.Printf("[重连] 尝试第%d次连接...", currentRetry+1)
		err := c.connect()
		if err == nil {
			log.Printf("[重连][用户:%s] 连接成功!", c.userID)
			// NOTE(review): an earlier establishStream loop may still be
			// running if its Recv ended with codes.Canceled; confirm that a
			// second loop here cannot produce duplicate streams.
			go c.establishStream()
			return
		}
		log.Printf("[重连][用户:%s] 连接失败: %v", c.userID, err)

		c.mu.Lock()
		c.retryCount++
		currentRetry = c.retryCount
		c.mu.Unlock()

		// Exponential back-off; currentRetry is at least 1 here.
		backoff := time.Duration(math.Min(
			float64(initialReconnectInterval)*math.Pow(1.5, float64(currentRetry)),
			float64(maxReconnectInterval),
		))

		select {
		case <-time.After(backoff):
		case <-c.ctx.Done():
			log.Printf("[重连][用户:%s] 等待期间上下文取消", c.userID)
			return
		}
	}
}

// establishStream opens the JoinChat stream and processes incoming messages.
// While the connection is not ready or the stream cannot be opened it retries
// with a doubling delay capped at maxReconnectInterval. It returns when the
// client context is cancelled or receiveMessages reports an error.
func (c *ChatClient) establishStream() {
	c.wg.Add(1)
	defer c.wg.Done()

	retryDelay := time.Second

	// wait blocks for the current delay, then doubles it. It reports false
	// when the client context is cancelled first, so Shutdown is not held up
	// by a pending back-off.
	wait := func() bool {
		select {
		case <-time.After(retryDelay):
		case <-c.ctx.Done():
			return false
		}
		retryDelay *= 2
		if retryDelay > maxReconnectInterval {
			retryDelay = maxReconnectInterval
		}
		return true
	}

	for {
		if c.ctx.Err() != nil {
			return
		}

		// Snapshot the client under c.mu before taking streamMutex, which
		// matches the lock order used by disconnect (mu, then streamMutex).
		cli, ready := c.readyClient()
		if !ready {
			if !wait() {
				return
			}
			continue
		}

		// Try to open the stream.
		stream, err := func() (ChatService_JoinChatClient, error) {
			c.streamMutex.Lock()
			defer c.streamMutex.Unlock()
			return cli.JoinChat(c.ctx, &JoinRequest{UserId: c.userID})
		}()
		if err != nil {
			log.Printf("[流] 建立流失败: %v,等待重试...", err)
			if !wait() {
				return
			}
			continue
		}

		// Stream is up: reset the retry delay and process messages.
		retryDelay = time.Second
		if err := c.receiveMessages(stream); err != nil {
			log.Printf("[流] 接收消息错误: %v", err)
			return
		}
	}
}

// receiveMessages reads from stream until Recv fails. A codes.Canceled error
// is treated as a normal close and yields nil; any other error is wrapped and
// returned. Messages whose UserId is "系统" are dispatched by Action on new
// goroutines.
func (c *ChatClient) receiveMessages(stream ChatService_JoinChatClient) error {
	for {
		msg, err := stream.Recv()
		if err != nil {
			if status.Code(err) == codes.Canceled {
				return nil // normal close
			}
			return fmt.Errorf("接收消息错误: %w", err)
		}

		log.Printf("[接收] 收到消息: %+v", msg)

		if msg.UserId != "系统" {
			continue
		}

		switch msg.Action {
		case "sendTalk":
			go SendTalk(msg.Text)
		case "getContacts":
			go GetContacts()
		case "reject":
			go Reject(msg.Text)
		default:
			log.Printf("[系统通知]: %s", msg.Text)
		}

		if msg.Text == "欢迎加入聊天室" {
			go GetContacts()
		}
	}
}

// SendMessage sends text with the given action to the server using a
// 5-second deadline. It returns an error when the client is not connected.
// When the RPC fails it disconnects, starts a background reconnect and
// returns the RPC error.
func (c *ChatClient) SendMessage(text, action string) error {
	// No lock is held across the RPC or the disconnect below: disconnect
	// takes c.mu for writing, which would deadlock against a read lock held
	// by this goroutine.
	cli, ready := c.readyClient()
	if !ready {
		return fmt.Errorf("未连接服务器")
	}

	msg := &Message{
		UserId: c.userID,
		Text:   text,
		Action: action,
	}
	log.Printf("[发送] 发送消息: %+v", msg)

	ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
	defer cancel()

	if _, err := cli.SendMessage(ctx, msg); err != nil {
		log.Printf("[发送] 发送失败: %v", err)
		c.disconnect()
		go c.reconnect()
		return err
	}
	return nil
}

// ConnectGRPC creates the package-level client for userId, connects to
// address (falling back to the background reconnect loop when the first
// attempt fails) and then blocks until SIGINT or SIGTERM is received, at
// which point the client is shut down.
func ConnectGRPC(userId, address string) {
	log.Println("[主程序] 启动GRPC连接")

	client = NewChatClient(userId, address)
	defer client.Shutdown()

	if err := client.connect(); err != nil {
		log.Printf("[主程序] 初始连接失败: %v", err)
		go client.reconnect()
	} else {
		go client.establishStream()
	}

	// Keep the calling goroutine alive until a termination signal arrives.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(quit)
	<-quit
}

// isReady reports whether the client is marked connected and the underlying
// connection is in the Ready state.
func (c *ChatClient) isReady() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()

	if !c.isConnected || c.conn == nil {
		return false
	}
	return c.conn.GetState() == connectivity.Ready
}

// readyClient returns the current service client when the connection is
// ready, taking c.mu once so the readiness check and the client snapshot are
// consistent. The second result is false when no ready client is available.
func (c *ChatClient) readyClient() (ChatServiceClient, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	if !c.isConnected || c.conn == nil || c.client == nil {
		return nil, false
	}
	if c.conn.GetState() != connectivity.Ready {
		return nil, false
	}
	return c.client, true
}

// startHealthCheck replaces any running health check with a new goroutine
// that pings the server every healthCheckInterval. A failed readiness check
// triggers a reconnect; a failed Ping disconnects and then reconnects. The
// goroutine exits when the check is stopped (disconnect or a newer check) or
// the client context is cancelled.
func (c *ChatClient) startHealthCheck() {
	if c.ctx.Err() != nil {
		return
	}

	// The goroutine uses these locals rather than the struct fields, so a
	// concurrent disconnect clearing the fields cannot cause a nil
	// dereference.
	ticker := time.NewTicker(healthCheckInterval)
	stop := make(chan struct{})

	c.mu.Lock()
	c.stopHealthCheckLocked()
	c.healthCheckTicker = ticker
	c.healthCheckStop = stop
	c.mu.Unlock()

	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		defer ticker.Stop()

		for {
			select {
			case <-ticker.C:
				cli, ready := c.readyClient()
				if !ready {
					log.Printf("[健康检查][用户:%s] 连接不可用,触发重连", c.userID)
					go c.reconnect()
					continue
				}

				// Run the Ping check.
				ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
				_, err := cli.Ping(ctx, &PingRequest{})
				cancel()

				if err != nil {
					log.Printf("[健康检查][用户:%s] Ping失败: %v", c.userID, err)
					// disconnect closes stop, so this goroutine exits on the
					// next loop iteration.
					c.disconnect()
					go c.reconnect()
					continue
				}

				c.mu.Lock()
				c.lastPingTime = time.Now()
				c.mu.Unlock()

			case <-stop:
				return

			case <-c.ctx.Done():
				log.Printf("[健康检查][用户:%s] 停止健康检查", c.userID)
				return
			}
		}
	}()
}

// checkHealth probes the server by opening a JoinChat stream with a 5-second
// deadline. If the server reports codes.Unimplemented it defers to
// fallbackHealthCheck; any other error is returned as is.
func (c *ChatClient) checkHealth() error {
	c.mu.RLock()
	cli := c.client
	c.mu.RUnlock()
	if cli == nil {
		return fmt.Errorf("未连接服务器")
	}

	ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
	defer cancel()

	// Prefer JoinChat as the health probe.
	_, err := cli.JoinChat(ctx, &JoinRequest{UserId: c.userID})
	if err == nil {
		return nil
	}
	if st, ok := status.FromError(err); ok && st.Code() == codes.Unimplemented {
		// JoinChat is not implemented on the server: try another method.
		return c.fallbackHealthCheck()
	}
	return err
}

// fallbackHealthCheck is a placeholder for an alternative health probe; it
// currently always reports success.
func (c *ChatClient) fallbackHealthCheck() error {
	return nil
}

// Shutdown cancels the client context, disconnects, and waits for the
// goroutines tracked by the client's WaitGroup to finish.
func (c *ChatClient) Shutdown() {
	// 1. Cancel the context.
	c.cancel()

	// 2. Disconnect.
	c.disconnect()

	// 3. Wait for all tracked goroutines to finish.
	c.wg.Wait()
}

// log returns a new logger writing to stdout with a prefix containing the
// user ID.
func (c *ChatClient) log() *log.Logger {
	return log.New(os.Stdout, fmt.Sprintf("[用户:%s] ", c.userID), log.LstdFlags)
}