package main

import (
	"fmt"
	"log"
	"time"

	util "app.yhyue.com/moapp/jybase/common"
	asr "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/asr/v20190614"
	"github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common"
	"github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common/errors"
	"github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common/profile"
)

// client is the shared Tencent Cloud ASR client. job replaces it at the start
// of every run and FormatData uses it for all API calls.
var client *asr.Client

// job runs one pass of the scheduled speech-recognition task.
//
// It creates the ASR client, selects the voice_record rows that have a
// non-empty MonitorFilename, a positive CallTimeLength and a NULL callText
// (restricted to id > cfg.VoiceRecordId when that value is positive), and
// starts one FormatData goroutine per row. A value is sent on ch before each
// goroutine is started and FormatData receives one back when it finishes, so
// ch limits how many goroutines are in flight (ch is declared elsewhere;
// presumably buffered — confirm). job does not wait for the goroutines it
// started. Finally cfg is handed to util.WriteSysConfig.
func job() {
	log.Println("语音识别定时任务开始")
	credential := common.NewCredential(
		cfg.KeyId,
		cfg.Key,
	)
	cpf := profile.NewClientProfile()
	cpf.HttpProfile.Endpoint = "asr.tencentcloudapi.com"
	newClient, err := asr.NewClient(credential, "ap-guangzhou", cpf)
	if err != nil {
		// Without a client every FormatData goroutine would fail, so skip this run.
		log.Println("creating ASR client failed:", err)
		return
	}
	client = newClient

	sql := `SELECT id,MonitorFilename FROM voice_record WHERE %s MonitorFilename <> '' AND CallTimeLength > 0 AND ISNULL(callText) ORDER BY createTime DESC `
	addSql := ``
	if cfg.VoiceRecordId > 0 {
		addSql = fmt.Sprintf("id > %d AND ", cfg.VoiceRecordId)
	}
	sql = fmt.Sprintf(sql, addSql)

	TiDb.SelectByBath(cfg.Bath, func(l *[]map[string]interface{}) bool {
		rows := *l
		if len(rows) == 0 {
			return true
		}
		// NOTE(review): this takes the id of the first row of every batch.
		// With ORDER BY createTime DESC that is the maximum id only if ids
		// grow with createTime and later batches do not overwrite it — confirm.
		cfg.VoiceRecordId = util.Int64All(rows[0]["id"])
		log.Printf("此次处理数据量:%d,最大id:%d", len(rows), cfg.VoiceRecordId)
		for _, row := range rows {
			ch <- true // blocks while no slot is free
			go FormatData(row)
		}
		return true
	}, sql)
	// TODO: a WHERE clause is needed to make this incremental.
	log.Println("语音识别定时任务结束,id:", cfg.VoiceRecordId)
	util.WriteSysConfig(&cfg)
}

// FormatData obtains a recognition TaskId for one voice record and then polls
// for the recognition result.
//
// data is one voice_record row; its "id" and "MonitorFilename" entries are
// read. The recording URL is submitted with CreateRecTask and the task status
// is polled with DescribeTaskStatus at most cfg.PollCount+1 times:
//   - "success": the result text is written to the record and polling stops;
//   - "waiting" / "doing": sleep cfg.WaitingTime / cfg.DoingTime seconds and
//     poll again;
//   - "failed": a new task is created for the same URL and polling continues.
//
// The caller must have sent one value on ch before calling; FormatData
// receives exactly one value from ch on every exit path.
func FormatData(data map[string]interface{}) {
	// Give the slot back no matter how the function exits. The early returns
	// and panic paths previously skipped this, which permanently shrank the pool.
	defer func() { <-ch }()
	// util.Catch is expected to recover panics raised below — confirm.
	defer util.Catch()
	log.Println("开始 ", data["id"])

	var (
		recordID  = util.Int64All(data["id"])
		fileURL   = util.ObjToString(data["MonitorFilename"])
		waitSleep = time.Duration(cfg.WaitingTime) * time.Second
		busySleep = time.Duration(cfg.DoingTime) * time.Second
	)

	taskID, err := createRecTask(fileURL)
	if err != nil {
		logASRError("CreateRecTask", err)
		return
	}

	// Bounded polling: the body runs for polls = 0..cfg.PollCount so a task
	// that never settles cannot keep this goroutine alive forever.
	for polls := 0; polls <= cfg.PollCount; polls++ {
		log.Println("任务id ", *taskID)
		statusReq := asr.NewDescribeTaskStatusRequest()
		statusReq.TaskId = taskID
		r, err := client.DescribeTaskStatus(statusReq)
		if err != nil {
			logASRError("DescribeTaskStatus", err)
			time.Sleep(waitSleep)
			continue
		}
		log.Println("查询结果======", r.ToJsonString())
		if r == nil || r.Response == nil || r.Response.Data == nil || r.Response.Data.StatusStr == nil {
			log.Println("DescribeTaskStatus returned no status, polling again")
			time.Sleep(waitSleep)
			continue
		}

		status := r.Response.Data
		switch *status.StatusStr {
		case "success":
			if status.Result != nil && updateVoiceRecord(*status.Result, recordID) {
				return
			}
			// The row was not updated; pause before asking again instead of
			// immediately re-querying the API.
			time.Sleep(waitSleep)
		case "waiting":
			time.Sleep(waitSleep)
		case "doing":
			time.Sleep(busySleep)
		case "failed":
			log.Println("转换失败,再次转换")
			retryID, retryErr := createRecTask(fileURL)
			if retryErr != nil {
				logASRError("CreateRecTask", retryErr)
				return
			}
			taskID = retryID
			log.Println("任务id ", *taskID)
			time.Sleep(waitSleep)
		default:
			// Unknown status: wait like "waiting" rather than spinning.
			time.Sleep(waitSleep)
		}
	}
}

// createRecTask submits the recording at fileURL for recognition and returns
// the id of the created task. The returned id is never nil when err is nil.
func createRecTask(fileURL string) (*uint64, error) {
	request := asr.NewCreateRecTaskRequest()
	request.EngineModelType = common.StringPtr("8k_zh")
	request.ChannelNum = common.Uint64Ptr(1)
	request.SpeakerDiarization = common.Int64Ptr(1)
	request.SpeakerNumber = common.Int64Ptr(2)
	request.ResTextFormat = common.Uint64Ptr(0)
	request.SourceType = common.Uint64Ptr(0)
	request.Url = common.StringPtr(fileURL)

	response, err := client.CreateRecTask(request)
	if err != nil {
		return nil, err
	}
	log.Printf("结果======%s", response.ToJsonString())
	if response == nil || response.Response == nil || response.Response.Data == nil ||
		response.Response.Data.TaskId == nil {
		return nil, fmt.Errorf("response for %q carries no task id", fileURL)
	}
	return response.Response.Data.TaskId, nil
}

// logASRError logs a failed ASR call. Errors of type *errors.TencentCloudSDKError
// keep the "An API error has returned" wording; any other error is logged
// together with the name of the operation that failed.
func logASRError(op string, err error) {
	if _, ok := err.(*errors.TencentCloudSDKError); ok {
		log.Printf("An API error has returned: %s", err)
		return
	}
	log.Printf("%s failed: %v", op, err)
}

// updateVoiceRecord stores content as the callText of the voice_record row
// with the given id. It reports whether TiDb.UpdateOrDeleteBySql returned a
// value greater than zero (presumably the affected-row count — confirm).
func updateVoiceRecord(content string, id int64) bool {
	return TiDb.UpdateOrDeleteBySql(`UPDATE voice_record SET callText = ? 
WHERE id = ?`, content, id) > 0
}