chatClient.go 10 KB


  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "google.golang.org/grpc/codes"
  6. "log"
  7. "math"
  8. "os"
  9. "os/signal"
  10. "sync"
  11. "syscall"
  12. "time"
  13. . "client/chat"
  14. "google.golang.org/grpc"
  15. "google.golang.org/grpc/connectivity"
  16. "google.golang.org/grpc/credentials/insecure"
  17. "google.golang.org/grpc/keepalive"
  18. "google.golang.org/grpc/status"
  19. )
  20. const (
  21. initialReconnectInterval = 1 * time.Second
  22. keepaliveTime = 60 * time.Second
  23. keepaliveTimeout = 20 * time.Second
  24. maxRetryCount = 60
  25. connectionTimeout = 3 * time.Second
  26. maxReconnectInterval = 60 * time.Second
  27. healthCheckInterval = 30 * time.Second
  28. )
  29. var client = &ChatClient{}
  30. type ChatClient struct {
  31. conn *grpc.ClientConn
  32. client ChatServiceClient
  33. ctx context.Context
  34. cancel context.CancelFunc
  35. userID string
  36. mu sync.RWMutex
  37. retryCount int
  38. isConnected bool
  39. wg sync.WaitGroup
  40. reconnecting bool
  41. serviceAddress string
  42. stream ChatService_JoinChatClient
  43. streamMutex sync.Mutex
  44. healthCheckTicker *time.Ticker
  45. lastPingTime time.Time
  46. }
  47. func NewChatClient(userID, address string) *ChatClient {
  48. ctx, cancel := context.WithCancel(context.Background())
  49. return &ChatClient{
  50. userID: userID,
  51. ctx: ctx,
  52. cancel: cancel,
  53. serviceAddress: address,
  54. }
  55. }
  56. // 连接服务器
  57. func (c *ChatClient) connect() error {
  58. c.mu.Lock()
  59. defer c.mu.Unlock()
  60. if c.isConnected && c.conn.GetState() == connectivity.Ready {
  61. return nil
  62. }
  63. log.Println("[连接] 尝试连接服务器...", c.serviceAddress)
  64. ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
  65. defer cancel()
  66. conn, err := grpc.DialContext(ctx, c.serviceAddress,
  67. grpc.WithTransportCredentials(insecure.NewCredentials()),
  68. grpc.WithBlock(),
  69. grpc.WithDefaultCallOptions(
  70. grpc.MaxCallRecvMsgSize(20*1024*1024),
  71. grpc.MaxCallSendMsgSize(20*1024*1024),
  72. ),
  73. grpc.WithKeepaliveParams(keepalive.ClientParameters{
  74. Time: keepaliveTime,
  75. Timeout: keepaliveTimeout,
  76. PermitWithoutStream: true,
  77. }),
  78. )
  79. if err != nil {
  80. return fmt.Errorf("连接失败: %v", err)
  81. }
  82. // 检查连接状态
  83. state := conn.GetState()
  84. if state != connectivity.Ready {
  85. if conn.WaitForStateChange(ctx, connectivity.Connecting) {
  86. state := conn.GetState()
  87. if state != connectivity.Ready {
  88. _ = conn.Close()
  89. return fmt.Errorf("连接未就绪,状态: %v", state)
  90. }
  91. }
  92. }
  93. client := NewChatServiceClient(conn)
  94. _, err = client.JoinChat(context.Background(), &JoinRequest{UserId: c.userID})
  95. if err != nil {
  96. _ = conn.Close()
  97. return fmt.Errorf("连接测试失败: %v", err)
  98. }
  99. // 关闭旧连接
  100. if c.conn != nil {
  101. _ = c.conn.Close()
  102. }
  103. c.conn = conn
  104. c.client = client
  105. c.isConnected = true
  106. c.retryCount = 0
  107. c.lastPingTime = time.Now()
  108. // 启动健康检查
  109. if c.healthCheckTicker != nil {
  110. c.healthCheckTicker.Stop()
  111. c.healthCheckTicker = nil
  112. }
  113. go func() {
  114. time.Sleep(10 * time.Second) // 给连接稳定时间
  115. c.startHealthCheck()
  116. }()
  117. log.Printf("[连接][用户:%s] 服务器连接成功", c.userID)
  118. return nil
  119. }
  120. // 断开连接
  121. func (c *ChatClient) disconnect() {
  122. c.mu.Lock()
  123. defer c.mu.Unlock()
  124. if !c.isConnected {
  125. return
  126. }
  127. // 停止健康检查
  128. if c.healthCheckTicker != nil {
  129. c.healthCheckTicker.Stop()
  130. c.healthCheckTicker = nil
  131. }
  132. c.closeStream()
  133. if c.conn != nil {
  134. if err := c.conn.Close(); err != nil {
  135. log.Printf("[连接][用户:%s] 关闭连接出错: %v", c.userID, err)
  136. }
  137. c.conn = nil
  138. }
  139. c.isConnected = false
  140. log.Printf("[连接][用户:%s] 已断开连接", c.userID)
  141. }
  142. // 关闭流
  143. func (c *ChatClient) closeStream() {
  144. c.streamMutex.Lock()
  145. defer c.streamMutex.Unlock()
  146. if c.stream != nil {
  147. if err := c.stream.CloseSend(); err != nil {
  148. log.Printf("[流] 关闭流错误: %v", err)
  149. }
  150. c.stream = nil
  151. }
  152. }
  153. // 重连逻辑
  154. func (c *ChatClient) reconnect() {
  155. c.mu.Lock()
  156. if c.reconnecting {
  157. c.mu.Unlock()
  158. return
  159. }
  160. c.reconnecting = true
  161. currentRetry := c.retryCount
  162. c.mu.Unlock()
  163. defer func() {
  164. c.mu.Lock()
  165. c.reconnecting = false
  166. c.mu.Unlock()
  167. }()
  168. c.wg.Add(1)
  169. defer c.wg.Done()
  170. log.Printf("[重连] 开始重连流程,当前重试计数: %d", currentRetry)
  171. for {
  172. select {
  173. case <-c.ctx.Done():
  174. log.Println("[重连] 上下文取消,停止重连")
  175. return
  176. default:
  177. c.mu.Lock()
  178. if c.retryCount >= maxRetryCount {
  179. log.Printf("[重连] 达到最大重试次数(%d),停止重连", maxRetryCount)
  180. c.mu.Unlock()
  181. return
  182. }
  183. c.mu.Unlock()
  184. log.Printf("[重连] 尝试第%d次连接...", currentRetry+1)
  185. if err := c.connect(); err != nil {
  186. log.Printf("[重连][用户:%s] 连接失败: %v", c.userID, err)
  187. c.mu.Lock()
  188. c.retryCount++
  189. currentRetry = c.retryCount
  190. c.mu.Unlock()
  191. // 指数退避算法
  192. backoff := initialReconnectInterval
  193. if currentRetry > 0 {
  194. backoff = time.Duration(math.Min(
  195. float64(initialReconnectInterval)*math.Pow(1.5, float64(currentRetry)),
  196. float64(maxReconnectInterval),
  197. ))
  198. }
  199. select {
  200. case <-time.After(backoff):
  201. continue
  202. case <-c.ctx.Done():
  203. log.Printf("[重连][用户:%s] 等待期间上下文取消", c.userID)
  204. return
  205. }
  206. } else {
  207. log.Printf("[重连][用户:%s] 连接成功!", c.userID)
  208. go c.establishStream()
  209. return
  210. }
  211. }
  212. }
  213. }
  214. // 建立流
  215. func (c *ChatClient) establishStream() {
  216. c.wg.Add(1)
  217. defer c.wg.Done()
  218. retryDelay := time.Second
  219. for {
  220. select {
  221. case <-c.ctx.Done():
  222. return
  223. default:
  224. // 添加连接状态检查
  225. if !c.isReady() {
  226. time.Sleep(retryDelay)
  227. retryDelay = time.Duration(math.Min(float64(retryDelay*2), float64(maxReconnectInterval)))
  228. continue
  229. }
  230. // 尝试建立流
  231. stream, err := func() (ChatService_JoinChatClient, error) {
  232. c.streamMutex.Lock()
  233. defer c.streamMutex.Unlock()
  234. return c.client.JoinChat(c.ctx, &JoinRequest{UserId: c.userID})
  235. }()
  236. if err != nil {
  237. log.Printf("[流] 建立流失败: %v,等待重试...", err)
  238. time.Sleep(retryDelay)
  239. retryDelay = time.Duration(math.Min(float64(retryDelay*2), float64(maxReconnectInterval)))
  240. continue
  241. }
  242. // 重置重试延迟
  243. retryDelay = time.Second
  244. // 处理消息
  245. if err := c.receiveMessages(stream); err != nil {
  246. log.Printf("[流] 接收消息错误: %v", err)
  247. return
  248. }
  249. }
  250. }
  251. }
  252. // 接收消息
  253. func (c *ChatClient) receiveMessages(stream ChatService_JoinChatClient) error {
  254. for {
  255. msg, err := stream.Recv()
  256. if err != nil {
  257. if status.Code(err) == codes.Canceled {
  258. return nil // 正常关闭
  259. }
  260. return fmt.Errorf("接收消息错误: %v", err)
  261. }
  262. log.Printf("[接收] 收到消息: %+v", msg)
  263. if msg.UserId == "系统" {
  264. switch msg.Action {
  265. case "sendTalk":
  266. go SendTalk(msg.Text)
  267. case "getContacts":
  268. go GetContacts()
  269. case "reject":
  270. go Reject(msg.Text)
  271. default:
  272. log.Printf("[系统通知]: %s", msg.Text)
  273. }
  274. if msg.Text == "欢迎加入聊天室" {
  275. go GetContacts()
  276. }
  277. }
  278. }
  279. }
  280. // 发送消息
  281. func (c *ChatClient) SendMessage(text, action string) error {
  282. c.mu.RLock()
  283. defer c.mu.RUnlock()
  284. if !c.isReady() {
  285. return fmt.Errorf("未连接服务器")
  286. }
  287. msg := &Message{
  288. UserId: c.userID,
  289. Text: text,
  290. Action: action,
  291. }
  292. log.Printf("[发送] 发送消息: %+v", msg)
  293. ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
  294. defer cancel()
  295. _, err := c.client.SendMessage(ctx, msg)
  296. if err != nil {
  297. log.Printf("[发送] 发送失败: %v", err)
  298. c.disconnect()
  299. go c.reconnect()
  300. return err
  301. }
  302. return nil
  303. }
  304. // 启动客户端
  305. func ConnectGRPC(userId, address string) {
  306. log.Println("[主程序] 启动GRPC连接")
  307. client = NewChatClient(userId, address)
  308. defer client.Shutdown()
  309. if err := client.connect(); err != nil {
  310. log.Printf("[主程序] 初始连接失败: %v", err)
  311. go client.reconnect()
  312. } else {
  313. go client.establishStream()
  314. }
  315. // 保持主线程运行
  316. quit := make(chan os.Signal, 1)
  317. signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
  318. <-quit
  319. }
  320. func (c *ChatClient) isReady() bool {
  321. c.mu.RLock()
  322. defer c.mu.RUnlock()
  323. if !c.isConnected || c.conn == nil {
  324. return false
  325. }
  326. state := c.conn.GetState()
  327. // 只有当状态是Ready时才返回true
  328. return state == connectivity.Ready
  329. }
  330. // startHealthCheck 启动健康检查
  331. func (c *ChatClient) startHealthCheck() {
  332. if c.healthCheckTicker != nil {
  333. c.healthCheckTicker.Stop()
  334. }
  335. c.healthCheckTicker = time.NewTicker(healthCheckInterval)
  336. c.wg.Add(1)
  337. go func() {
  338. defer c.wg.Done()
  339. defer c.healthCheckTicker.Stop()
  340. for {
  341. select {
  342. case <-c.healthCheckTicker.C:
  343. if !c.isReady() {
  344. log.Printf("[健康检查][用户:%s] 连接不可用,触发重连", c.userID)
  345. go c.reconnect()
  346. continue
  347. }
  348. // 执行Ping检查
  349. ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
  350. _, err := c.client.Ping(ctx, &PingRequest{})
  351. cancel()
  352. if err != nil {
  353. log.Printf("[健康检查][用户:%s] Ping失败: %v", c.userID, err)
  354. c.disconnect()
  355. go c.reconnect()
  356. } else {
  357. c.mu.Lock()
  358. c.lastPingTime = time.Now()
  359. c.mu.Unlock()
  360. }
  361. case <-c.ctx.Done():
  362. log.Printf("[健康检查][用户:%s] 停止健康检查", c.userID)
  363. return
  364. }
  365. }
  366. }()
  367. }
  368. // 修改健康检查实现
  369. func (c *ChatClient) checkHealth() error {
  370. ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
  371. defer cancel()
  372. // 优先使用JoinChat作为健康检查
  373. _, err := c.client.JoinChat(ctx, &JoinRequest{UserId: c.userID})
  374. if err != nil {
  375. st, ok := status.FromError(err)
  376. if ok && st.Code() == codes.Unimplemented {
  377. // 如果JoinChat未实现,尝试其他方法
  378. return c.fallbackHealthCheck()
  379. }
  380. return err
  381. }
  382. return nil
  383. }
  384. func (c *ChatClient) fallbackHealthCheck() error {
  385. // 实现其他健康检查方式
  386. return nil
  387. }
  388. func (c *ChatClient) Shutdown() {
  389. // 1. 取消上下文
  390. c.cancel()
  391. // 2. 断开连接
  392. c.disconnect()
  393. // 3. 等待所有goroutine结束
  394. c.wg.Wait()
  395. }
  396. func (c *ChatClient) log() *log.Logger {
  397. return log.New(os.Stdout, fmt.Sprintf("[用户:%s] ", c.userID), log.LstdFlags)
  398. }