package model

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"strings"
	"time"

	"aiChat/utility/fsw"
	. "app.yhyue.com/moapp/jybase/common"
	"app.yhyue.com/moapp/jybase/date"
	"app.yhyue.com/moapp/jybase/encrypt"
	"github.com/gogf/gf/v2/encoding/gjson"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/net/ghttp"
	"github.com/gogf/gf/v2/util/gconv"
)

// WsChat handles chat messages received over a WebSocket connection.
// Ctx is the request context used for session lookup, logging, configuration
// and persistence calls.
type WsChat struct {
	Ctx context.Context
}

// NewMessage returns a WsChat bound to ctx.
func NewMessage(ctx context.Context) *WsChat {
	return &WsChat{
		Ctx: ctx,
	}
}

// Handle processes a single inbound WebSocket message.
//
// It rejects callers whose session has no PositionId, decodes msg into a
// QuestionReq, stores the question, and then either
//   - writes one final JSON frame (pre-check rejection, non-ChatGPT answer or
//     error), or
//   - relays a ChatGPT event stream frame by frame, finishing with a frame
//     marked "isEnd": true that carries the encoded id of the stored reply.
//
// WebSocket write errors are deliberately ignored (best effort).
func (m *WsChat) Handle(ws *ghttp.WebSocket, msg []byte) {
	// Catch comes from the dot-imported common package; presumably it recovers
	// panics so one bad message cannot take the connection handler down.
	defer Catch()

	jSession := SessionCtx.Get(m.Ctx).JSession
	if jSession.PositionId == 0 {
		_ = ws.WriteJSON(g.Map{
			"error_code": -1,
			"error_msg":  "请登录",
		})
		return
	}

	// from records which answer source produced the reply; it is assigned
	// inside the closure below and read again afterwards.
	req, from := &QuestionReq{}, 0
	if err := gjson.Unmarshal(msg, req); err != nil {
		g.Log().Errorf(m.Ctx, "%d 接收消息Unmarshal出错:%v", jSession.PositionId, err)
		return
	}

	// Persist the question first so the reply can reference it.
	questionId := ChatHistory.Save(m.Ctx, &ChatRecord{
		Content:    req.Prompt,
		Type:       1,
		Refer:      req.Href,
		PersonId:   jSession.PositionId,
		CreateTime: time.Now().Format(date.Date_Full_Layout),
	})

	content, res, replyId, errMsg := func() (string, io.ReadCloser, int64, error) {
		var (
			err   error
			res   io.ReadCloser
			reply string
		)

		// Pre-checks; a non-empty result is the refusal text shown to the user.
		errReply := func() string {
			// Blacklisted users get the configured blacklist message.
			if UserBlackList.CheckBlackList(m.Ctx, jSession.PositionId) {
				return g.Cfg().MustGet(m.Ctx, "limit.blackMsg").String()
			}
			// Rate limit: one token per question.
			if ChatLimit.GetBucket(m.Ctx, jSession.PositionId).TakeAvailable(1) == 0 {
				return g.Cfg().MustGet(m.Ctx, "limit.exceedMsg").String()
			}
			// Sensitive-word filter on the prompt.
			if fsw.Match(req.Prompt) {
				return g.Cfg().MustGet(m.Ctx, "limit.fswMsg").String()
			}
			return ""
		}()

		if errReply != "" {
			reply, from = errReply, -1
		} else {
			reply, res, from, err = Question.DetailQuestion(m.Ctx, req)
			if err != nil {
				g.Log().Error(m.Ctx, "问答异常", err)
				reply = g.Cfg().MustGet(m.Ctx, "limit.errMsg").String()
				errReply = g.Cfg().MustGet(m.Ctx, "limit.errMsg").String()
			}
		}

		// ChatGPT answers are streamed and stored by the caller below.
		// NOTE(review): if DetailQuestion fails with from == Answer_ChatGPT and a
		// nil stream, the error reply is dropped here and the client receives
		// nothing — confirm against DetailQuestion's contract.
		if from == Answer_ChatGPT {
			return reply, res, 0, nil
		}

		if reply == "" {
			reply = g.Cfg().MustGet(m.Ctx, "limit.emptyMsg").String()
			errReply = g.Cfg().MustGet(m.Ctx, "limit.emptyMsg").String()
		}

		replyId := ChatHistory.Save(m.Ctx, &ChatRecord{
			Content:    reply,
			Type:       2,
			Actions:    gconv.Int(If(errReply == "", 1, 0)),
			QuestionId: questionId,
			PersonId:   jSession.PositionId,
			Item:       from,
			CreateTime: time.Now().Format(date.Date_Full_Layout),
		})
		if replyId <= 0 {
			g.Log().Error(m.Ctx, "问答存储存储异常")
		}

		if errReply != "" {
			// "%s" keeps a configured message containing '%' intact; passing the
			// message itself as the format string would mangle it.
			return reply, nil, replyId, fmt.Errorf("%s", errReply)
		}
		return reply, nil, replyId, nil
	}()

	if res != nil {
		defer res.Close()
	}

	if from != Answer_ChatGPT {
		if errMsg != nil {
			_ = ws.WriteJSON(g.Map{"error_code": -1, "error_msg": errMsg.Error(), "data": nil})
		} else {
			_ = ws.WriteJSON(g.Map{"error_code": 0, "error_msg": "", "data": g.Map{"id": encrypt.SE.Encode2Hex(fmt.Sprintf("%d", replyId)), "reply": content, "isEnd": true}})
		}
		return
	}
	if res == nil {
		return
	}

	// Relay the ChatGPT event stream to the client line by line.
	buf, lastData := bufio.NewReader(res), &BufRes{}
	isEmpty := true
	for {
		line, err := readFullLine(buf)
		if err != nil {
			// io.EOF or a transport error: either way the stream is finished.
			break
		}
		if _, data := parseEventStream(line); data != nil && strings.TrimSpace(data.Response) != "" {
			// Mask sensitive words before forwarding and before storing.
			data.Response = fsw.Repl(data.Response)
			lastData, isEmpty = data, false
			_ = ws.WriteJSON(g.Map{"error_code": 0, "error_msg": "", "data": g.Map{"reply": lastData.Response, "isEnd": false}})
		}
	}
	ChatGptPool.Add() // hand the connection back to the pool

	// The last non-empty frame is stored as the final reply; an empty stream
	// is stored (and reported) as the configured "empty" message.
	finalReply := If(isEmpty, g.Cfg().MustGet(m.Ctx, "limit.emptyMsg").String(), lastData.Response).(string)
	streamReplyId := ChatHistory.Save(m.Ctx, &ChatRecord{
		Content:    finalReply,
		Type:       2,
		Actions:    gconv.Int(If(isEmpty, 0, 1)),
		QuestionId: questionId,
		PersonId:   jSession.PositionId,
		Item:       Answer_ChatGPT,
		CreateTime: time.Now().Format(date.Date_Full_Layout),
	})
	if isEmpty {
		_ = ws.WriteJSON(g.Map{"error_code": -1, "error_msg": finalReply})
		return
	}
	_ = ws.WriteJSON(g.Map{"error_code": 0, "error_msg": "", "data": g.Map{"id": encrypt.SE.Encode2Hex(fmt.Sprintf("%d", streamReplyId)), "reply": finalReply, "isEnd": true}})
}

