123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410 |
- package service
- import (
- "context"
- "fmt"
- "github.com/gogf/gf/v2/util/gconv"
- "log"
- . "rpc/chat"
- "rpc/config"
- "sync"
- "time"
- )
- var Chatserver *ChatServer
- func init() {
- Chatserver = NewChatServer()
- }
- type ChatServer struct {
- UnimplementedChatServiceServer
- clients map[string]chan *Message
- adminMsg chan *Message
- mu sync.RWMutex
- shutdownChan chan struct{} // 关闭信号通道
- }
- func NewChatServer() *ChatServer {
- return &ChatServer{
- clients: make(map[string]chan *Message),
- adminMsg: make(chan *Message, 100),
- shutdownChan: make(chan struct{}),
- }
- }
- // 建立连接
- func (s *ChatServer) JoinChat(req *JoinRequest, stream ChatService_JoinChatServer) error {
- // 创建新通道
- msgChan := make(chan *Message, 100)
- // 注册客户端
- s.mu.Lock()
- if _, exists := s.clients[req.UserId]; exists {
- s.mu.Unlock()
- return fmt.Errorf("用户 %s 已连接", req.UserId)
- }
- s.clients[req.UserId] = msgChan
- s.mu.Unlock()
- // 发送欢迎消息
- welcomeMsg := &Message{
- UserId: "系统",
- Text: "欢迎加入聊天室",
- Timestamp: time.Now().Unix(),
- }
- if err := stream.Send(welcomeMsg); err != nil {
- s.removeClient(req.UserId)
- return err
- }
- // 清理处理
- defer s.removeClient(req.UserId)
- // 消息循环
- for {
- select {
- case msg := <-msgChan:
- if err := s.sendWithTimeout(stream, msg, 5*time.Second); err != nil {
- return err
- }
- case adminMsg := <-s.adminMsg:
- if err := s.sendWithTimeout(stream, adminMsg, 5*time.Second); err != nil {
- return err
- }
- case <-stream.Context().Done():
- return nil
- case <-s.shutdownChan:
- return nil
- }
- }
- }
- // 接收消息处理
- func (s *ChatServer) SendMessage(ctx context.Context, msg *Message) (*MessageAck, error) {
- msg.Timestamp = time.Now().Unix()
- log.Printf("收到来自 %s 的 %s 消息: %s\n", msg.UserId, msg.Action, msg.Text)
- // 先处理业务逻辑
- switch msg.Action {
- case "getContacts":
- log.Printf("接收%s通讯录信息\n", msg.UserId)
- go SynchronousContacts(msg.UserId, msg.Text)
- case "chatHistory":
- go AddChatRecord(msg.UserId, msg.Text) // 异步处理
- case "sendTalk":
- //操作
- go Task() // 异步处理
- case "sendTalkReceipt":
- go SendTalkReceipt(msg.Text) // 异步处理
- }
- // 发送消息(加锁范围最小化)
- s.mu.RLock()
- defer s.mu.RUnlock()
- for userId, ch := range s.clients {
- select {
- case ch <- msg:
- default:
- log.Printf("客户端 %s 的消息通道已满\n", userId)
- }
- }
- return &MessageAck{Success: true}, nil
- }
- // SendAdminMessage 向指定用户发送系统消息
- func (s *ChatServer) SendAdminMessage(userId string, text string, action string) error {
- s.mu.Lock()
- defer s.mu.Unlock()
- // 检查目标用户是否存在
- msgChan, exists := s.clients[userId]
- if !exists {
- return fmt.Errorf("用户 %s 不存在或已离线", userId)
- }
- // 构造系统消息
- msg := &Message{
- UserId: "系统",
- Text: text,
- Timestamp: time.Now().Unix(),
- Action: action,
- }
- // 发送消息
- select {
- case msgChan <- msg:
- log.Printf("已向用户 %s 发送系统消息: %s\n", userId, text)
- return nil
- default:
- return fmt.Errorf("用户 %s 的消息通道已满", userId)
- }
- }
- func (s *ChatServer) StartTimedMessages(ctx context.Context, interval time.Duration, action string) {
- // 立即执行一次任务
- s.executeTimedAction(ctx, action)
- ticker := time.NewTicker(interval)
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- s.executeTimedAction(ctx, action)
- case <-ctx.Done():
- log.Printf("定时任务[%s]已停止", action)
- return
- case <-s.shutdownChan:
- log.Printf("服务关闭,停止定时任务[%s]", action)
- return
- }
- }
- }
- func (s *ChatServer) executeTimedAction(ctx context.Context, action string) {
- defer func() {
- if r := recover(); r != nil {
- log.Printf("定时任务[%s]执行出错: %v\n", action, r)
- }
- }()
- startTime := time.Now()
- log.Printf("开始执行定时任务[%s]\n", action)
- message := fmt.Sprintf("系统定时消息: 当前时间 %v", startTime.Format("2006-01-02 15:04:05"))
- // 使用更安全的方式获取客户端列表
- clients := s.getClientsSnapshot()
- if len(clients) > 0 {
- log.Printf("当前在线客户端数: %d\n", len(clients))
- }
- // 根据action执行不同操作
- switch action {
- case "getContacts":
- s.BroadcastAdminMessage(message, "getContacts")
- case "sendTalk":
- s.executeTask(ctx)
- case "heartbeat":
- s.BroadcastAdminMessage(message, "heartbeat")
- default:
- log.Printf("未知的定时任务类型: %s\n", action)
- }
- log.Printf("完成定时任务[%s], 耗时: %v \n", action, time.Since(startTime))
- }
- func (s *ChatServer) getClientsSnapshot() []string {
- s.mu.RLock()
- defer s.mu.RUnlock()
- clients := make([]string, 0, len(s.clients))
- for userId := range s.clients {
- clients = append(clients, userId)
- }
- return clients
- }
- func (s *ChatServer) executeTask(ctx context.Context) {
- // 为Task操作添加超时控制
- taskCtx, cancel := context.WithTimeout(ctx, 60*time.Second)
- defer cancel()
- done := make(chan struct{})
- go func() {
- defer close(done)
- Task() // 假设Task是您定义的任务函数
- }()
- select {
- case <-done:
- log.Println("Task执行完成")
- case <-taskCtx.Done():
- log.Println("Task执行超时")
- }
- }
- // BroadcastAdminMessage 向所有客户端广播系统消息
- func (s *ChatServer) BroadcastAdminMessage(text string, action string) {
- s.mu.Lock()
- defer s.mu.Unlock()
- msg := &Message{
- UserId: "系统",
- Text: text,
- Timestamp: time.Now().Unix(),
- Action: action,
- }
- for userId, ch := range s.clients {
- select {
- case ch <- msg:
- log.Printf("已广播系统消息到用户 %s: %s\n", userId, text)
- default:
- log.Printf("用户 %s 的消息通道已满,无法广播\n", userId)
- }
- }
- }
- // SpecifyAdminMessage 向制定客户端广播系统消息
- func (s *ChatServer) SpecifyAdminMessage(taskId int64, userMap map[string]interface{}, contentData *[]map[string]interface{}, action, batchCode string) error {
- userId := gconv.String(userMap["userId"])
- isRefuse := gconv.Int64(userMap["isRefuse"])
- if isRefuse == 1 {
- //拒绝用户
- config.WxRobot.Insert("send_record", map[string]interface{}{
- "task_id": taskId,
- "base_user_id": gconv.String(userMap["baseUserId"]),
- "send_status": 1,
- "create_time": time.Now().Format(time.DateTime),
- "batch_code": batchCode,
- "remark": "用户拒绝",
- })
- return nil
- }
- s.mu.Lock()
- ch, exists := s.clients[userId] // 直接获取目标用户的 channel
- s.mu.Unlock()
- if !exists {
- config.WxRobot.Insert("send_record", map[string]interface{}{
- "task_id": taskId,
- "base_user_id": gconv.String(userMap["baseUserId"]),
- "send_status": 1,
- "create_time": time.Now().Format(time.DateTime),
- "batch_code": batchCode,
- "remark": fmt.Sprintf("%s客户端关闭", userId),
- })
- return fmt.Errorf("用户 %s 不存在或未连接", userId)
- } else {
- config.WxRobot.Insert("send_record", map[string]interface{}{
- "task_id": taskId,
- "base_user_id": gconv.String(userMap["baseUserId"]),
- "send_status": 1,
- "create_time": time.Now().Format(time.DateTime),
- "batch_code": batchCode,
- })
- }
- text := gconv.String(map[string]interface{}{
- "user": userMap,
- "content": contentData,
- "taskId": taskId,
- "batchCode": batchCode,
- "replyLanguage": config.DbConf.ReplyLanguage,
- })
- msg := &Message{
- UserId: "系统",
- Text: text,
- Timestamp: time.Now().Unix(),
- Action: action, // 例如:"alert"/"notification"/"kick"
- }
- select {
- case ch <- msg:
- log.Printf("系统消息已发送到用户 %s: %s (Action: %s)\n", userId, text, action)
- return nil
- default:
- log.Printf("用户 %s 的消息通道已满,丢弃消息\n", userId)
- return fmt.Errorf("用户 %s 的消息通道阻塞", userId)
- }
- }
- // SpecifysystemMessage 向制定客户端广播系统消息(拒绝也发,不保存发送记录)
- func (s *ChatServer) SpecifysystemMessage(userId, wxId string, contentData map[string]interface{}, action string) error {
- // 1. 加锁并获取用户channel
- s.mu.Lock()
- ch, exists := s.clients[userId]
- if !exists {
- s.mu.Unlock()
- log.Printf("用户 %s 不存在或已离线 (wxId: %s)\n", userId, wxId)
- return fmt.Errorf("user %s not found", userId)
- }
- // 2. 准备消息数据(仍在锁保护下)
- msg := &Message{
- UserId: "系统",
- Text: buildMessageText(contentData, wxId),
- Timestamp: time.Now().Unix(),
- Action: action,
- }
- // 3. 复制channel引用后立即释放锁
- channel := ch
- s.mu.Unlock()
- // 4. 尝试发送消息
- return trySendMessage(channel, msg, userId, action)
- }
- // 辅助函数:构建消息文本
- func buildMessageText(contentData map[string]interface{}, wxId string) string {
- return gconv.String(map[string]interface{}{
- "content": contentData,
- "wxId": wxId,
- })
- }
- // 辅助函数:尝试发送消息
- func trySendMessage(ch chan<- *Message, msg *Message, userId, action string) error {
- select {
- case ch <- msg:
- log.Printf("系统消息发送成功 | 用户: %s | 动作: %s\n", userId, action)
- return nil
- default:
- log.Printf("消息通道已满 | 用户: %s | 动作: %s\n", userId, action)
- return fmt.Errorf("message queue full for user %s", userId)
- }
- }
- func (s *ChatServer) Ping(ctx context.Context, req *PingRequest) (*PingResponse, error) {
- return &PingResponse{Status: "OK"}, nil // 确认返回值类型匹配
- }
- // Shutdown 优雅关闭服务
- func (s *ChatServer) Shutdown() {
- close(s.shutdownChan)
- // 关闭所有客户端连接
- s.mu.Lock()
- defer s.mu.Unlock()
- for userId, ch := range s.clients {
- close(ch)
- delete(s.clients, userId)
- }
- }
- // removeClient 安全地移除客户端连接
- func (s *ChatServer) removeClient(userId string) {
- s.mu.Lock()
- defer s.mu.Unlock()
- if ch, exists := s.clients[userId]; exists {
- // 关闭通道前先检查是否已关闭
- select {
- case _, ok := <-ch:
- if ok {
- close(ch) // 只有通道未关闭时才关闭它
- }
- default:
- close(ch)
- }
- delete(s.clients, userId)
- log.Printf("客户端 %s 已断开连接", userId)
- }
- }
- // sendWithTimeout 带超时的消息发送
- func (s *ChatServer) sendWithTimeout(stream ChatService_JoinChatServer, msg *Message, timeout time.Duration) error {
- ctx, cancel := context.WithTimeout(stream.Context(), timeout)
- defer cancel()
- done := make(chan error, 1)
- go func() {
- done <- stream.Send(msg)
- }()
- select {
- case err := <-done:
- return err
- case <-ctx.Done():
- // 超时后检查原始上下文是否已取消
- select {
- case <-stream.Context().Done():
- return stream.Context().Err()
- default:
- return fmt.Errorf("消息发送超时")
- }
- }
- }
|