package service import ( "context" "fmt" "log" "sync" "time" . "client/chat" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/keepalive" ) const ( reconnectInterval = 60 * time.Second timeInt64 = 60 * time.Second TimeoutInt64 = 20 * time.Second maxRetryCount = 60 ) type Config struct { ServerAddress string ReconnectInterval time.Duration MaxRetryCount int } var client = &ChatClient{} type ChatClient struct { conn *grpc.ClientConn client ChatServiceClient ctx context.Context cancel context.CancelFunc userID string mu sync.Mutex retryCount int isConnected bool wg sync.WaitGroup reconnecting bool ServiceAddress string } func NewChatClient(userID, address string) *ChatClient { ctx, cancel := context.WithCancel(context.Background()) return &ChatClient{ userID: userID, ctx: ctx, cancel: cancel, ServiceAddress: address, } } // grpc连接 func (c *ChatClient) connect() error { c.mu.Lock() defer c.mu.Unlock() if c.isConnected { return nil } log.Println("[连接] 尝试连接服务器...") ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() conn, err := grpc.DialContext(ctx, "localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock(), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: timeInt64, Timeout: TimeoutInt64, PermitWithoutStream: true, })) if err != nil { return fmt.Errorf("连接失败: %v", err) } // 测试连接 testClient := NewChatServiceClient(conn) _, err = testClient.JoinChat(context.Background(), &JoinRequest{UserId: c.userID}) if err != nil { _ = conn.Close() return fmt.Errorf("连接测试失败: %v", err) } c.conn = conn c.client = testClient c.isConnected = true c.retryCount = 0 log.Println("[连接] 服务器连接成功") return nil } func (c *ChatClient) disconnect() { c.mu.Lock() defer c.mu.Unlock() if !c.isConnected { return } if c.conn != nil { if err := c.conn.Close(); err != nil { log.Println("[连接] 关闭连接出错: %v", err) } c.conn = nil } c.isConnected = false log.Println("[连接] 已断开连接") } func (c *ChatClient) reconnect() { // 初始加锁检查重连状态 c.mu.Lock() if c.reconnecting { c.mu.Unlock() return } c.reconnecting = true currentRetry := c.retryCount c.mu.Unlock() // 确保最终重置重连状态 defer func() { c.mu.Lock() c.reconnecting = false c.mu.Unlock() }() // 等待组管理 c.wg.Add(1) defer c.wg.Done() log.Println("[重连] 开始重连流程,当前重试计数: %d", currentRetry) for { select { case <-c.ctx.Done(): log.Println("[重连] 上下文取消,停止重连") return default: // 检查最大重试次数 c.mu.Lock() if c.retryCount >= maxRetryCount { log.Println("[重连] 达到最大重试次数(%d),停止重连", maxRetryCount) c.mu.Unlock() return } c.mu.Unlock() // 尝试连接 log.Println("[重连] 尝试第%d次连接...", currentRetry+1) if err := c.connect(); err != nil { log.Println("[重连] 连接失败: %v", err) // 更新重试计数 c.mu.Lock() c.retryCount++ currentRetry = c.retryCount c.mu.Unlock() log.Println("[重连] 等待%d秒后重试...", int(reconnectInterval.Seconds())) select { case <-time.After(reconnectInterval): continue case <-c.ctx.Done(): log.Println("[重连] 等待期间上下文取消") return } } else { log.Println("[重连] 连接成功!") go c.receiveMessages() return } } } } // 接收信息处理 // 修改后的 receiveMessages 方法 func (c *ChatClient) receiveMessages() { c.wg.Add(1) defer c.wg.Done() for { select { case <-c.ctx.Done(): return default: c.mu.Lock() connected := c.isConnected c.mu.Unlock() if !connected { time.Sleep(1 * time.Second) continue } // 加锁保护整个连接过程 c.mu.Lock() stream, err := c.client.JoinChat(c.ctx, &JoinRequest{UserId: c.userID}) c.mu.Unlock() if err != nil { log.Println(fmt.Sprintf("[接收] 加入聊天室失败: %v", err)) c.disconnect() go c.reconnect() return } // 消息接收循环 for { msg, err := stream.Recv() if err != nil { log.Println(fmt.Sprintf("[接收] 接收消息错误: %v\n", err)) c.disconnect() go c.reconnect() return } // ... 处理消息逻辑 ... log.Println("[接收] 收到消息: ", msg) if msg.UserId == "系统" { switch msg.Action { case "sendTalk": go SendTalk(msg.Text) case "getContacts": go GetContacts() case "reject": go Reject(msg.Text) default: //后期删除掉 if msg.Text == "欢迎加入聊天室!" { log.Println("发送固定信息", client.SendMessage("", "sendTalk")) } log.Println(fmt.Sprintf("[系统通知]: %s ", msg.Text)) } } } } } } // SendMessage 发送单条消息 func (c *ChatClient) SendMessage(text string, action string) error { c.mu.Lock() defer c.mu.Unlock() if !c.isConnected { return fmt.Errorf("未连接服务器") } msg := &Message{ UserId: c.userID, Text: text, Action: action, } log.Println(fmt.Sprintf("[发送] 发送消息 : %+v\n", msg)) _, err := c.client.SendMessage(c.ctx, msg) if err != nil { log.Println(fmt.Sprintf("[发送] 发送失败: %v", err)) c.disconnect() go c.reconnect() return err } return nil } func ConnectGRPC(userId string, address string) { log.Println("[主程序] 启动GRPC连接") client = NewChatClient(userId, address) defer func() { client.cancel() client.disconnect() client.wg.Wait() log.Println("[主程序] 客户端安全退出") }() // 初始连接 if err := client.connect(); err != nil { log.Println(fmt.Sprintf("[主程序] 初始连接失败: %v", err)) go client.reconnect() } else { go client.receiveMessages() } select {} }