// readFullLine returns the next complete line from r without its line ending.
// bufio.Reader.ReadLine hands back long lines in buffer-sized pieces; the
// pieces are joined here so a large JSON payload is never parsed half-way.
// A non-nil error (typically io.EOF) is returned only when no bytes at all
// could be read for the line.
func readFullLine(r *bufio.Reader) ([]byte, error) {
	var full []byte
	for {
		part, isPrefix, err := r.ReadLine()
		if err != nil {
			if len(full) > 0 {
				// Deliver what was already collected; the next call reports err.
				return full, nil
			}
			return nil, err
		}
		// part is only valid until the next read, so copy it out.
		full = append(full, part...)
		if !isPrefix {
			return full, nil
		}
	}
}

// parseEventStream interprets one line of a server-sent event stream.
//
// A line starting with "event:" yields the event name (trailing CR/LF
// removed) and a nil payload. A line starting with "data:" yields the JSON
// payload decoded into a BufRes, or nil if it does not decode. Any other
// line yields ("", nil).
func parseEventStream(line []byte) (event string, data *BufRes) {
	const (
		eventPrefix = "event:"
		dataPrefix  = "data:"
	)
	switch {
	case len(line) > len(eventPrefix) && string(line[:len(eventPrefix)]) == eventPrefix:
		// Strip only a trailing line ending; blindly dropping the last byte
		// would truncate names on lines that were already stripped.
		return strings.TrimRight(string(line[len(eventPrefix):]), "\r\n"), nil
	case len(line) > len(dataPrefix) && string(line[:len(dataPrefix)]) == dataPrefix:
		payload := &BufRes{}
		if err := json.Unmarshal(line[len(dataPrefix):], payload); err != nil {
			return "", nil
		}
		return "", payload
	}
	return "", nil
}