chatClient.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "google.golang.org/grpc/codes"
  6. "log"
  7. "math"
  8. "os"
  9. "os/signal"
  10. "sync"
  11. "syscall"
  12. "time"
  13. . "client/chat"
  14. "google.golang.org/grpc"
  15. "google.golang.org/grpc/connectivity"
  16. "google.golang.org/grpc/credentials/insecure"
  17. "google.golang.org/grpc/keepalive"
  18. "google.golang.org/grpc/status"
  19. )
  20. const (
  21. initialReconnectInterval = 1 * time.Second
  22. keepaliveTime = 60 * time.Second
  23. keepaliveTimeout = 20 * time.Second
  24. maxRetryCount = 60
  25. connectionTimeout = 3 * time.Second
  26. maxReconnectInterval = 60 * time.Second
  27. healthCheckInterval = 30 * time.Second
  28. )
  29. var client = &ChatClient{}
  30. type ChatClient struct {
  31. conn *grpc.ClientConn
  32. client ChatServiceClient
  33. ctx context.Context
  34. cancel context.CancelFunc
  35. userID string
  36. mu sync.RWMutex
  37. retryCount int
  38. isConnected bool
  39. wg sync.WaitGroup
  40. reconnecting bool
  41. serviceAddress string
  42. stream ChatService_JoinChatClient
  43. streamMutex sync.Mutex
  44. healthCheckTicker *time.Ticker
  45. lastPingTime time.Time
  46. }
  47. func NewChatClient(userID, address string) *ChatClient {
  48. ctx, cancel := context.WithCancel(context.Background())
  49. return &ChatClient{
  50. userID: userID,
  51. ctx: ctx,
  52. cancel: cancel,
  53. serviceAddress: address,
  54. }
  55. }
  56. // 连接服务器
  57. func (c *ChatClient) connect() error {
  58. c.mu.Lock()
  59. defer c.mu.Unlock()
  60. if c.isConnected && c.conn.GetState() == connectivity.Ready {
  61. return nil
  62. }
  63. log.Println("[连接] 尝试连接服务器...", c.serviceAddress)
  64. ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
  65. defer cancel()
  66. conn, err := grpc.DialContext(ctx, c.serviceAddress,
  67. grpc.WithTransportCredentials(insecure.NewCredentials()),
  68. grpc.WithBlock(),
  69. grpc.WithDefaultCallOptions(
  70. grpc.MaxCallRecvMsgSize(20*1024*1024),
  71. grpc.MaxCallSendMsgSize(20*1024*1024),
  72. ),
  73. grpc.WithKeepaliveParams(keepalive.ClientParameters{
  74. Time: keepaliveTime,
  75. Timeout: keepaliveTimeout,
  76. PermitWithoutStream: true,
  77. }),
  78. )
  79. if err != nil {
  80. return fmt.Errorf("连接失败: %v", err)
  81. }
  82. // 检查连接状态
  83. state := conn.GetState()
  84. if state != connectivity.Ready {
  85. if conn.WaitForStateChange(ctx, connectivity.Connecting) {
  86. state := conn.GetState()
  87. if state != connectivity.Ready {
  88. _ = conn.Close()
  89. return fmt.Errorf("连接未就绪,状态: %v", state)
  90. }
  91. }
  92. }
  93. client := NewChatServiceClient(conn)
  94. _, err = client.JoinChat(context.Background(), &JoinRequest{UserId: c.userID})
  95. if err != nil {
  96. _ = conn.Close()
  97. return fmt.Errorf("连接测试失败: %v", err)
  98. }
  99. // 关闭旧连接
  100. if c.conn != nil {
  101. _ = c.conn.Close()
  102. }
  103. c.conn = conn
  104. c.client = client
  105. c.isConnected = true
  106. c.retryCount = 0
  107. c.lastPingTime = time.Now()
  108. // 启动健康检查
  109. c.startHealthCheck()
  110. log.Printf("[连接][用户:%s] 服务器连接成功", c.userID)
  111. return nil
  112. }
  113. // 断开连接
  114. func (c *ChatClient) disconnect() {
  115. c.mu.Lock()
  116. defer c.mu.Unlock()
  117. if !c.isConnected {
  118. return
  119. }
  120. // 停止健康检查
  121. if c.healthCheckTicker != nil {
  122. c.healthCheckTicker.Stop()
  123. c.healthCheckTicker = nil
  124. }
  125. c.closeStream()
  126. if c.conn != nil {
  127. if err := c.conn.Close(); err != nil {
  128. log.Printf("[连接][用户:%s] 关闭连接出错: %v", c.userID, err)
  129. }
  130. c.conn = nil
  131. }
  132. c.isConnected = false
  133. log.Printf("[连接][用户:%s] 已断开连接", c.userID)
  134. }
  135. // 关闭流
  136. func (c *ChatClient) closeStream() {
  137. c.streamMutex.Lock()
  138. defer c.streamMutex.Unlock()
  139. if c.stream != nil {
  140. if err := c.stream.CloseSend(); err != nil {
  141. log.Printf("[流] 关闭流错误: %v", err)
  142. }
  143. c.stream = nil
  144. }
  145. }
  146. // 重连逻辑
  147. func (c *ChatClient) reconnect() {
  148. c.mu.Lock()
  149. if c.reconnecting {
  150. c.mu.Unlock()
  151. return
  152. }
  153. c.reconnecting = true
  154. currentRetry := c.retryCount
  155. c.mu.Unlock()
  156. defer func() {
  157. c.mu.Lock()
  158. c.reconnecting = false
  159. c.mu.Unlock()
  160. }()
  161. c.wg.Add(1)
  162. defer c.wg.Done()
  163. log.Printf("[重连] 开始重连流程,当前重试计数: %d", currentRetry)
  164. for {
  165. select {
  166. case <-c.ctx.Done():
  167. log.Println("[重连] 上下文取消,停止重连")
  168. return
  169. default:
  170. c.mu.Lock()
  171. if c.retryCount >= maxRetryCount {
  172. log.Printf("[重连] 达到最大重试次数(%d),停止重连", maxRetryCount)
  173. c.mu.Unlock()
  174. return
  175. }
  176. c.mu.Unlock()
  177. log.Printf("[重连] 尝试第%d次连接...", currentRetry+1)
  178. if err := c.connect(); err != nil {
  179. log.Printf("[重连][用户:%s] 连接失败: %v", c.userID, err)
  180. c.mu.Lock()
  181. c.retryCount++
  182. currentRetry = c.retryCount
  183. c.mu.Unlock()
  184. // 指数退避算法
  185. backoff := initialReconnectInterval
  186. if currentRetry > 0 {
  187. backoff = time.Duration(math.Min(
  188. float64(initialReconnectInterval)*math.Pow(1.5, float64(currentRetry)),
  189. float64(maxReconnectInterval),
  190. ))
  191. }
  192. select {
  193. case <-time.After(backoff):
  194. continue
  195. case <-c.ctx.Done():
  196. log.Printf("[重连][用户:%s] 等待期间上下文取消", c.userID)
  197. return
  198. }
  199. } else {
  200. log.Printf("[重连][用户:%s] 连接成功!", c.userID)
  201. go c.establishStream()
  202. return
  203. }
  204. }
  205. }
  206. }
  207. // 建立流
  208. func (c *ChatClient) establishStream() {
  209. c.wg.Add(1)
  210. defer c.wg.Done()
  211. for {
  212. select {
  213. case <-c.ctx.Done():
  214. return
  215. default:
  216. c.mu.RLock()
  217. connected := c.isConnected
  218. client := c.client
  219. c.mu.RUnlock()
  220. if !connected {
  221. time.Sleep(1 * time.Second)
  222. continue
  223. }
  224. c.streamMutex.Lock()
  225. stream, err := client.JoinChat(c.ctx, &JoinRequest{UserId: c.userID})
  226. if err != nil {
  227. c.streamMutex.Unlock()
  228. log.Printf("[流] 建立流失败: %v", err)
  229. c.disconnect()
  230. go c.reconnect()
  231. return
  232. }
  233. c.stream = stream
  234. c.streamMutex.Unlock()
  235. if err := c.receiveMessages(stream); err != nil {
  236. log.Printf("[流] 接收消息错误: %v", err)
  237. c.disconnect()
  238. go c.reconnect()
  239. return
  240. }
  241. }
  242. }
  243. }
  244. // 接收消息
  245. func (c *ChatClient) receiveMessages(stream ChatService_JoinChatClient) error {
  246. for {
  247. msg, err := stream.Recv()
  248. if err != nil {
  249. if status.Code(err) == codes.Canceled {
  250. return nil // 正常关闭
  251. }
  252. return fmt.Errorf("接收消息错误: %v", err)
  253. }
  254. log.Printf("[接收] 收到消息: %+v", msg)
  255. if msg.UserId == "系统" {
  256. switch msg.Action {
  257. case "sendTalk":
  258. go SendTalk(msg.Text)
  259. case "getContacts":
  260. go GetContacts()
  261. case "reject":
  262. go Reject(msg.Text)
  263. default:
  264. log.Printf("[系统通知]: %s", msg.Text)
  265. }
  266. if msg.Text == "欢迎加入聊天室" {
  267. go GetContacts()
  268. }
  269. }
  270. }
  271. }
  272. // 发送消息
  273. func (c *ChatClient) SendMessage(text, action string) error {
  274. c.mu.RLock()
  275. defer c.mu.RUnlock()
  276. if !c.isReady() {
  277. return fmt.Errorf("未连接服务器")
  278. }
  279. msg := &Message{
  280. UserId: c.userID,
  281. Text: text,
  282. Action: action,
  283. }
  284. log.Printf("[发送] 发送消息: %+v", msg)
  285. ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
  286. defer cancel()
  287. _, err := c.client.SendMessage(ctx, msg)
  288. if err != nil {
  289. log.Printf("[发送] 发送失败: %v", err)
  290. c.disconnect()
  291. go c.reconnect()
  292. return err
  293. }
  294. return nil
  295. }
  296. // 启动客户端
  297. func ConnectGRPC(userId, address string) {
  298. log.Println("[主程序] 启动GRPC连接")
  299. client = NewChatClient(userId, address)
  300. defer client.Shutdown()
  301. if err := client.connect(); err != nil {
  302. log.Printf("[主程序] 初始连接失败: %v", err)
  303. go client.reconnect()
  304. } else {
  305. go client.establishStream()
  306. }
  307. // 保持主线程运行
  308. quit := make(chan os.Signal, 1)
  309. signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
  310. <-quit
  311. }
  312. func (c *ChatClient) isReady() bool {
  313. c.mu.RLock()
  314. defer c.mu.RUnlock()
  315. return c.isConnected && c.conn != nil && c.conn.GetState() == connectivity.Ready
  316. }
  317. // startHealthCheck 启动健康检查
  318. func (c *ChatClient) startHealthCheck() {
  319. if c.healthCheckTicker != nil {
  320. c.healthCheckTicker.Stop()
  321. }
  322. c.healthCheckTicker = time.NewTicker(healthCheckInterval)
  323. c.wg.Add(1)
  324. go func() {
  325. defer c.wg.Done()
  326. defer c.healthCheckTicker.Stop()
  327. for {
  328. select {
  329. case <-c.healthCheckTicker.C:
  330. if !c.isReady() {
  331. log.Printf("[健康检查][用户:%s] 连接不可用,触发重连", c.userID)
  332. go c.reconnect()
  333. continue
  334. }
  335. // 执行Ping检查
  336. ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
  337. _, err := c.client.Ping(ctx, &PingRequest{})
  338. cancel()
  339. if err != nil {
  340. log.Printf("[健康检查][用户:%s] Ping失败: %v", c.userID, err)
  341. c.disconnect()
  342. go c.reconnect()
  343. } else {
  344. c.mu.Lock()
  345. c.lastPingTime = time.Now()
  346. c.mu.Unlock()
  347. }
  348. case <-c.ctx.Done():
  349. log.Printf("[健康检查][用户:%s] 停止健康检查", c.userID)
  350. return
  351. }
  352. }
  353. }()
  354. }
  355. // 修改健康检查实现
  356. func (c *ChatClient) checkHealth() error {
  357. ctx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
  358. defer cancel()
  359. // 优先使用JoinChat作为健康检查
  360. _, err := c.client.JoinChat(ctx, &JoinRequest{UserId: c.userID})
  361. if err != nil {
  362. st, ok := status.FromError(err)
  363. if ok && st.Code() == codes.Unimplemented {
  364. // 如果JoinChat未实现,尝试其他方法
  365. return c.fallbackHealthCheck()
  366. }
  367. return err
  368. }
  369. return nil
  370. }
  371. func (c *ChatClient) fallbackHealthCheck() error {
  372. // 实现其他健康检查方式
  373. return nil
  374. }
  375. func (c *ChatClient) Shutdown() {
  376. // 1. 取消上下文
  377. c.cancel()
  378. // 2. 断开连接
  379. c.disconnect()
  380. // 3. 等待所有goroutine结束
  381. c.wg.Wait()
  382. }
  383. func (c *ChatClient) log() *log.Logger {
  384. return log.New(os.Stdout, fmt.Sprintf("[用户:%s] ", c.userID), log.LstdFlags)
  385. }