Browse Source

wip:结束表示修改

wangkaiyue 2 years ago
parent
commit
6469efab2e
3 changed files with 37 additions and 23 deletions
  1. 10 2
      internal/model/chatApi.go
  2. 2 2
      internal/model/question.go
  3. 25 19
      internal/model/ws.go

+ 10 - 2
internal/model/chatApi.go

@@ -7,6 +7,7 @@ import (
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/net/gclient"
 	"github.com/gogf/gf/v2/util/gconv"
+	"io"
 	"net/http"
 	"strings"
 )
@@ -51,6 +52,7 @@ func (c *cChatGpt) SimpleDo(ctx context.Context, qReq *QuestionReq) (res *Simple
 		return nil, err
 	}
 	res = &SimpleRes{}
+	defer gRes.Close()
 	err = gconv.Struct(gRes.ReadAll(), res)
 	//g.Dump("SimpleDo", gReq, res)
 	if err != nil {
@@ -59,7 +61,7 @@ func (c *cChatGpt) SimpleDo(ctx context.Context, qReq *QuestionReq) (res *Simple
 	return
 }
 
-func (c *cChatGpt) GPTDo(ctx context.Context, qReq *QuestionReq) (res *bufio.Reader, err error) {
+func (c *cChatGpt) GPTDo(ctx context.Context, qReq *QuestionReq) (res io.ReadCloser, err error) {
 	gReq := GPTReq{
 		BaseQuestion: qReq.BaseQuestion,
 		Identity:     g.Config().MustGet(ctx, "chat.api.identity", "剑鱼chat").String(),
@@ -68,10 +70,16 @@ func (c *cChatGpt) GPTDo(ctx context.Context, qReq *QuestionReq) (res *bufio.Rea
 		gReq.History = [][]string{}
 	}
 	req, err := http.NewRequest("POST", g.Config().MustGet(ctx, "chat.api.addr_answer", "").String(), strings.NewReader(gconv.String(gReq)))
+	if err != nil {
+		return nil, err
+	}
 	client := &http.Client{}
 	req.Header.Set("Accept", "text/event-stream")
 	resp, err := client.Do(req)
-	return bufio.NewReader(resp.Body), nil
+	if err != nil {
+		return nil, err
+	}
+	return resp.Body, nil
 }
 
 type BufRes struct {

+ 2 - 2
internal/model/question.go

@@ -3,12 +3,12 @@ package model
 import (
 	"aiChat/utility"
 	"aiChat/utility/fsw"
-	"bufio"
 	"context"
 	"encoding/json"
 	"fmt"
 	"github.com/gogf/gf/v2/frame/g"
 	"github.com/gogf/gf/v2/util/gconv"
+	"io"
 	"regexp"
 	"strings"
 )
@@ -143,7 +143,7 @@ func (q *cQuestion) getIsbusinessData(ctx context.Context, code string) (bRes *B
 }
 
 // DetailQuestion 问题处理
-func (q *cQuestion) DetailQuestion(ctx context.Context, qRes *QuestionReq) (reply string, res *bufio.Reader, from int, err error) {
+func (q *cQuestion) DetailQuestion(ctx context.Context, qRes *QuestionReq) (reply string, res io.ReadCloser, from int, err error) {
 	qRes.ParseHistoryFsw()
 	// 语义服务
 	sRes, err := ChatGpt.SimpleDo(ctx, qRes)

+ 25 - 19
internal/model/ws.go

@@ -51,10 +51,10 @@ func (m *WsChat) Handle(ws *ghttp.WebSocket, msg []byte) {
 		PersonId:   jSession.PositionId,
 		CreateTime: time.Now().Format(date.Date_Full_Layout),
 	})
-	content, buf, replyId, errMsg := func() (string, *bufio.Reader, int64, error) {
+	content, res, replyId, errMsg := func() (string, io.ReadCloser, int64, error) {
 		var (
 			err   error
-			buf   *bufio.Reader
+			res   io.ReadCloser
 			reply string
 		)
 		errReply := func() string {
@@ -75,7 +75,7 @@ func (m *WsChat) Handle(ws *ghttp.WebSocket, msg []byte) {
 		if errReply != "" {
 			reply, from = errReply, -1
 		} else {
-			reply, buf, from, err = Question.DetailQuestion(m.Ctx, req)
+			reply, res, from, err = Question.DetailQuestion(m.Ctx, req)
 			if err != nil {
 				g.Log().Error(m.Ctx, "问答异常", err)
 				reply, from = g.Cfg().MustGet(m.Ctx, "limit.errMsg").String(), -1
@@ -83,7 +83,7 @@ func (m *WsChat) Handle(ws *ghttp.WebSocket, msg []byte) {
 		}
 
 		if from == Answer_ChatGPT {
-			return reply, buf, 0, nil
+			return reply, res, 0, nil
 		}
 		if reply == "" {
 			reply, from = g.Cfg().MustGet(m.Ctx, "limit.emptyMsg").String(), -1
@@ -105,39 +105,45 @@ func (m *WsChat) Handle(ws *ghttp.WebSocket, msg []byte) {
 		}
 		return reply, nil, replyId, nil
 	}()
+	if res != nil {
+		defer res.Close()
+	}
 	if from != Answer_ChatGPT {
 		if errMsg != nil {
 			_ = ws.WriteJSON(g.Map{"error_code": -1, "error_msg": errMsg.Error(), "data": nil})
 		} else {
 			_ = ws.WriteJSON(g.Map{"error_code": 0, "error_msg": "", "data": g.Map{"id": encrypt.SE.Encode2Hex(fmt.Sprintf("%d", replyId)), "reply": content, "isEnd": true}})
 		}
-	} else if buf != nil {
+	} else if res != nil {
+		buf, lastData := bufio.NewReader(res), &BufRes{}
 		for {
 			line, _, err := buf.ReadLine()
 			if err == io.EOF {
+				replyId := ChatHistory.Save(m.Ctx, &ChatRecord{
+					Content:    lastData.Response,
+					Type:       2,
+					Actions:    1,
+					QuestionId: questionId,
+					PersonId:   jSession.PositionId,
+					Item:       Answer_ChatGPT,
+					CreateTime: time.Now().Format(date.Date_Full_Layout),
+				})
+				_ = ws.WriteJSON(g.Map{"error_code": 0, "error_msg": "", "data": g.Map{"id": encrypt.SE.Encode2Hex(fmt.Sprintf("%d", replyId)), "reply": lastData.Response, "isEnd": lastData.Finished}})
 				break
 			}
 			if _, data := parseEventStream(line); data != nil {
 				data.Response = fsw.Repl(data.Response)
-				if data.Finished {
-					replyId := ChatHistory.Save(m.Ctx, &ChatRecord{
-						Content:    data.Response,
-						Type:       2,
-						Actions:    1,
-						QuestionId: questionId,
-						PersonId:   jSession.PositionId,
-						Item:       Answer_ChatGPT,
-						CreateTime: time.Now().Format(date.Date_Full_Layout),
-					})
-					_ = ws.WriteJSON(g.Map{"error_code": 0, "error_msg": "", "data": g.Map{"id": encrypt.SE.Encode2Hex(fmt.Sprintf("%d", replyId)), "reply": data.Response, "isEnd": data.Finished}})
-				} else {
-					_ = ws.WriteJSON(g.Map{"error_code": 0, "error_msg": "", "data": g.Map{"reply": data.Response, "isEnd": data.Finished}})
-				}
+				lastData = data
+				_ = ws.WriteJSON(g.Map{"error_code": 0, "error_msg": "", "data": g.Map{"reply": lastData.Response, "isEnd": false}})
 			}
 		}
 	}
 }
 
+func saveAndReturnMap() {
+
+}
+
 func parseEventStream(line []byte) (event string, date *BufRes) {
 	// 如果行以 "event:" 开头,表示这是一个事件的标识符
 	if len(line) > 6 && string(line[:6]) == "event:" {