chatClient.go 15 KB


  1. package service
  2. import (
  3. "client/config"
  4. "context"
  5. "fmt"
  6. "google.golang.org/grpc/codes"
  7. "google.golang.org/grpc/credentials/insecure"
  8. "log"
  9. "math"
  10. "os"
  11. "os/signal"
  12. "sync"
  13. "syscall"
  14. "time"
  15. . "client/chat"
  16. "google.golang.org/grpc"
  17. "google.golang.org/grpc/connectivity"
  18. "google.golang.org/grpc/keepalive"
  19. "google.golang.org/grpc/status"
  20. )
  21. // 定义连接相关的常量参数
  22. const (
  23. initialReconnectInterval = 2 * time.Second // 初始重连间隔时间
  24. keepaliveTime = 20 * time.Second // 客户端keepalive心跳间隔
  25. keepaliveTimeout = 10 * time.Second // keepalive心跳超时时间
  26. connectionTimeout = 10 * time.Second // 连接超时时间
  27. maxReconnectInterval = 120 * time.Second // 最大重连间隔时间
  28. healthCheckInterval = 30 * time.Second // 健康检查间隔时间
  29. )
  30. // 全局客户端实例
  31. var client = &ChatClient{}
  32. // ChatClient 定义gRPC客户端结构体
  33. type ChatClient struct {
  34. conn *grpc.ClientConn // gRPC连接对象
  35. client ChatServiceClient // gRPC服务客户端
  36. ctx context.Context // 上下文对象
  37. cancel context.CancelFunc // 取消函数
  38. userID string // 用户ID
  39. mu sync.RWMutex // 读写锁(保护并发访问)
  40. retryCount int // 当前重试次数
  41. isConnected bool // 连接状态标志
  42. wg sync.WaitGroup // 等待组(用于goroutine同步)
  43. reconnecting bool // 重连状态标志
  44. serviceAddress string // 服务端地址
  45. stream ChatService_JoinChatClient // gRPC流对象
  46. streamMutex sync.Mutex // 流操作互斥锁
  47. healthCheckTicker *time.Ticker // 健康检查定时器
  48. lastPingTime time.Time // 最后心跳时间
  49. }
  50. // NewChatClient 构造函数,创建新的ChatClient实例
  51. func NewChatClient(userID, address string) *ChatClient {
  52. // 创建可取消的上下文
  53. ctx, cancel := context.WithCancel(context.Background())
  54. return &ChatClient{
  55. userID: userID, // 设置用户ID
  56. ctx: ctx, // 设置上下文
  57. cancel: cancel, // 设置取消函数
  58. serviceAddress: address, // 设置服务端地址
  59. }
  60. }
  61. // connect 连接到gRPC服务器
  62. func (c *ChatClient) connect(password string) error {
  63. c.mu.Lock()
  64. defer c.mu.Unlock()
  65. // 检查现有连接是否可用
  66. if c.isConnected && c.conn.GetState() == connectivity.Ready {
  67. return nil
  68. }
  69. // 打印连接日志
  70. log.Println("[连接] 尝试连接服务器...", c.serviceAddress)
  71. // 创建带超时的上下文
  72. ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
  73. defer cancel()
  74. // 建立gRPC连接
  75. conn, err := grpc.DialContext(ctx, c.serviceAddress,
  76. grpc.WithTransportCredentials(insecure.NewCredentials()), // 使用非安全连接
  77. grpc.WithBlock(), // 阻塞式连接
  78. grpc.WithPerRPCCredentials(&authCreds{password: password}), // 设置认证凭证
  79. grpc.WithDefaultCallOptions( // 设置默认调用选项
  80. grpc.MaxCallRecvMsgSize(20*1024*1024), // 最大接收消息大小
  81. grpc.MaxCallSendMsgSize(20*1024*1024), // 最大发送消息大小
  82. ),
  83. grpc.WithKeepaliveParams(keepalive.ClientParameters{ // 设置keepalive参数
  84. Time: keepaliveTime,
  85. Timeout: keepaliveTimeout,
  86. PermitWithoutStream: true, // 允许无活动流时也发送心跳
  87. }),
  88. )
  89. if err != nil {
  90. return fmt.Errorf("连接失败: %v", err)
  91. }
  92. // 检查连接状态
  93. state := conn.GetState()
  94. if state != connectivity.Ready {
  95. if conn.WaitForStateChange(ctx, connectivity.Connecting) {
  96. state := conn.GetState()
  97. if state != connectivity.Ready {
  98. _ = conn.Close()
  99. return fmt.Errorf("连接未就绪,状态: %v", state)
  100. }
  101. }
  102. }
  103. // 创建客户端并测试连接
  104. client := NewChatServiceClient(conn)
  105. _, err = client.JoinChat(context.Background(), &JoinRequest{UserId: c.userID, Force: true})
  106. if err != nil {
  107. _ = conn.Close()
  108. return fmt.Errorf("连接测试失败: %v", err)
  109. }
  110. // 关闭旧连接(如果存在)
  111. if c.conn != nil {
  112. _ = c.conn.Close()
  113. }
  114. // 更新连接状态
  115. c.conn = conn
  116. c.client = client
  117. c.isConnected = true
  118. c.retryCount = 0
  119. c.lastPingTime = time.Now()
  120. // 启动健康检查(延迟10秒)
  121. if c.healthCheckTicker != nil {
  122. c.healthCheckTicker.Stop()
  123. c.healthCheckTicker = nil
  124. }
  125. go func() {
  126. time.Sleep(10 * time.Second) // 给连接稳定时间
  127. c.startHealthCheck()
  128. }()
  129. log.Printf("[连接][用户:%s] 服务器连接成功", c.userID)
  130. return nil
  131. }
  132. // disconnect 断开与服务器的连接
  133. func (c *ChatClient) disconnect(fool bool) {
  134. c.mu.Lock()
  135. defer c.mu.Unlock()
  136. // 如果已经断开连接,直接返回
  137. if !c.isConnected {
  138. return
  139. }
  140. // 停止健康检查定时器
  141. if c.healthCheckTicker != nil {
  142. c.healthCheckTicker.Stop()
  143. c.healthCheckTicker = nil
  144. }
  145. // 关闭流
  146. c.closeStream()
  147. // 关闭连接
  148. if c.conn != nil {
  149. if err := c.conn.Close(); err != nil {
  150. log.Printf("[连接][用户:%s] 关闭连接出错: %v", c.userID, err)
  151. }
  152. c.conn = nil
  153. }
  154. // 更新连接状态
  155. c.isConnected = false
  156. log.Printf("[连接][用户:%s] 已断开连接", c.userID)
  157. if fool {
  158. go c.reconnect()
  159. }
  160. }
  161. // closeStream 关闭gRPC流
  162. func (c *ChatClient) closeStream() {
  163. c.streamMutex.Lock()
  164. defer c.streamMutex.Unlock()
  165. // 如果流存在,则关闭
  166. if c.stream != nil {
  167. if err := c.stream.CloseSend(); err != nil {
  168. log.Printf("[流] 关闭流错误: %v", err)
  169. }
  170. c.stream = nil
  171. }
  172. }
  173. // reconnect 执行重连逻辑
  174. func (c *ChatClient) reconnect() {
  175. const maxRetries = 500 // 最大重试次数
  176. // 加锁检查重连状态
  177. c.mu.Lock()
  178. if c.reconnecting {
  179. c.mu.Unlock()
  180. log.Printf("[重连] 已在重连中,跳过")
  181. return
  182. }
  183. c.reconnecting = true
  184. currentRetry := c.retryCount
  185. c.mu.Unlock()
  186. // 确保结束时重置重连状态
  187. defer func() {
  188. c.mu.Lock()
  189. c.reconnecting = false
  190. c.mu.Unlock()
  191. log.Printf("[重连] 重连流程结束")
  192. }()
  193. // 等待组管理
  194. c.wg.Add(1)
  195. defer c.wg.Done()
  196. // 重连循环
  197. for currentRetry < maxRetries {
  198. select {
  199. case <-c.ctx.Done():
  200. log.Printf("[重连] 上下文取消,终止重连")
  201. return
  202. default:
  203. // 1. 清理旧连接
  204. c.disconnect(false)
  205. // 2. 尝试新连接
  206. log.Printf("[重连] 尝试第 %d/%d 次重连", currentRetry+1, maxRetries)
  207. err := c.connect(config.Cfg.Password)
  208. if err == nil {
  209. log.Printf("[重连] 成功建立新连接")
  210. go c.establishStream()
  211. return
  212. }
  213. // 3. 错误处理
  214. log.Printf("[重连] 第 %d 次重连失败: %v", currentRetry+1, err)
  215. if isFatalError(err) {
  216. log.Printf("[重连] 遇到致命错误,停止重连: %v", err)
  217. return
  218. }
  219. // 4. 退避等待
  220. backoff := calculateBackoff(currentRetry)
  221. log.Printf("[重连] 等待 %v 后重试", backoff)
  222. select {
  223. case <-time.After(backoff):
  224. currentRetry++
  225. case <-c.ctx.Done():
  226. return
  227. }
  228. }
  229. }
  230. log.Printf("[重连] 已达到最大重试次数 (%d),停止重连", maxRetries)
  231. }
  232. // 辅助函数:判断是否为致命错误(如认证失败、无效地址)
  233. func isFatalError(err error) bool {
  234. if err == nil {
  235. return false
  236. }
  237. // 示例:gRPC 的不可恢复错误码
  238. if status, ok := status.FromError(err); ok {
  239. switch status.Code() {
  240. case codes.Unauthenticated, codes.PermissionDenied, codes.NotFound:
  241. return true
  242. }
  243. }
  244. return false
  245. }
  246. // calculateBackoff 计算退避时间(指数退避算法)
  247. func calculateBackoff(retryCount int) time.Duration {
  248. base := float64(initialReconnectInterval)
  249. max := float64(maxReconnectInterval)
  250. backoff := base * math.Pow(1.5, float64(retryCount))
  251. return time.Duration(math.Min(backoff, max))
  252. }
  253. // establishStream 建立gRPC流
  254. func (c *ChatClient) establishStream() {
  255. // 添加等待组计数
  256. c.wg.Add(1)
  257. defer c.wg.Done()
  258. retryDelay := time.Second
  259. for {
  260. select {
  261. case <-c.ctx.Done(): // 检查是否被取消
  262. return
  263. default:
  264. // 检查连接是否就绪
  265. if !c.isReady() {
  266. time.Sleep(retryDelay)
  267. retryDelay = time.Duration(math.Min(float64(retryDelay*2), float64(maxReconnectInterval)))
  268. continue
  269. }
  270. // 尝试建立流
  271. stream, err := func() (ChatService_JoinChatClient, error) {
  272. c.streamMutex.Lock()
  273. defer c.streamMutex.Unlock()
  274. return c.client.JoinChat(c.ctx, &JoinRequest{UserId: c.userID, Force: true})
  275. }()
  276. if err != nil {
  277. log.Printf("[流] 建立流失败: %v,等待重试...", err)
  278. time.Sleep(retryDelay)
  279. retryDelay = time.Duration(math.Min(float64(retryDelay*2), float64(maxReconnectInterval)))
  280. continue
  281. }
  282. // 重置重试延迟
  283. retryDelay = time.Second
  284. // 处理接收到的消息
  285. if err := c.receiveMessages(stream); err != nil {
  286. log.Printf("[流] 接收消息错误: %v", err)
  287. return
  288. }
  289. }
  290. }
  291. }
  292. // receiveMessages 接收并处理消息
  293. func (c *ChatClient) receiveMessages(stream ChatService_JoinChatClient) error {
  294. for {
  295. // 接收消息
  296. msg, err := stream.Recv()
  297. if msg == nil {
  298. continue
  299. }
  300. // 处理错误
  301. if err != nil {
  302. // 区分错误类型
  303. st, ok := status.FromError(err)
  304. if ok {
  305. switch st.Code() {
  306. case codes.Canceled:
  307. log.Println("流正常关闭")
  308. return nil
  309. case codes.Unavailable, codes.DeadlineExceeded:
  310. log.Printf("流异常断开,触发重连: %v", err)
  311. go c.reconnect() // 触发重连
  312. return err
  313. default:
  314. log.Printf("不可恢复错误: %v", err)
  315. return err
  316. }
  317. }
  318. // 非gRPC错误(如网络问题)
  319. log.Printf("[流] 接收错误: %v,触发重连", err)
  320. go c.reconnect()
  321. return err
  322. }
  323. // 跳过空消息
  324. if msg == nil || msg.Text == "" || msg.UserId == "" {
  325. continue
  326. }
  327. // 再次检查错误(冗余检查)
  328. if err != nil {
  329. if status.Code(err) == codes.Canceled {
  330. return nil
  331. }
  332. return fmt.Errorf("接收消息错误: %v", err)
  333. }
  334. // 打印收到的消息
  335. log.Printf("[接收] 收到消息: %+v", msg)
  336. // 处理系统消息
  337. if msg.UserId == "系统" {
  338. switch msg.Action {
  339. case "sendTalk":
  340. go SendTalk(msg.Text)
  341. case "getContacts":
  342. go GetContacts()
  343. case "reject":
  344. go Reject(msg.Text)
  345. }
  346. if msg.Text == "欢迎加入聊天室" {
  347. go GetContacts()
  348. }
  349. }
  350. }
  351. }
  352. // SendMessage 发送消息
  353. func (c *ChatClient) SendMessage(text, action string) error {
  354. c.mu.RLock()
  355. defer c.mu.RUnlock()
  356. // 检查连接是否就绪
  357. if !c.isReady() {
  358. go c.reconnect() // 触发重连
  359. return fmt.Errorf("未连接服务器,正在尝试重连...")
  360. }
  361. // 构造消息
  362. msg := &Message{
  363. UserId: c.userID,
  364. Text: text,
  365. Action: action,
  366. }
  367. log.Printf("[发送] 发送消息: %+v", msg)
  368. // 创建带超时的上下文
  369. ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
  370. defer cancel()
  371. // 发送消息
  372. aaa, err := c.client.SendMessage(ctx, msg)
  373. log.Println(aaa)
  374. if err != nil {
  375. // 处理不同类型的错误
  376. st, ok := status.FromError(err)
  377. if ok {
  378. switch st.Code() {
  379. case codes.Unavailable, codes.DeadlineExceeded:
  380. go c.reconnect() // 网络问题触发重连
  381. case codes.Unauthenticated, codes.PermissionDenied:
  382. // 认证错误不重连
  383. }
  384. }
  385. return fmt.Errorf("发送失败: %v", err)
  386. }
  387. return nil
  388. }
  389. // ConnectGRPC 启动gRPC客户端
  390. func ConnectGRPC(userId, address string) {
  391. log.Println("[主程序] 启动GRPC连接")
  392. // 创建新客户端
  393. client = NewChatClient(userId, address)
  394. defer client.Shutdown() // 确保退出时关闭
  395. // 启动连接监控
  396. go client.startConnectionMonitor()
  397. // 初始连接
  398. if err := client.connect(config.Cfg.Password); err != nil {
  399. log.Printf("[主程序] 初始连接失败: %v", err)
  400. go client.reconnect()
  401. } else {
  402. go client.establishStream()
  403. }
  404. // 等待退出信号
  405. quit := make(chan os.Signal, 1)
  406. signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
  407. <-quit
  408. }
  409. // isReady 检查连接是否就绪
  410. func (c *ChatClient) isReady() bool {
  411. c.mu.RLock()
  412. defer c.mu.RUnlock()
  413. // 基本检查
  414. if !c.isConnected || c.conn == nil {
  415. return false
  416. }
  417. // 检查连接状态
  418. state := c.conn.GetState()
  419. if state != connectivity.Ready {
  420. return false
  421. }
  422. // 检查心跳时间
  423. log.Println(time.Since(c.lastPingTime), 3*keepaliveTime)
  424. if time.Since(c.lastPingTime) > 3*keepaliveTime {
  425. return false
  426. }
  427. return true
  428. }
  429. // startHealthCheck 启动健康检查
  430. func (c *ChatClient) startHealthCheck() {
  431. c.healthCheckTicker = time.NewTicker(healthCheckInterval)
  432. c.wg.Add(1)
  433. go func() {
  434. defer c.wg.Done()
  435. for {
  436. select {
  437. case <-c.healthCheckTicker.C: // 定时触发
  438. if !c.isReady() {
  439. log.Printf("连接不可用,触发重连")
  440. go c.reconnect()
  441. continue
  442. }
  443. // 执行健康检查
  444. ctx, cancel := context.WithTimeout(c.ctx, 6*time.Second)
  445. _, err := c.client.Ping(ctx, &PingRequest{UserId: c.userID})
  446. cancel()
  447. if err != nil {
  448. log.Printf("健康检查失败: %v", err)
  449. c.disconnect(true)
  450. } else {
  451. c.mu.Lock()
  452. c.lastPingTime = time.Now() // 更新最后心跳时间
  453. c.mu.Unlock()
  454. }
  455. case <-c.ctx.Done(): // 上下文取消
  456. return
  457. }
  458. }
  459. }()
  460. }
  461. // checkHealth 执行健康检查
  462. func (c *ChatClient) checkHealth() error {
  463. ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
  464. defer cancel()
  465. // 使用JoinChat方法检查健康状态
  466. _, err := c.client.JoinChat(ctx, &JoinRequest{UserId: c.userID, Force: false})
  467. if err != nil {
  468. return fmt.Errorf("健康检查失败: %w", err)
  469. }
  470. return nil
  471. }
  472. // Shutdown 关闭客户端
  473. func (c *ChatClient) Shutdown() {
  474. log.Println("客户端服务关闭")
  475. // 1. 取消上下文
  476. c.cancel()
  477. // 2. 断开连接
  478. c.disconnect(false)
  479. // 3. 等待所有goroutine结束
  480. c.wg.Wait()
  481. }
  482. // authCreds 实现gRPC认证接口
  483. type authCreds struct {
  484. password string
  485. }
  486. // GetRequestMetadata 获取认证元数据
  487. func (c *authCreds) GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error) {
  488. return map[string]string{
  489. "password": c.password,
  490. }, nil
  491. }
  492. // RequireTransportSecurity 是否要求传输安全
  493. func (c *authCreds) RequireTransportSecurity() bool {
  494. return false // 不要求TLS
  495. }
  496. // startConnectionMonitor 启动连接监控
  497. func (c *ChatClient) startConnectionMonitor() {
  498. c.wg.Add(1)
  499. go func() {
  500. defer c.wg.Done()
  501. ticker := time.NewTicker(1 * time.Minute) // 每5分钟检查一次
  502. defer ticker.Stop()
  503. for {
  504. select {
  505. case <-ticker.C: // 定时触发
  506. c.mu.RLock()
  507. conn := c.conn
  508. c.mu.RUnlock()
  509. if conn == nil {
  510. log.Printf("[监控] 连接不存在,触发重连")
  511. go c.reconnect()
  512. continue
  513. }
  514. // 检查连接状态
  515. state := conn.GetState()
  516. log.Printf("[监控] 当前连接状态: %v", state)
  517. // 判断是否需要重连
  518. if state == connectivity.TransientFailure ||
  519. state == connectivity.Shutdown ||
  520. (state == connectivity.Ready && time.Since(c.lastPingTime) > 3*keepaliveTime) {
  521. log.Printf("[监控] 连接异常,触发重连")
  522. go c.reconnect()
  523. }
  524. case <-c.ctx.Done(): // 上下文取消
  525. log.Printf("[监控] 监控停止")
  526. return
  527. }
  528. }
  529. }()
  530. }