浏览代码

feat:数据库数据处理

wangshan 2 年之前
父节点
当前提交
168426ad15
共有 3 个文件被更改,包括 65 次插入13 次删除
  1. 1 0
      csrSync/config.go
  2. 2 1
      csrSync/config.json
  3. 62 12
      csrSync/job.go

+ 1 - 0
csrSync/config.go

@@ -13,5 +13,6 @@ type (
 			MaxIdle     int    `json:"maxidle"`
 			MaxLeftTime int    `json:"maxleft"`
 		} `json:"tiDb"`
+		Bath int
 	}
 )

+ 2 - 1
csrSync/config.json

@@ -9,5 +9,6 @@
 		"poolsize": 20,
 		"maxidle": 40,
 		"maxleft": 40
-	}
+	},
+	"bath": 100
 }

+ 62 - 12
csrSync/job.go

@@ -8,9 +8,14 @@ import (
 	"github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common/errors"
 	"github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common/profile"
 	"log"
+	"time"
 )
 
-var client *monitor.Client
+var (
+	client    *monitor.Client
+	ch        = make(chan bool, 4)
+	pollCount = 5
+)
 
 func job() {
 	log.Println("语音识别定时任务开始")
@@ -21,16 +26,22 @@ func job() {
 	cpf := profile.NewClientProfile()
 	cpf.HttpProfile.Endpoint = "monitor.tencentcloudapi.com"
 	client, _ = monitor.NewClient(credential, "ap-shanghai", cpf)
-	TiDb.SelectByBath(100, func(l *[]map[string]interface{}) bool {
-		for _, v := range *l {
-			FormatData(v)
+	TiDb.SelectByBath(cfg.Bath, func(l *[]map[string]interface{}) bool {
+		if len(*l) > 0 {
+			log.Println(fmt.Sprintf("此次处理数据量:%d", len(*l)))
+			for _, v := range *l {
+				ch <- true
+				go FormatData(v)
+			}
 		}
 		return true
-	}, `select MonitorFilename from voice_record`) // 需要加where进行增量
+	}, `SELECT id,MonitorFilename FROM voice_record WHERE MonitorFilename <> '' AND  ISNULL(callText) ORDER  BY createTime DESC `) // 需要加where进行增量
 	log.Println("语音识别定时任务结束")
 }
 
+// FormatData 获取TaskId 并获取录音识别结果
 func FormatData(data map[string]interface{}) {
+	defer util.Catch()
 	request := monitor.NewCreateRecTaskRequest()
 	request.EngineModelType = common.StringPtr("8k_zh")
 	request.ChannelNum = common.Uint64Ptr(1)
@@ -39,18 +50,57 @@ func FormatData(data map[string]interface{}) {
 	request.Url = common.StringPtr(util.ObjToString(data["MonitorFilename"]))
 	response, err := client.CreateRecTask(request)
 	if _, ok := err.(*errors.TencentCloudSDKError); ok {
+		<-ch
 		fmt.Printf("An API error has returned: %s", err)
 		return
 	}
 	if err != nil {
+		<-ch
 		panic(err)
+		return
 	}
-
-	//获取 response.Response.Data.TaskId
-	//查询第二个接口: 查询任务结果  需要轮询
-	taskId := response.Response.Data.TaskId
-	requests := monitor.NewDescribeTaskStatusRequest()
+	var (
+		recordId = util.Int64All(data["id"])
+		pollTime = map[int64]int{}
+		//获取 response.Response.Data.TaskId
+		//查询第二个接口: 查询任务结果  需要轮询
+		taskId   = response.Response.Data.TaskId
+		requests = monitor.NewDescribeTaskStatusRequest()
+	)
 	requests.TaskId = taskId
-	responses, errs := client.DescribeTaskStatus(requests)
-	log.Println(responses, errs)
+L:
+	for {
+		if pollTime[recordId] > pollCount {
+			//轮询大于pollCount,则跳出,防止死循环
+			break L
+		}
+		pollTime[recordId] += 1
+		r, errs := client.DescribeTaskStatus(requests)
+		log.Println(r, errs)
+		if errs != nil {
+			log.Println("根据taskId 请求语音识别结果异常:", errs.Error())
+			break L
+		}
+		switch *r.Response.Data.StatusStr {
+		case "success":
+			if updateVoiceRecord(*r.Response.Data.Result, recordId) {
+				break L
+			}
+		case "waiting":
+			time.Sleep(500 * time.Millisecond)
+			continue L
+		case "doing":
+			time.Sleep(200 * time.Millisecond)
+			continue L
+		case "failed":
+			if updateVoiceRecord(*r.Response.Data.ErrorMsg, recordId) {
+				break L
+			}
+		}
+	}
+	<-ch
+}
+
+func updateVoiceRecord(content string, id int64) bool {
+	return TiDb.UpdateOrDeleteBySql(`UPDATE  voice_record  SET callText  = ? WHERE  id  = ?`, content, id) > 0
 }