123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- package service
- import (
- "context"
- "fmt"
- "log"
- "sync"
- "time"
- . "client/chat"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
- "google.golang.org/grpc/keepalive"
- )
- const (
- reconnectInterval = 60 * time.Second
- timeInt64 = 60 * time.Second
- TimeoutInt64 = 20 * time.Second
- maxRetryCount = 60
- )
- type Config struct {
- ServerAddress string
- ReconnectInterval time.Duration
- MaxRetryCount int
- }
- var client = &ChatClient{}
- type ChatClient struct {
- conn *grpc.ClientConn
- client ChatServiceClient
- ctx context.Context
- cancel context.CancelFunc
- userID string
- mu sync.Mutex
- retryCount int
- isConnected bool
- wg sync.WaitGroup
- reconnecting bool
- ServiceAddress string
- }
- func NewChatClient(userID, address string) *ChatClient {
- ctx, cancel := context.WithCancel(context.Background())
- return &ChatClient{
- userID: userID,
- ctx: ctx,
- cancel: cancel,
- ServiceAddress: address,
- }
- }
- // grpc连接
- func (c *ChatClient) connect() error {
- c.mu.Lock()
- defer c.mu.Unlock()
- if c.isConnected {
- return nil
- }
- log.Println("[连接] 尝试连接服务器...")
- ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
- defer cancel()
- conn, err := grpc.DialContext(ctx, "localhost:50051",
- grpc.WithTransportCredentials(insecure.NewCredentials()),
- grpc.WithBlock(),
- grpc.WithKeepaliveParams(keepalive.ClientParameters{
- Time: timeInt64,
- Timeout: TimeoutInt64,
- PermitWithoutStream: true,
- }))
- if err != nil {
- return fmt.Errorf("连接失败: %v", err)
- }
- // 测试连接
- testClient := NewChatServiceClient(conn)
- _, err = testClient.JoinChat(context.Background(), &JoinRequest{UserId: c.userID})
- if err != nil {
- _ = conn.Close()
- return fmt.Errorf("连接测试失败: %v", err)
- }
- c.conn = conn
- c.client = testClient
- c.isConnected = true
- c.retryCount = 0
- log.Println("[连接] 服务器连接成功")
- return nil
- }
- func (c *ChatClient) disconnect() {
- c.mu.Lock()
- defer c.mu.Unlock()
- if !c.isConnected {
- return
- }
- if c.conn != nil {
- if err := c.conn.Close(); err != nil {
- log.Println("[连接] 关闭连接出错: %v", err)
- }
- c.conn = nil
- }
- c.isConnected = false
- log.Println("[连接] 已断开连接")
- }
- func (c *ChatClient) reconnect() {
- // 初始加锁检查重连状态
- c.mu.Lock()
- if c.reconnecting {
- c.mu.Unlock()
- return
- }
- c.reconnecting = true
- currentRetry := c.retryCount
- c.mu.Unlock()
- // 确保最终重置重连状态
- defer func() {
- c.mu.Lock()
- c.reconnecting = false
- c.mu.Unlock()
- }()
- // 等待组管理
- c.wg.Add(1)
- defer c.wg.Done()
- log.Println("[重连] 开始重连流程,当前重试计数: %d", currentRetry)
- for {
- select {
- case <-c.ctx.Done():
- log.Println("[重连] 上下文取消,停止重连")
- return
- default:
- // 检查最大重试次数
- c.mu.Lock()
- if c.retryCount >= maxRetryCount {
- log.Println("[重连] 达到最大重试次数(%d),停止重连", maxRetryCount)
- c.mu.Unlock()
- return
- }
- c.mu.Unlock()
- // 尝试连接
- log.Println("[重连] 尝试第%d次连接...", currentRetry+1)
- if err := c.connect(); err != nil {
- log.Println("[重连] 连接失败: %v", err)
- // 更新重试计数
- c.mu.Lock()
- c.retryCount++
- currentRetry = c.retryCount
- c.mu.Unlock()
- log.Println("[重连] 等待%d秒后重试...", int(reconnectInterval.Seconds()))
- select {
- case <-time.After(reconnectInterval):
- continue
- case <-c.ctx.Done():
- log.Println("[重连] 等待期间上下文取消")
- return
- }
- } else {
- log.Println("[重连] 连接成功!")
- go c.receiveMessages()
- return
- }
- }
- }
- }
- // 接收信息处理
- // 修改后的 receiveMessages 方法
- func (c *ChatClient) receiveMessages() {
- c.wg.Add(1)
- defer c.wg.Done()
- for {
- select {
- case <-c.ctx.Done():
- return
- default:
- c.mu.Lock()
- connected := c.isConnected
- c.mu.Unlock()
- if !connected {
- time.Sleep(1 * time.Second)
- continue
- }
- // 加锁保护整个连接过程
- c.mu.Lock()
- stream, err := c.client.JoinChat(c.ctx, &JoinRequest{UserId: c.userID})
- c.mu.Unlock()
- if err != nil {
- log.Println(fmt.Sprintf("[接收] 加入聊天室失败: %v", err))
- c.disconnect()
- go c.reconnect()
- return
- }
- // 消息接收循环
- for {
- msg, err := stream.Recv()
- if err != nil {
- log.Println(fmt.Sprintf("[接收] 接收消息错误: %v\n", err))
- c.disconnect()
- go c.reconnect()
- return
- }
- // ... 处理消息逻辑 ...
- log.Println("[接收] 收到消息: ", msg)
- if msg.UserId == "系统" {
- switch msg.Action {
- case "sendTalk":
- go SendTalk(msg.Text)
- case "getContacts":
- go GetContacts()
- case "reject":
- go Reject(msg.Text)
- default:
- //后期删除掉
- if msg.Text == "欢迎加入聊天室!" {
- log.Println("发送固定信息", client.SendMessage("", "sendTalk"))
- }
- log.Println(fmt.Sprintf("[系统通知]: %s ", msg.Text))
- }
- }
- }
- }
- }
- }
- // SendMessage 发送单条消息
- func (c *ChatClient) SendMessage(text string, action string) error {
- c.mu.Lock()
- defer c.mu.Unlock()
- if !c.isConnected {
- return fmt.Errorf("未连接服务器")
- }
- msg := &Message{
- UserId: c.userID,
- Text: text,
- Action: action,
- }
- log.Println(fmt.Sprintf("[发送] 发送消息 : %+v\n", msg))
- _, err := c.client.SendMessage(c.ctx, msg)
- if err != nil {
- log.Println(fmt.Sprintf("[发送] 发送失败: %v", err))
- c.disconnect()
- go c.reconnect()
- return err
- }
- return nil
- }
- func ConnectGRPC(userId string, address string) {
- log.Println("[主程序] 启动GRPC连接")
- client = NewChatClient(userId, address)
- defer func() {
- client.cancel()
- client.disconnect()
- client.wg.Wait()
- log.Println("[主程序] 客户端安全退出")
- }()
- // 初始连接
- if err := client.connect(); err != nil {
- log.Println(fmt.Sprintf("[主程序] 初始连接失败: %v", err))
- go client.reconnect()
- } else {
- go client.receiveMessages()
- }
- select {}
- }
|