chatServer.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gogf/gf/v2/util/gconv"
  6. "log"
  7. . "rpc/chat"
  8. "rpc/config"
  9. "sync"
  10. "time"
  11. )
  12. var Chatserver *ChatServer
  13. func init() {
  14. Chatserver = NewChatServer()
  15. }
  16. type ChatServer struct {
  17. UnimplementedChatServiceServer
  18. clients map[string]chan *Message
  19. adminMsg chan *Message
  20. mu sync.RWMutex
  21. shutdownChan chan struct{} // 关闭信号通道
  22. }
  23. func NewChatServer() *ChatServer {
  24. return &ChatServer{
  25. clients: make(map[string]chan *Message),
  26. adminMsg: make(chan *Message, 100),
  27. shutdownChan: make(chan struct{}),
  28. }
  29. }
  30. // 建立连接
  31. func (s *ChatServer) JoinChat(req *JoinRequest, stream ChatService_JoinChatServer) error {
  32. // 创建新通道
  33. msgChan := make(chan *Message, 100)
  34. // 注册客户端
  35. s.mu.Lock()
  36. if _, exists := s.clients[req.UserId]; exists {
  37. s.mu.Unlock()
  38. return fmt.Errorf("用户 %s 已连接", req.UserId)
  39. }
  40. s.clients[req.UserId] = msgChan
  41. s.mu.Unlock()
  42. // 发送欢迎消息
  43. welcomeMsg := &Message{
  44. UserId: "系统",
  45. Text: "欢迎加入聊天室",
  46. Timestamp: time.Now().Unix(),
  47. }
  48. if err := stream.Send(welcomeMsg); err != nil {
  49. s.removeClient(req.UserId)
  50. return err
  51. }
  52. // 清理处理
  53. defer s.removeClient(req.UserId)
  54. // 消息循环
  55. for {
  56. select {
  57. case msg := <-msgChan:
  58. if err := s.sendWithTimeout(stream, msg, 5*time.Second); err != nil {
  59. return err
  60. }
  61. case adminMsg := <-s.adminMsg:
  62. if err := s.sendWithTimeout(stream, adminMsg, 5*time.Second); err != nil {
  63. return err
  64. }
  65. case <-stream.Context().Done():
  66. return nil
  67. case <-s.shutdownChan:
  68. return nil
  69. }
  70. }
  71. }
  72. // 接收消息处理
  73. func (s *ChatServer) SendMessage(ctx context.Context, msg *Message) (*MessageAck, error) {
  74. msg.Timestamp = time.Now().Unix()
  75. log.Printf("收到来自 %s 的 %s 消息: %s\n", msg.UserId, msg.Action, msg.Text)
  76. // 先处理业务逻辑
  77. switch msg.Action {
  78. case "getContacts":
  79. log.Printf("接收%s通讯录信息\n", msg.UserId)
  80. go SynchronousContacts(msg.UserId, msg.Text)
  81. case "chatHistory":
  82. go AddChatRecord(msg.UserId, msg.Text) // 异步处理
  83. case "sendTalk":
  84. //操作
  85. go Task() // 异步处理
  86. case "sendTalkReceipt":
  87. go SendTalkReceipt(msg.Text) // 异步处理
  88. }
  89. // 发送消息(加锁范围最小化)
  90. s.mu.RLock()
  91. defer s.mu.RUnlock()
  92. for userId, ch := range s.clients {
  93. select {
  94. case ch <- msg:
  95. default:
  96. log.Printf("客户端 %s 的消息通道已满\n", userId)
  97. }
  98. }
  99. return &MessageAck{Success: true}, nil
  100. }
  101. // SendAdminMessage 向指定用户发送系统消息
  102. func (s *ChatServer) SendAdminMessage(userId string, text string, action string) error {
  103. s.mu.Lock()
  104. defer s.mu.Unlock()
  105. // 检查目标用户是否存在
  106. msgChan, exists := s.clients[userId]
  107. if !exists {
  108. return fmt.Errorf("用户 %s 不存在或已离线", userId)
  109. }
  110. // 构造系统消息
  111. msg := &Message{
  112. UserId: "系统",
  113. Text: text,
  114. Timestamp: time.Now().Unix(),
  115. Action: action,
  116. }
  117. // 发送消息
  118. select {
  119. case msgChan <- msg:
  120. log.Printf("已向用户 %s 发送系统消息: %s\n", userId, text)
  121. return nil
  122. default:
  123. return fmt.Errorf("用户 %s 的消息通道已满", userId)
  124. }
  125. }
  126. func (s *ChatServer) StartTimedMessages(ctx context.Context, interval time.Duration, action string) {
  127. // 立即执行一次任务
  128. s.executeTimedAction(ctx, action)
  129. ticker := time.NewTicker(interval)
  130. defer ticker.Stop()
  131. for {
  132. select {
  133. case <-ticker.C:
  134. s.executeTimedAction(ctx, action)
  135. case <-ctx.Done():
  136. log.Printf("定时任务[%s]已停止", action)
  137. return
  138. case <-s.shutdownChan:
  139. log.Printf("服务关闭,停止定时任务[%s]", action)
  140. return
  141. }
  142. }
  143. }
  144. func (s *ChatServer) executeTimedAction(ctx context.Context, action string) {
  145. defer func() {
  146. if r := recover(); r != nil {
  147. log.Printf("定时任务[%s]执行出错: %v\n", action, r)
  148. }
  149. }()
  150. startTime := time.Now()
  151. log.Printf("开始执行定时任务[%s]\n", action)
  152. message := fmt.Sprintf("系统定时消息: 当前时间 %v", startTime.Format("2006-01-02 15:04:05"))
  153. // 使用更安全的方式获取客户端列表
  154. clients := s.getClientsSnapshot()
  155. if len(clients) > 0 {
  156. log.Printf("当前在线客户端数: %d\n", len(clients))
  157. }
  158. // 根据action执行不同操作
  159. switch action {
  160. case "getContacts":
  161. s.BroadcastAdminMessage(message, "getContacts")
  162. case "sendTalk":
  163. s.executeTask(ctx)
  164. case "heartbeat":
  165. s.BroadcastAdminMessage(message, "heartbeat")
  166. default:
  167. log.Printf("未知的定时任务类型: %s\n", action)
  168. }
  169. log.Printf("完成定时任务[%s], 耗时: %v \n", action, time.Since(startTime))
  170. }
  171. func (s *ChatServer) getClientsSnapshot() []string {
  172. s.mu.RLock()
  173. defer s.mu.RUnlock()
  174. clients := make([]string, 0, len(s.clients))
  175. for userId := range s.clients {
  176. clients = append(clients, userId)
  177. }
  178. return clients
  179. }
  180. func (s *ChatServer) executeTask(ctx context.Context) {
  181. // 为Task操作添加超时控制
  182. taskCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
  183. defer cancel()
  184. done := make(chan struct{})
  185. go func() {
  186. defer close(done)
  187. Task() // 假设Task是您定义的任务函数
  188. }()
  189. select {
  190. case <-done:
  191. log.Println("Task执行完成")
  192. case <-taskCtx.Done():
  193. log.Println("Task执行超时")
  194. }
  195. }
  196. // BroadcastAdminMessage 向所有客户端广播系统消息
  197. func (s *ChatServer) BroadcastAdminMessage(text string, action string) {
  198. s.mu.Lock()
  199. defer s.mu.Unlock()
  200. msg := &Message{
  201. UserId: "系统",
  202. Text: text,
  203. Timestamp: time.Now().Unix(),
  204. Action: action,
  205. }
  206. for userId, ch := range s.clients {
  207. select {
  208. case ch <- msg:
  209. log.Printf("已广播系统消息到用户 %s: %s\n", userId, text)
  210. default:
  211. log.Printf("用户 %s 的消息通道已满,无法广播\n", userId)
  212. }
  213. }
  214. }
  215. // SpecifyAdminMessage 向制定客户端广播系统消息
  216. func (s *ChatServer) SpecifyAdminMessage(taskId int64, userMap map[string]interface{}, contentData *[]map[string]interface{}, action, batchCode string) error {
  217. userId := gconv.String(userMap["userId"])
  218. isRefuse := gconv.Int64(userMap["isRefuse"])
  219. if isRefuse == 1 {
  220. //拒绝用户
  221. config.WxRobot.Insert("send_record", map[string]interface{}{
  222. "task_id": taskId,
  223. "base_user_id": gconv.String(userMap["baseUserId"]),
  224. "send_status": 1,
  225. "create_time": time.Now().Format(time.DateTime),
  226. "batch_code": batchCode,
  227. "remark": "用户拒绝",
  228. })
  229. return nil
  230. }
  231. s.mu.Lock()
  232. ch, exists := s.clients[userId] // 直接获取目标用户的 channel
  233. s.mu.Unlock()
  234. if !exists {
  235. config.WxRobot.Insert("send_record", map[string]interface{}{
  236. "task_id": taskId,
  237. "base_user_id": gconv.String(userMap["baseUserId"]),
  238. "send_status": 1,
  239. "create_time": time.Now().Format(time.DateTime),
  240. "batch_code": batchCode,
  241. "remark": fmt.Sprintf("%s客户端关闭", userId),
  242. })
  243. return fmt.Errorf("用户 %s 不存在或未连接", userId)
  244. } else {
  245. config.WxRobot.Insert("send_record", map[string]interface{}{
  246. "task_id": taskId,
  247. "base_user_id": gconv.String(userMap["baseUserId"]),
  248. "send_status": 1,
  249. "create_time": time.Now().Format(time.DateTime),
  250. "batch_code": batchCode,
  251. })
  252. }
  253. text := gconv.String(map[string]interface{}{
  254. "user": userMap,
  255. "content": contentData,
  256. "taskId": taskId,
  257. "batchCode": batchCode,
  258. "replyLanguage": config.DbConf.ReplyLanguage,
  259. })
  260. msg := &Message{
  261. UserId: "系统",
  262. Text: text,
  263. Timestamp: time.Now().Unix(),
  264. Action: action, // 例如:"alert"/"notification"/"kick"
  265. }
  266. select {
  267. case ch <- msg:
  268. log.Printf("系统消息已发送到用户 %s: %s (Action: %s)\n", userId, text, action)
  269. return nil
  270. default:
  271. log.Printf("用户 %s 的消息通道已满,丢弃消息\n", userId)
  272. return fmt.Errorf("用户 %s 的消息通道阻塞", userId)
  273. }
  274. }
  275. // SpecifysystemMessage 向制定客户端广播系统消息(拒绝也发,不保存发送记录)
  276. func (s *ChatServer) SpecifysystemMessage(userId, wxId string, contentData map[string]interface{}, action string) error {
  277. // 1. 加锁并获取用户channel
  278. s.mu.Lock()
  279. ch, exists := s.clients[userId]
  280. if !exists {
  281. s.mu.Unlock()
  282. log.Printf("用户 %s 不存在或已离线 (wxId: %s)\n", userId, wxId)
  283. return fmt.Errorf("user %s not found", userId)
  284. }
  285. // 2. 准备消息数据(仍在锁保护下)
  286. msg := &Message{
  287. UserId: "系统",
  288. Text: buildMessageText(contentData, wxId),
  289. Timestamp: time.Now().Unix(),
  290. Action: action,
  291. }
  292. // 3. 复制channel引用后立即释放锁
  293. channel := ch
  294. s.mu.Unlock()
  295. // 4. 尝试发送消息
  296. return trySendMessage(channel, msg, userId, action)
  297. }
  298. // 辅助函数:构建消息文本
  299. func buildMessageText(contentData map[string]interface{}, wxId string) string {
  300. return gconv.String(map[string]interface{}{
  301. "content": contentData,
  302. "wxId": wxId,
  303. })
  304. }
  305. // 辅助函数:尝试发送消息
  306. func trySendMessage(ch chan<- *Message, msg *Message, userId, action string) error {
  307. select {
  308. case ch <- msg:
  309. log.Printf("系统消息发送成功 | 用户: %s | 动作: %s\n", userId, action)
  310. return nil
  311. default:
  312. log.Printf("消息通道已满 | 用户: %s | 动作: %s\n", userId, action)
  313. return fmt.Errorf("message queue full for user %s", userId)
  314. }
  315. }
  316. func (s *ChatServer) Ping(ctx context.Context, req *PingRequest) (*PingResponse, error) {
  317. return &PingResponse{Status: "OK"}, nil // 确认返回值类型匹配
  318. }
  319. // Shutdown 优雅关闭服务
  320. func (s *ChatServer) Shutdown() {
  321. close(s.shutdownChan)
  322. // 关闭所有客户端连接
  323. s.mu.Lock()
  324. defer s.mu.Unlock()
  325. for userId, ch := range s.clients {
  326. close(ch)
  327. delete(s.clients, userId)
  328. }
  329. }
  330. // removeClient 安全地移除客户端连接
  331. func (s *ChatServer) removeClient(userId string) {
  332. s.mu.Lock()
  333. defer s.mu.Unlock()
  334. if ch, exists := s.clients[userId]; exists {
  335. // 关闭通道前先检查是否已关闭
  336. select {
  337. case _, ok := <-ch:
  338. if ok {
  339. close(ch) // 只有通道未关闭时才关闭它
  340. }
  341. default:
  342. close(ch)
  343. }
  344. delete(s.clients, userId)
  345. log.Printf("客户端 %s 已断开连接", userId)
  346. }
  347. }
  348. // sendWithTimeout 带超时的消息发送
  349. func (s *ChatServer) sendWithTimeout(stream ChatService_JoinChatServer, msg *Message, timeout time.Duration) error {
  350. ctx, cancel := context.WithTimeout(stream.Context(), timeout)
  351. defer cancel()
  352. done := make(chan error, 1)
  353. go func() {
  354. done <- stream.Send(msg)
  355. }()
  356. select {
  357. case err := <-done:
  358. return err
  359. case <-ctx.Done():
  360. // 超时后检查原始上下文是否已取消
  361. select {
  362. case <-stream.Context().Done():
  363. return stream.Context().Err()
  364. default:
  365. return fmt.Errorf("消息发送超时")
  366. }
  367. }
  368. }