|
- package service
- import (
- "context"
- "fmt"
- "google.golang.org/grpc/codes"
- "log"
- "math"
- "os"
- "os/signal"
- "sync"
- "syscall"
- "time"
- . "client/chat"
- "google.golang.org/grpc"
- "google.golang.org/grpc/connectivity"
- "google.golang.org/grpc/credentials/insecure"
- "google.golang.org/grpc/keepalive"
- "google.golang.org/grpc/status"
- )
// Tuning parameters for dialing, keepalive, reconnecting and health checking.
const (
	// connectionTimeout bounds a single blocking dial attempt.
	connectionTimeout = 3 * time.Second

	// keepaliveTime and keepaliveTimeout are passed to the gRPC client
	// keepalive parameters as Time and Timeout respectively.
	keepaliveTime    = 60 * time.Second
	keepaliveTimeout = 20 * time.Second

	// initialReconnectInterval is the base of the reconnect back-off and
	// maxReconnectInterval is its upper bound.
	initialReconnectInterval = time.Second
	maxReconnectInterval     = 60 * time.Second

	// maxRetryCount is the number of failed connection attempts after which
	// the reconnect loop gives up.
	maxRetryCount = 60

	// healthCheckInterval is the period of the health-check ticker.
	healthCheckInterval = 30 * time.Second
)
- var client = &ChatClient{}
- type ChatClient struct {
- conn *grpc.ClientConn
- client ChatServiceClient
- ctx context.Context
- cancel context.CancelFunc
- userID string
- mu sync.RWMutex
- retryCount int
- isConnected bool
- wg sync.WaitGroup
- reconnecting bool
- serviceAddress string
- stream ChatService_JoinChatClient
- streamMutex sync.Mutex
- healthCheckTicker *time.Ticker
- lastPingTime time.Time
- }
- func NewChatClient(userID, address string) *ChatClient {
- ctx, cancel := context.WithCancel(context.Background())
- return &ChatClient{
- userID: userID,
- ctx: ctx,
- cancel: cancel,
- serviceAddress: address,
- }
- }
- // 连接服务器
- func (c *ChatClient) connect() error {
- c.mu.Lock()
- defer c.mu.Unlock()
- if c.isConnected && c.conn.GetState() == connectivity.Ready {
- return nil
- }
- log.Println("[连接] 尝试连接服务器...", c.serviceAddress)
- ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
- defer cancel()
- conn, err := grpc.DialContext(ctx, c.serviceAddress,
- grpc.WithTransportCredentials(insecure.NewCredentials()),
- grpc.WithBlock(),
- grpc.WithDefaultCallOptions(
- grpc.MaxCallRecvMsgSize(20*1024*1024),
- grpc.MaxCallSendMsgSize(20*1024*1024),
- ),
- grpc.WithKeepaliveParams(keepalive.ClientParameters{
- Time: keepaliveTime,
- Timeout: keepaliveTimeout,
- PermitWithoutStream: true,
- }),
- )
- if err != nil {
- return fmt.Errorf("连接失败: %v", err)
- }
- // 检查连接状态
- state := conn.GetState()
- if state != connectivity.Ready {
- if conn.WaitForStateChange(ctx, connectivity.Connecting) {
- state := conn.GetState()
- if state != connectivity.Ready {
- _ = conn.Close()
- return fmt.Errorf("连接未就绪,状态: %v", state)
- }
- }
- }
- client := NewChatServiceClient(conn)
- _, err = client.JoinChat(context.Background(), &JoinRequest{UserId: c.userID})
- if err != nil {
- _ = conn.Close()
- return fmt.Errorf("连接测试失败: %v", err)
- }
- // 关闭旧连接
- if c.conn != nil {
- _ = c.conn.Close()
- }
- c.conn = conn
- c.client = client
- c.isConnected = true
- c.retryCount = 0
- c.lastPingTime = time.Now()
- // 启动健康检查
- if c.healthCheckTicker != nil {
- c.healthCheckTicker.Stop()
- c.healthCheckTicker = nil
- }
- go func() {
- time.Sleep(10 * time.Second) // 给连接稳定时间
- c.startHealthCheck()
- }()
- log.Printf("[连接][用户:%s] 服务器连接成功", c.userID)
- return nil
- }
- // 断开连接
- func (c *ChatClient) disconnect() {
- c.mu.Lock()
- defer c.mu.Unlock()
- if !c.isConnected {
- return
- }
- // 停止健康检查
- if c.healthCheckTicker != nil {
- c.healthCheckTicker.Stop()
- c.healthCheckTicker = nil
- }
- c.closeStream()
- if c.conn != nil {
- if err := c.conn.Close(); err != nil {
- log.Printf("[连接][用户:%s] 关闭连接出错: %v", c.userID, err)
- }
- c.conn = nil
- }
- c.isConnected = false
- log.Printf("[连接][用户:%s] 已断开连接", c.userID)
- }
- // 关闭流
- func (c *ChatClient) closeStream() {
- c.streamMutex.Lock()
- defer c.streamMutex.Unlock()
- if c.stream != nil {
- if err := c.stream.CloseSend(); err != nil {
- log.Printf("[流] 关闭流错误: %v", err)
- }
- c.stream = nil
- }
- }
- // 重连逻辑
- func (c *ChatClient) reconnect() {
- c.mu.Lock()
- if c.reconnecting {
- c.mu.Unlock()
- return
- }
- c.reconnecting = true
- currentRetry := c.retryCount
- c.mu.Unlock()
- defer func() {
- c.mu.Lock()
- c.reconnecting = false
- c.mu.Unlock()
- }()
- c.wg.Add(1)
- defer c.wg.Done()
- log.Printf("[重连] 开始重连流程,当前重试计数: %d", currentRetry)
- for {
- select {
- case <-c.ctx.Done():
- log.Println("[重连] 上下文取消,停止重连")
- return
- default:
- c.mu.Lock()
- if c.retryCount >= maxRetryCount {
- log.Printf("[重连] 达到最大重试次数(%d),停止重连", maxRetryCount)
- c.mu.Unlock()
- return
- }
- c.mu.Unlock()
- log.Printf("[重连] 尝试第%d次连接...", currentRetry+1)
- if err := c.connect(); err != nil {
- log.Printf("[重连][用户:%s] 连接失败: %v", c.userID, err)
- c.mu.Lock()
- c.retryCount++
- currentRetry = c.retryCount
- c.mu.Unlock()
- // 指数退避算法
- backoff := initialReconnectInterval
- if currentRetry > 0 {
- backoff = time.Duration(math.Min(
- float64(initialReconnectInterval)*math.Pow(1.5, float64(currentRetry)),
- float64(maxReconnectInterval),
- ))
- }
- select {
- case <-time.After(backoff):
- continue
- case <-c.ctx.Done():
- log.Printf("[重连][用户:%s] 等待期间上下文取消", c.userID)
- return
- }
- } else {
- log.Printf("[重连][用户:%s] 连接成功!", c.userID)
- go c.establishStream()
- return
- }
- }
- }
- }
- // 建立流
- func (c *ChatClient) establishStream() {
- c.wg.Add(1)
- defer c.wg.Done()
- retryDelay := time.Second
- for {
- select {
- case <-c.ctx.Done():
- return
- default:
- // 添加连接状态检查
- if !c.isReady() {
- time.Sleep(retryDelay)
- retryDelay = time.Duration(math.Min(float64(retryDelay*2), float64(maxReconnectInterval)))
- continue
- }
- // 尝试建立流
- stream, err := func() (ChatService_JoinChatClient, error) {
- c.streamMutex.Lock()
- defer c.streamMutex.Unlock()
- return c.client.JoinChat(c.ctx, &JoinRequest{UserId: c.userID})
- }()
- if err != nil {
- log.Printf("[流] 建立流失败: %v,等待重试...", err)
- time.Sleep(retryDelay)
- retryDelay = time.Duration(math.Min(float64(retryDelay*2), float64(maxReconnectInterval)))
- continue
- }
- // 重置重试延迟
- retryDelay = time.Second
- // 处理消息
- if err := c.receiveMessages(stream); err != nil {
- log.Printf("[流] 接收消息错误: %v", err)
- return
- }
- }
- }
- }
- // 接收消息
- func (c *ChatClient) receiveMessages(stream ChatService_JoinChatClient) error {
- for {
- msg, err := stream.Recv()
- if err != nil {
- if status.Code(err) == codes.Canceled {
- return nil // 正常关闭
- }
- return fmt.Errorf("接收消息错误: %v", err)
- }
- log.Printf("[接收] 收到消息: %+v", msg)
- if msg.UserId == "系统" {
- switch msg.Action {
- case "sendTalk":
- go SendTalk(msg.Text)
- case "getContacts":
- go GetContacts()
- case "reject":
- go Reject(msg.Text)
- default:
- log.Printf("[系统通知]: %s", msg.Text)
- }
- if msg.Text == "欢迎加入聊天室" {
- go GetContacts()
- }
- }
- }
- }
- // 发送消息
- func (c *ChatClient) SendMessage(text, action string) error {
- c.mu.RLock()
- defer c.mu.RUnlock()
- if !c.isReady() {
- return fmt.Errorf("未连接服务器")
- }
- msg := &Message{
- UserId: c.userID,
- Text: text,
- Action: action,
- }
- log.Printf("[发送] 发送消息: %+v", msg)
- ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
- defer cancel()
- _, err := c.client.SendMessage(ctx, msg)
- if err != nil {
- log.Printf("[发送] 发送失败: %v", err)
- c.disconnect()
- go c.reconnect()
- return err
- }
- return nil
- }
- // 启动客户端
- func ConnectGRPC(userId, address string) {
- log.Println("[主程序] 启动GRPC连接")
- client = NewChatClient(userId, address)
- defer client.Shutdown()
- if err := client.connect(); err != nil {
- log.Printf("[主程序] 初始连接失败: %v", err)
- go client.reconnect()
- } else {
- go client.establishStream()
- }
- // 保持主线程运行
- quit := make(chan os.Signal, 1)
- signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
- <-quit
- }
- func (c *ChatClient) isReady() bool {
- c.mu.RLock()
- defer c.mu.RUnlock()
- if !c.isConnected || c.conn == nil {
- return false
- }
- state := c.conn.GetState()
- // 只有当状态是Ready时才返回true
- return state == connectivity.Ready
- }
- // startHealthCheck 启动健康检查
- func (c *ChatClient) startHealthCheck() {
- if c.healthCheckTicker != nil {
- c.healthCheckTicker.Stop()
- }
- c.healthCheckTicker = time.NewTicker(healthCheckInterval)
- c.wg.Add(1)
- go func() {
- defer c.wg.Done()
- defer c.healthCheckTicker.Stop()
- for {
- select {
- case <-c.healthCheckTicker.C:
- if !c.isReady() {
- log.Printf("[健康检查][用户:%s] 连接不可用,触发重连", c.userID)
- go c.reconnect()
- continue
- }
- // 执行Ping检查
- ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
- _, err := c.client.Ping(ctx, &PingRequest{})
- cancel()
- if err != nil {
- log.Printf("[健康检查][用户:%s] Ping失败: %v", c.userID, err)
- c.disconnect()
- go c.reconnect()
- } else {
- c.mu.Lock()
- c.lastPingTime = time.Now()
- c.mu.Unlock()
- }
- case <-c.ctx.Done():
- log.Printf("[健康检查][用户:%s] 停止健康检查", c.userID)
- return
- }
- }
- }()
- }
- // 修改健康检查实现
- func (c *ChatClient) checkHealth() error {
- ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
- defer cancel()
- // 优先使用JoinChat作为健康检查
- _, err := c.client.JoinChat(ctx, &JoinRequest{UserId: c.userID})
- if err != nil {
- st, ok := status.FromError(err)
- if ok && st.Code() == codes.Unimplemented {
- // 如果JoinChat未实现,尝试其他方法
- return c.fallbackHealthCheck()
- }
- return err
- }
- return nil
- }
- func (c *ChatClient) fallbackHealthCheck() error {
- // 实现其他健康检查方式
- return nil
- }
- func (c *ChatClient) Shutdown() {
- // 1. 取消上下文
- c.cancel()
- // 2. 断开连接
- c.disconnect()
- // 3. 等待所有goroutine结束
- c.wg.Wait()
- }
- func (c *ChatClient) log() *log.Logger {
- return log.New(os.Stdout, fmt.Sprintf("[用户:%s] ", c.userID), log.LstdFlags)
- }
|