commandHandle.go 6.1 KB


  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "sync"
  7. "time"
  8. . "client/chat"
  9. "google.golang.org/grpc"
  10. "google.golang.org/grpc/credentials/insecure"
  11. "google.golang.org/grpc/keepalive"
  12. )
  // Timing and retry parameters used by the chat client.

  // reconnectInterval is how long reconnect waits after a failed attempt
  // before trying again.
  const reconnectInterval = time.Minute

  // timeInt64 is passed as the keepalive Time parameter when dialing.
  // Despite the name it is a time.Duration, not an int64.
  const timeInt64 = time.Minute

  // TimeoutInt64 is passed as the keepalive Timeout parameter when dialing.
  // Despite the name it is a time.Duration, not an int64.
  const TimeoutInt64 = 20 * time.Second

  // maxRetryCount is the number of failed attempts after which reconnect
  // gives up.
  const maxRetryCount = 60
  // Config groups the connection settings of a chat client.
  //
  // NOTE(review): nothing in the visible part of this file reads Config;
  // presumably it is consumed elsewhere in the package — confirm before
  // relying on it.
  type Config struct {
  	// ServerAddress is the address of the chat server.
  	ServerAddress string

  	// ReconnectInterval is the delay between reconnect attempts.
  	ReconnectInterval time.Duration

  	// MaxRetryCount is the upper bound on reconnect attempts.
  	MaxRetryCount int
  }
  24. var client = &ChatClient{}
  25. type ChatClient struct {
  26. conn *grpc.ClientConn
  27. client ChatServiceClient
  28. ctx context.Context
  29. cancel context.CancelFunc
  30. userID string
  31. mu sync.Mutex
  32. retryCount int
  33. isConnected bool
  34. wg sync.WaitGroup
  35. reconnecting bool
  36. ServiceAddress string
  37. }
  38. func NewChatClient(userID, address string) *ChatClient {
  39. ctx, cancel := context.WithCancel(context.Background())
  40. return &ChatClient{
  41. userID: userID,
  42. ctx: ctx,
  43. cancel: cancel,
  44. ServiceAddress: address,
  45. }
  46. }
  47. // grpc连接
  48. func (c *ChatClient) connect() error {
  49. c.mu.Lock()
  50. defer c.mu.Unlock()
  51. if c.isConnected {
  52. return nil
  53. }
  54. log.Println("[连接] 尝试连接服务器...")
  55. ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
  56. defer cancel()
  57. conn, err := grpc.DialContext(ctx, "localhost:50051",
  58. grpc.WithTransportCredentials(insecure.NewCredentials()),
  59. grpc.WithBlock(),
  60. grpc.WithKeepaliveParams(keepalive.ClientParameters{
  61. Time: timeInt64,
  62. Timeout: TimeoutInt64,
  63. PermitWithoutStream: true,
  64. }))
  65. if err != nil {
  66. return fmt.Errorf("连接失败: %v", err)
  67. }
  68. // 测试连接
  69. testClient := NewChatServiceClient(conn)
  70. _, err = testClient.JoinChat(context.Background(), &JoinRequest{UserId: c.userID})
  71. if err != nil {
  72. _ = conn.Close()
  73. return fmt.Errorf("连接测试失败: %v", err)
  74. }
  75. c.conn = conn
  76. c.client = testClient
  77. c.isConnected = true
  78. c.retryCount = 0
  79. log.Println("[连接] 服务器连接成功")
  80. return nil
  81. }
  82. func (c *ChatClient) disconnect() {
  83. c.mu.Lock()
  84. defer c.mu.Unlock()
  85. if !c.isConnected {
  86. return
  87. }
  88. if c.conn != nil {
  89. if err := c.conn.Close(); err != nil {
  90. log.Println("[连接] 关闭连接出错: %v", err)
  91. }
  92. c.conn = nil
  93. }
  94. c.isConnected = false
  95. log.Println("[连接] 已断开连接")
  96. }
  97. func (c *ChatClient) reconnect() {
  98. // 初始加锁检查重连状态
  99. c.mu.Lock()
  100. if c.reconnecting {
  101. c.mu.Unlock()
  102. return
  103. }
  104. c.reconnecting = true
  105. currentRetry := c.retryCount
  106. c.mu.Unlock()
  107. // 确保最终重置重连状态
  108. defer func() {
  109. c.mu.Lock()
  110. c.reconnecting = false
  111. c.mu.Unlock()
  112. }()
  113. // 等待组管理
  114. c.wg.Add(1)
  115. defer c.wg.Done()
  116. log.Println("[重连] 开始重连流程,当前重试计数: %d", currentRetry)
  117. for {
  118. select {
  119. case <-c.ctx.Done():
  120. log.Println("[重连] 上下文取消,停止重连")
  121. return
  122. default:
  123. // 检查最大重试次数
  124. c.mu.Lock()
  125. if c.retryCount >= maxRetryCount {
  126. log.Println("[重连] 达到最大重试次数(%d),停止重连", maxRetryCount)
  127. c.mu.Unlock()
  128. return
  129. }
  130. c.mu.Unlock()
  131. // 尝试连接
  132. log.Println("[重连] 尝试第%d次连接...", currentRetry+1)
  133. if err := c.connect(); err != nil {
  134. log.Println("[重连] 连接失败: %v", err)
  135. // 更新重试计数
  136. c.mu.Lock()
  137. c.retryCount++
  138. currentRetry = c.retryCount
  139. c.mu.Unlock()
  140. log.Println("[重连] 等待%d秒后重试...", int(reconnectInterval.Seconds()))
  141. select {
  142. case <-time.After(reconnectInterval):
  143. continue
  144. case <-c.ctx.Done():
  145. log.Println("[重连] 等待期间上下文取消")
  146. return
  147. }
  148. } else {
  149. log.Println("[重连] 连接成功!")
  150. go c.receiveMessages()
  151. return
  152. }
  153. }
  154. }
  155. }
  156. // 接收信息处理
  157. // 修改后的 receiveMessages 方法
  158. func (c *ChatClient) receiveMessages() {
  159. c.wg.Add(1)
  160. defer c.wg.Done()
  161. for {
  162. select {
  163. case <-c.ctx.Done():
  164. return
  165. default:
  166. c.mu.Lock()
  167. connected := c.isConnected
  168. c.mu.Unlock()
  169. if !connected {
  170. time.Sleep(1 * time.Second)
  171. continue
  172. }
  173. // 加锁保护整个连接过程
  174. c.mu.Lock()
  175. stream, err := c.client.JoinChat(c.ctx, &JoinRequest{UserId: c.userID})
  176. c.mu.Unlock()
  177. if err != nil {
  178. log.Println(fmt.Sprintf("[接收] 加入聊天室失败: %v", err))
  179. c.disconnect()
  180. go c.reconnect()
  181. return
  182. }
  183. // 消息接收循环
  184. for {
  185. msg, err := stream.Recv()
  186. if err != nil {
  187. log.Println(fmt.Sprintf("[接收] 接收消息错误: %v\n", err))
  188. c.disconnect()
  189. go c.reconnect()
  190. return
  191. }
  192. // ... 处理消息逻辑 ...
  193. log.Println("[接收] 收到消息: ", msg)
  194. if msg.UserId == "系统" {
  195. switch msg.Action {
  196. case "sendTalk":
  197. go SendTalk(msg.Text)
  198. case "getContacts":
  199. go GetContacts()
  200. case "reject":
  201. go Reject(msg.Text)
  202. default:
  203. //后期删除掉
  204. if msg.Text == "欢迎加入聊天室!" {
  205. log.Println("发送固定信息", client.SendMessage("", "sendTalk"))
  206. }
  207. log.Println(fmt.Sprintf("[系统通知]: %s ", msg.Text))
  208. }
  209. }
  210. }
  211. }
  212. }
  213. }
  214. // SendMessage 发送单条消息
  215. func (c *ChatClient) SendMessage(text string, action string) error {
  216. c.mu.Lock()
  217. defer c.mu.Unlock()
  218. if !c.isConnected {
  219. return fmt.Errorf("未连接服务器")
  220. }
  221. msg := &Message{
  222. UserId: c.userID,
  223. Text: text,
  224. Action: action,
  225. }
  226. log.Println(fmt.Sprintf("[发送] 发送消息 : %+v\n", msg))
  227. _, err := c.client.SendMessage(c.ctx, msg)
  228. if err != nil {
  229. log.Println(fmt.Sprintf("[发送] 发送失败: %v", err))
  230. c.disconnect()
  231. go c.reconnect()
  232. return err
  233. }
  234. return nil
  235. }
  236. func ConnectGRPC(userId string, address string) {
  237. log.Println("[主程序] 启动GRPC连接")
  238. client = NewChatClient(userId, address)
  239. defer func() {
  240. client.cancel()
  241. client.disconnect()
  242. client.wg.Wait()
  243. log.Println("[主程序] 客户端安全退出")
  244. }()
  245. // 初始连接
  246. if err := client.connect(); err != nil {
  247. log.Println(fmt.Sprintf("[主程序] 初始连接失败: %v", err))
  248. go client.reconnect()
  249. } else {
  250. go client.receiveMessages()
  251. }
  252. select {}
  253. }