chatClient.go 15 KB


  1. package service
  2. import (
  3. "client/config"
  4. "context"
  5. "fmt"
  6. "google.golang.org/grpc/codes"
  7. "google.golang.org/grpc/credentials/insecure"
  8. "log"
  9. "math"
  10. "os"
  11. "os/signal"
  12. "sync"
  13. "syscall"
  14. "time"
  15. . "client/chat"
  16. "google.golang.org/grpc"
  17. "google.golang.org/grpc/connectivity"
  18. "google.golang.org/grpc/keepalive"
  19. "google.golang.org/grpc/status"
  20. )
  21. // 定义连接相关的常量参数
  22. const (
  23. initialReconnectInterval = 2 * time.Second // 初始重连间隔时间
  24. keepaliveTime = 20 * time.Second // 客户端keepalive心跳间隔
  25. keepaliveTimeout = 10 * time.Second // keepalive心跳超时时间
  26. connectionTimeout = 10 * time.Second // 连接超时时间
  27. maxReconnectInterval = 120 * time.Second // 最大重连间隔时间
  28. healthCheckInterval = 30 * time.Second // 健康检查间隔时间
  29. )
  30. // 全局客户端实例
  31. var client = &ChatClient{}
  32. // ChatClient 定义gRPC客户端结构体
  33. type ChatClient struct {
  34. conn *grpc.ClientConn // gRPC连接对象
  35. client ChatServiceClient // gRPC服务客户端
  36. ctx context.Context // 上下文对象
  37. cancel context.CancelFunc // 取消函数
  38. userID string // 用户ID
  39. mu sync.RWMutex // 读写锁(保护并发访问)
  40. retryCount int // 当前重试次数
  41. isConnected bool // 连接状态标志
  42. wg sync.WaitGroup // 等待组(用于goroutine同步)
  43. reconnecting bool // 重连状态标志
  44. serviceAddress string // 服务端地址
  45. stream ChatService_JoinChatClient // gRPC流对象
  46. streamMutex sync.Mutex // 流操作互斥锁
  47. healthCheckTicker *time.Ticker // 健康检查定时器
  48. lastPingTime time.Time // 最后心跳时间
  49. }
  50. // NewChatClient 构造函数,创建新的ChatClient实例
  51. func NewChatClient(userID, address string) *ChatClient {
  52. // 创建可取消的上下文
  53. ctx, cancel := context.WithCancel(context.Background())
  54. return &ChatClient{
  55. userID: userID, // 设置用户ID
  56. ctx: ctx, // 设置上下文
  57. cancel: cancel, // 设置取消函数
  58. serviceAddress: address, // 设置服务端地址
  59. }
  60. }
  61. // connect 连接到gRPC服务器
  62. func (c *ChatClient) connect(password string) error {
  63. c.mu.Lock()
  64. defer c.mu.Unlock()
  65. // 检查现有连接是否可用
  66. if c.isConnected && c.conn.GetState() == connectivity.Ready {
  67. return nil
  68. }
  69. // 打印连接日志
  70. log.Println("[连接] 尝试连接服务器...", c.serviceAddress)
  71. // 创建带超时的上下文
  72. ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
  73. defer cancel()
  74. // 建立gRPC连接
  75. conn, err := grpc.DialContext(ctx, c.serviceAddress,
  76. grpc.WithTransportCredentials(insecure.NewCredentials()), // 使用非安全连接
  77. grpc.WithBlock(), // 阻塞式连接
  78. grpc.WithPerRPCCredentials(&authCreds{password: password}), // 设置认证凭证
  79. grpc.WithDefaultCallOptions( // 设置默认调用选项
  80. grpc.MaxCallRecvMsgSize(20*1024*1024), // 最大接收消息大小
  81. grpc.MaxCallSendMsgSize(20*1024*1024), // 最大发送消息大小
  82. ),
  83. grpc.WithKeepaliveParams(keepalive.ClientParameters{ // 设置keepalive参数
  84. Time: keepaliveTime,
  85. Timeout: keepaliveTimeout,
  86. PermitWithoutStream: true, // 允许无活动流时也发送心跳
  87. }),
  88. )
  89. if err != nil {
  90. return fmt.Errorf("连接失败: %v", err)
  91. }
  92. // 检查连接状态
  93. state := conn.GetState()
  94. if state != connectivity.Ready {
  95. if conn.WaitForStateChange(ctx, connectivity.Connecting) {
  96. state := conn.GetState()
  97. if state != connectivity.Ready {
  98. _ = conn.Close()
  99. return fmt.Errorf("连接未就绪,状态: %v", state)
  100. }
  101. }
  102. }
  103. // 创建客户端并测试连接
  104. client := NewChatServiceClient(conn)
  105. _, err = client.JoinChat(context.Background(), &JoinRequest{UserId: c.userID, Force: true})
  106. if err != nil {
  107. _ = conn.Close()
  108. return fmt.Errorf("连接测试失败: %v", err)
  109. }
  110. // 关闭旧连接(如果存在)
  111. if c.conn != nil {
  112. _ = c.conn.Close()
  113. }
  114. // 更新连接状态
  115. c.conn = conn
  116. c.client = client
  117. c.isConnected = true
  118. c.retryCount = 0
  119. c.lastPingTime = time.Now()
  120. // 启动健康检查(延迟10秒)
  121. if c.healthCheckTicker != nil {
  122. c.healthCheckTicker.Stop()
  123. c.healthCheckTicker = nil
  124. }
  125. go func() {
  126. time.Sleep(10 * time.Second) // 给连接稳定时间
  127. c.startHealthCheck()
  128. }()
  129. log.Printf("[连接][用户:%s] 服务器连接成功", c.userID)
  130. return nil
  131. }
  132. // disconnect 断开与服务器的连接
  133. func (c *ChatClient) disconnect() {
  134. c.mu.Lock()
  135. defer c.mu.Unlock()
  136. // 如果已经断开连接,直接返回
  137. if !c.isConnected {
  138. return
  139. }
  140. // 停止健康检查定时器
  141. if c.healthCheckTicker != nil {
  142. c.healthCheckTicker.Stop()
  143. c.healthCheckTicker = nil
  144. }
  145. // 关闭流
  146. c.closeStream()
  147. // 关闭连接
  148. if c.conn != nil {
  149. if err := c.conn.Close(); err != nil {
  150. log.Printf("[连接][用户:%s] 关闭连接出错: %v", c.userID, err)
  151. }
  152. c.conn = nil
  153. }
  154. // 更新连接状态
  155. c.isConnected = false
  156. log.Printf("[连接][用户:%s] 已断开连接", c.userID)
  157. }
  158. // closeStream 关闭gRPC流
  159. func (c *ChatClient) closeStream() {
  160. c.streamMutex.Lock()
  161. defer c.streamMutex.Unlock()
  162. // 如果流存在,则关闭
  163. if c.stream != nil {
  164. if err := c.stream.CloseSend(); err != nil {
  165. log.Printf("[流] 关闭流错误: %v", err)
  166. }
  167. c.stream = nil
  168. }
  169. }
  170. // reconnect 执行重连逻辑
  171. func (c *ChatClient) reconnect() {
  172. const maxRetries = 500 // 最大重试次数
  173. // 加锁检查重连状态
  174. c.mu.Lock()
  175. if c.reconnecting {
  176. c.mu.Unlock()
  177. log.Printf("[重连] 已在重连中,跳过")
  178. return
  179. }
  180. c.reconnecting = true
  181. currentRetry := c.retryCount
  182. c.mu.Unlock()
  183. // 确保结束时重置重连状态
  184. defer func() {
  185. c.mu.Lock()
  186. c.reconnecting = false
  187. c.mu.Unlock()
  188. log.Printf("[重连] 重连流程结束")
  189. }()
  190. // 等待组管理
  191. c.wg.Add(1)
  192. defer c.wg.Done()
  193. // 重连循环
  194. for currentRetry < maxRetries {
  195. select {
  196. case <-c.ctx.Done():
  197. log.Printf("[重连] 上下文取消,终止重连")
  198. return
  199. default:
  200. // 1. 清理旧连接
  201. c.disconnect()
  202. // 2. 尝试新连接
  203. log.Printf("[重连] 尝试第 %d/%d 次重连", currentRetry+1, maxRetries)
  204. err := c.connect(config.Cfg.Password)
  205. if err == nil {
  206. log.Printf("[重连] 成功建立新连接")
  207. go c.establishStream()
  208. return
  209. }
  210. // 3. 错误处理
  211. log.Printf("[重连] 第 %d 次重连失败: %v", currentRetry+1, err)
  212. if isFatalError(err) {
  213. log.Printf("[重连] 遇到致命错误,停止重连: %v", err)
  214. return
  215. }
  216. // 4. 退避等待
  217. backoff := calculateBackoff(currentRetry)
  218. log.Printf("[重连] 等待 %v 后重试", backoff)
  219. select {
  220. case <-time.After(backoff):
  221. currentRetry++
  222. case <-c.ctx.Done():
  223. return
  224. }
  225. }
  226. }
  227. log.Printf("[重连] 已达到最大重试次数 (%d),停止重连", maxRetries)
  228. }
  229. // 辅助函数:判断是否为致命错误(如认证失败、无效地址)
  230. func isFatalError(err error) bool {
  231. if err == nil {
  232. return false
  233. }
  234. // 示例:gRPC 的不可恢复错误码
  235. if status, ok := status.FromError(err); ok {
  236. switch status.Code() {
  237. case codes.Unauthenticated, codes.PermissionDenied, codes.NotFound:
  238. return true
  239. }
  240. }
  241. return false
  242. }
  243. // calculateBackoff 计算退避时间(指数退避算法)
  244. func calculateBackoff(retryCount int) time.Duration {
  245. base := float64(initialReconnectInterval)
  246. max := float64(maxReconnectInterval)
  247. backoff := base * math.Pow(1.5, float64(retryCount))
  248. return time.Duration(math.Min(backoff, max))
  249. }
  250. // establishStream 建立gRPC流
  251. func (c *ChatClient) establishStream() {
  252. // 添加等待组计数
  253. c.wg.Add(1)
  254. defer c.wg.Done()
  255. retryDelay := time.Second
  256. for {
  257. select {
  258. case <-c.ctx.Done(): // 检查是否被取消
  259. return
  260. default:
  261. // 检查连接是否就绪
  262. if !c.isReady() {
  263. time.Sleep(retryDelay)
  264. retryDelay = time.Duration(math.Min(float64(retryDelay*2), float64(maxReconnectInterval)))
  265. continue
  266. }
  267. // 尝试建立流
  268. stream, err := func() (ChatService_JoinChatClient, error) {
  269. c.streamMutex.Lock()
  270. defer c.streamMutex.Unlock()
  271. return c.client.JoinChat(c.ctx, &JoinRequest{UserId: c.userID, Force: true})
  272. }()
  273. if err != nil {
  274. log.Printf("[流] 建立流失败: %v,等待重试...", err)
  275. time.Sleep(retryDelay)
  276. retryDelay = time.Duration(math.Min(float64(retryDelay*2), float64(maxReconnectInterval)))
  277. continue
  278. }
  279. // 重置重试延迟
  280. retryDelay = time.Second
  281. // 处理接收到的消息
  282. if err := c.receiveMessages(stream); err != nil {
  283. log.Printf("[流] 接收消息错误: %v", err)
  284. return
  285. }
  286. }
  287. }
  288. }
  289. // receiveMessages 接收并处理消息
  290. func (c *ChatClient) receiveMessages(stream ChatService_JoinChatClient) error {
  291. for {
  292. // 接收消息
  293. msg, err := stream.Recv()
  294. if msg == nil {
  295. continue
  296. }
  297. // 处理错误
  298. if err != nil {
  299. // 区分错误类型
  300. st, ok := status.FromError(err)
  301. if ok {
  302. switch st.Code() {
  303. case codes.Canceled:
  304. log.Println("流正常关闭")
  305. return nil
  306. case codes.Unavailable, codes.DeadlineExceeded:
  307. log.Printf("流异常断开,触发重连: %v", err)
  308. go c.reconnect() // 触发重连
  309. return err
  310. default:
  311. log.Printf("不可恢复错误: %v", err)
  312. return err
  313. }
  314. }
  315. // 非gRPC错误(如网络问题)
  316. log.Printf("[流] 接收错误: %v,触发重连", err)
  317. go c.reconnect()
  318. return err
  319. }
  320. // 跳过空消息
  321. if msg == nil || msg.Text == "" || msg.UserId == "" {
  322. continue
  323. }
  324. // 再次检查错误(冗余检查)
  325. if err != nil {
  326. if status.Code(err) == codes.Canceled {
  327. return nil
  328. }
  329. return fmt.Errorf("接收消息错误: %v", err)
  330. }
  331. // 打印收到的消息
  332. log.Printf("[接收] 收到消息: %+v", msg)
  333. // 处理系统消息
  334. if msg.UserId == "系统" {
  335. switch msg.Action {
  336. case "sendTalk":
  337. go SendTalk(msg.Text)
  338. case "getContacts":
  339. go GetContacts()
  340. case "reject":
  341. go Reject(msg.Text)
  342. }
  343. if msg.Text == "欢迎加入聊天室" {
  344. go GetContacts()
  345. }
  346. }
  347. }
  348. }
  349. // SendMessage 发送消息
  350. func (c *ChatClient) SendMessage(text, action string) error {
  351. c.mu.RLock()
  352. defer c.mu.RUnlock()
  353. // 检查连接是否就绪
  354. if !c.isReady() {
  355. go c.reconnect() // 触发重连
  356. return fmt.Errorf("未连接服务器,正在尝试重连...")
  357. }
  358. // 构造消息
  359. msg := &Message{
  360. UserId: c.userID,
  361. Text: text,
  362. Action: action,
  363. }
  364. log.Printf("[发送] 发送消息: %+v", msg)
  365. // 创建带超时的上下文
  366. ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
  367. defer cancel()
  368. // 发送消息
  369. aaa, err := c.client.SendMessage(ctx, msg)
  370. log.Println(aaa)
  371. if err != nil {
  372. // 处理不同类型的错误
  373. st, ok := status.FromError(err)
  374. if ok {
  375. switch st.Code() {
  376. case codes.Unavailable, codes.DeadlineExceeded:
  377. go c.reconnect() // 网络问题触发重连
  378. case codes.Unauthenticated, codes.PermissionDenied:
  379. // 认证错误不重连
  380. }
  381. }
  382. return fmt.Errorf("发送失败: %v", err)
  383. }
  384. return nil
  385. }
  386. // ConnectGRPC 启动gRPC客户端
  387. func ConnectGRPC(userId, address string) {
  388. log.Println("[主程序] 启动GRPC连接")
  389. // 创建新客户端
  390. client = NewChatClient(userId, address)
  391. defer client.Shutdown() // 确保退出时关闭
  392. // 启动连接监控
  393. go client.startConnectionMonitor()
  394. // 初始连接
  395. if err := client.connect(config.Cfg.Password); err != nil {
  396. log.Printf("[主程序] 初始连接失败: %v", err)
  397. go client.reconnect()
  398. } else {
  399. go client.establishStream()
  400. }
  401. // 等待退出信号
  402. quit := make(chan os.Signal, 1)
  403. signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
  404. <-quit
  405. }
  406. // isReady 检查连接是否就绪
  407. func (c *ChatClient) isReady() bool {
  408. c.mu.RLock()
  409. defer c.mu.RUnlock()
  410. // 基本检查
  411. if !c.isConnected || c.conn == nil {
  412. return false
  413. }
  414. // 检查连接状态
  415. state := c.conn.GetState()
  416. if state != connectivity.Ready {
  417. return false
  418. }
  419. // 检查心跳时间
  420. log.Println(time.Since(c.lastPingTime), 3*keepaliveTime)
  421. if time.Since(c.lastPingTime) > 3*keepaliveTime {
  422. return false
  423. }
  424. return true
  425. }
  426. // startHealthCheck 启动健康检查
  427. func (c *ChatClient) startHealthCheck() {
  428. c.healthCheckTicker = time.NewTicker(healthCheckInterval)
  429. c.wg.Add(1)
  430. go func() {
  431. defer c.wg.Done()
  432. for {
  433. select {
  434. case <-c.healthCheckTicker.C: // 定时触发
  435. if !c.isReady() {
  436. log.Printf("连接不可用,触发重连")
  437. go c.reconnect()
  438. continue
  439. }
  440. // 执行健康检查
  441. ctx, cancel := context.WithTimeout(c.ctx, 6*time.Second)
  442. _, err := c.client.Ping(ctx, &PingRequest{UserId: c.userID})
  443. cancel()
  444. if err != nil {
  445. log.Printf("健康检查失败: %v", err)
  446. c.disconnect()
  447. go c.reconnect()
  448. } else {
  449. c.mu.Lock()
  450. c.lastPingTime = time.Now() // 更新最后心跳时间
  451. c.mu.Unlock()
  452. }
  453. case <-c.ctx.Done(): // 上下文取消
  454. return
  455. }
  456. }
  457. }()
  458. }
  459. // checkHealth 执行健康检查
  460. func (c *ChatClient) checkHealth() error {
  461. ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
  462. defer cancel()
  463. // 使用JoinChat方法检查健康状态
  464. _, err := c.client.JoinChat(ctx, &JoinRequest{UserId: c.userID, Force: false})
  465. if err != nil {
  466. return fmt.Errorf("健康检查失败: %w", err)
  467. }
  468. return nil
  469. }
  470. // Shutdown 关闭客户端
  471. func (c *ChatClient) Shutdown() {
  472. log.Println("客户端服务关闭")
  473. // 1. 取消上下文
  474. c.cancel()
  475. // 2. 断开连接
  476. c.disconnect()
  477. // 3. 等待所有goroutine结束
  478. c.wg.Wait()
  479. }
  480. // authCreds 实现gRPC认证接口
  481. type authCreds struct {
  482. password string
  483. }
  484. // GetRequestMetadata 获取认证元数据
  485. func (c *authCreds) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
  486. return map[string]string{
  487. "password": c.password,
  488. }, nil
  489. }
  490. // RequireTransportSecurity 是否要求传输安全
  491. func (c *authCreds) RequireTransportSecurity() bool {
  492. return false // 不要求TLS
  493. }
  494. // startConnectionMonitor 启动连接监控
  495. func (c *ChatClient) startConnectionMonitor() {
  496. c.wg.Add(1)
  497. go func() {
  498. defer c.wg.Done()
  499. ticker := time.NewTicker(1 * time.Minute) // 每5分钟检查一次
  500. defer ticker.Stop()
  501. for {
  502. select {
  503. case <-ticker.C: // 定时触发
  504. c.mu.RLock()
  505. conn := c.conn
  506. c.mu.RUnlock()
  507. if conn == nil {
  508. log.Printf("[监控] 连接不存在,触发重连")
  509. go c.reconnect()
  510. continue
  511. }
  512. // 检查连接状态
  513. state := conn.GetState()
  514. log.Printf("[监控] 当前连接状态: %v", state)
  515. // 判断是否需要重连
  516. if state == connectivity.TransientFailure ||
  517. state == connectivity.Shutdown ||
  518. (state == connectivity.Ready && time.Since(c.lastPingTime) > 3*keepaliveTime) {
  519. log.Printf("[监控] 连接异常,触发重连")
  520. go c.reconnect()
  521. }
  522. case <-c.ctx.Done(): // 上下文取消
  523. log.Printf("[监控] 监控停止")
  524. return
  525. }
  526. }
  527. }()
  528. }