123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- package model
- import (
- "aiChat/utility/fsw"
- . "app.yhyue.com/moapp/jybase/common"
- "app.yhyue.com/moapp/jybase/date"
- "app.yhyue.com/moapp/jybase/encrypt"
- "bufio"
- "context"
- "encoding/json"
- "fmt"
- "github.com/gogf/gf/v2/encoding/gjson"
- "github.com/gogf/gf/v2/frame/g"
- "github.com/gogf/gf/v2/net/ghttp"
- "github.com/gogf/gf/v2/util/gconv"
- "io"
- "strings"
- "time"
- )
- type WsChat struct {
- Ctx context.Context
- }
- func NewMessage(ctx context.Context) *WsChat {
- return &WsChat{
- Ctx: ctx,
- }
- }
- // Handle 处理消息
- func (m *WsChat) Handle(ws *ghttp.WebSocket, msg []byte) {
- defer Catch()
- jSession := SessionCtx.Get(m.Ctx).JSession
- if jSession.PositionId == 0 {
- _ = ws.WriteJSON(g.Map{
- "error_code": -1,
- "error_msg": "请登录",
- })
- return
- }
- req, from := &QuestionReq{}, 0
- if err := gjson.Unmarshal(msg, req); err != nil {
- g.Log().Errorf(m.Ctx, "%d 接收消息Unmarshal出错:%v", jSession.PositionId, err)
- return
- }
- questionId := ChatHistory.Save(m.Ctx, &ChatRecord{
- Content: req.Prompt,
- Type: 1,
- Refer: req.Href,
- PersonId: jSession.PositionId,
- CreateTime: time.Now().Format(date.Date_Full_Layout),
- })
- content, res, replyId, errMsg := func() (string, io.ReadCloser, int64, error) {
- var (
- err error
- res io.ReadCloser
- reply string
- )
- errReply := func() string {
- // 校验是否在黑名单,黑名单不返回内容
- if UserBlackList.CheckBlackList(m.Ctx, jSession.PositionId) {
- return g.Cfg().MustGet(m.Ctx, "limit.blackMsg").String()
- }
- // 校验问答频率
- if ChatLimit.GetBucket(m.Ctx, jSession.PositionId).TakeAvailable(1) == 0 {
- return g.Cfg().MustGet(m.Ctx, "limit.exceedMsg").String()
- }
- // 问题敏感词过滤
- if fsw.Match(req.Prompt) {
- return g.Cfg().MustGet(m.Ctx, "limit.fswMsg").String()
- }
- return ""
- }()
- if errReply != "" {
- reply, from = errReply, -1
- } else {
- reply, res, from, err = Question.DetailQuestion(m.Ctx, req)
- if err != nil {
- g.Log().Error(m.Ctx, "问答异常", err)
- reply = g.Cfg().MustGet(m.Ctx, "limit.errMsg").String()
- errReply = g.Cfg().MustGet(m.Ctx, "limit.errMsg").String()
- }
- }
- if from == Answer_ChatGPT {
- return reply, res, 0, nil
- }
- if reply == "" {
- reply = g.Cfg().MustGet(m.Ctx, "limit.emptyMsg").String()
- errReply = g.Cfg().MustGet(m.Ctx, "limit.emptyMsg").String()
- }
- replyId := ChatHistory.Save(m.Ctx, &ChatRecord{
- Content: reply,
- Type: 2,
- Actions: gconv.Int(If(errReply == "", 1, 0)),
- QuestionId: questionId,
- PersonId: jSession.PositionId,
- Item: from,
- CreateTime: time.Now().Format(date.Date_Full_Layout),
- })
- if replyId <= 0 {
- g.Log().Error(m.Ctx, "问答存储存储异常")
- }
- if errReply != "" {
- return reply, nil, replyId, fmt.Errorf(errReply)
- }
- return reply, nil, replyId, nil
- }()
- if res != nil {
- defer res.Close()
- }
- if from != Answer_ChatGPT {
- if errMsg != nil {
- _ = ws.WriteJSON(g.Map{"error_code": -1, "error_msg": errMsg.Error(), "data": nil})
- } else {
- _ = ws.WriteJSON(g.Map{"error_code": 0, "error_msg": "", "data": g.Map{"id": encrypt.SE.Encode2Hex(fmt.Sprintf("%d", replyId)), "reply": content, "isEnd": true}})
- }
- } else if res != nil {
- buf, lastData := bufio.NewReader(res), &BufRes{}
- isEmpty := true
- for {
- line, _, err := buf.ReadLine()
- if err == nil {
- break
- }
- if _, data := parseEventStream(line); data != nil && strings.TrimSpace(data.Response) != "" {
- data.Response = fsw.Repl(data.Response)
- lastData, isEmpty = data, false
- _ = ws.WriteJSON(g.Map{"error_code": 0, "error_msg": "", "data": g.Map{"reply": lastData.Response, "isEnd": false}})
- }
- }
- ChatGptPool.Add() //放回链接池
- finalReply := If(isEmpty, g.Cfg().MustGet(m.Ctx, "limit.emptyMsg").String(), lastData.Response).(string)
- replyId := ChatHistory.Save(m.Ctx, &ChatRecord{
- Content: finalReply,
- Type: 2,
- Actions: gconv.Int(If(isEmpty, 0, 1)),
- QuestionId: questionId,
- PersonId: jSession.PositionId,
- Item: Answer_ChatGPT,
- CreateTime: time.Now().Format(date.Date_Full_Layout),
- })
- if !isEmpty {
- _ = ws.WriteJSON(g.Map{"error_code": 0, "error_msg": "", "data": g.Map{"id": encrypt.SE.Encode2Hex(fmt.Sprintf("%d", replyId)), "reply": finalReply, "isEnd": true}})
- } else {
- _ = ws.WriteJSON(g.Map{"error_code": -1, "error_msg": finalReply})
- }
- }
- }
- func parseEventStream(line []byte) (event string, date *BufRes) {
- // 如果行以 "event:" 开头,表示这是一个事件的标识符
- if len(line) > 6 && string(line[:6]) == "event:" {
- event = string(line[6 : len(line)-1])
- return event, nil
- }
- // 如果行以 "data:" 开头,表示这是事件的数据部分
- if len(line) > 5 && string(line[:5]) == "data:" {
- date = &BufRes{}
- if err := json.Unmarshal(line[5:len(line)], date); err == nil {
- return
- }
- return event, nil
- }
- return "", nil
- }
|