job.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package main
  2. import (
  3. "fmt"
  4. "log"
  5. "time"
  6. util "app.yhyue.com/moapp/jybase/common"
  7. asr "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/asr/v20190614"
  8. "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common"
  9. "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common/errors"
  10. "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common/profile"
  11. )
  12. var client *asr.Client
  13. func job() {
  14. log.Println("语音识别定时任务开始")
  15. credential := common.NewCredential(
  16. cfg.KeyId,
  17. cfg.Key,
  18. )
  19. cpf := profile.NewClientProfile()
  20. cpf.HttpProfile.Endpoint = "asr.tencentcloudapi.com"
  21. client, _ = asr.NewClient(credential, "ap-guangzhou", cpf)
  22. sql := `SELECT id,MonitorFilename FROM voice_record WHERE %s MonitorFilename <> '' AND CallTimeLength > 0 AND ISNULL(callText) ORDER BY createTime DESC `
  23. addSql := ``
  24. if cfg.VoiceRecordId > 0 {
  25. addSql = fmt.Sprintf("id > %d AND ", cfg.VoiceRecordId)
  26. }
  27. sql = fmt.Sprintf(sql, addSql)
  28. TiDb.SelectByBath(cfg.Bath, func(l *[]map[string]interface{}) bool {
  29. if len(*l) > 0 {
  30. cfg.VoiceRecordId = util.Int64All((*l)[0]["id"])
  31. log.Println(fmt.Sprintf("此次处理数据量:%d,最大id:%d", len(*l), cfg.VoiceRecordId))
  32. for _, v := range *l {
  33. ch <- true
  34. go FormatData(v)
  35. }
  36. }
  37. return true
  38. }, sql) // 需要加where进行增量
  39. log.Println("语音识别定时任务结束,id:", cfg.VoiceRecordId)
  40. util.WriteSysConfig(&cfg)
  41. }
  42. // FormatData 获取TaskId 并获取录音识别结果
  43. func FormatData(data map[string]interface{}) {
  44. log.Println("开始 ", data["id"])
  45. defer util.Catch()
  46. request := asr.NewCreateRecTaskRequest()
  47. request.EngineModelType = common.StringPtr("8k_zh")
  48. request.ChannelNum = common.Uint64Ptr(1)
  49. request.SpeakerDiarization = common.Int64Ptr(1)
  50. request.SpeakerNumber = common.Int64Ptr(2)
  51. request.ResTextFormat = common.Uint64Ptr(0)
  52. request.SourceType = common.Uint64Ptr(0)
  53. request.Url = common.StringPtr(util.ObjToString(data["MonitorFilename"]))
  54. response, err := client.CreateRecTask(request)
  55. if _, ok := err.(*errors.TencentCloudSDKError); ok {
  56. <-ch
  57. fmt.Printf("An API error has returned: %s", err)
  58. return
  59. }
  60. fmt.Printf("结果======%s", response.ToJsonString())
  61. var (
  62. recordId = util.Int64All(data["id"])
  63. pollTime = map[int64]int{}
  64. //获取 response.Response.Data.TaskId
  65. //查询第二个接口: 查询任务结果 需要轮询
  66. taskId = response.Response.Data.TaskId
  67. )
  68. if taskId == nil {
  69. return
  70. }
  71. // requests := asr.NewDescribeTaskStatusRequest()
  72. // requests.TaskId = taskId
  73. L:
  74. for {
  75. if pollTime[recordId] > cfg.PollCount {
  76. //轮询大于pollCount,则跳出,防止死循环
  77. break L
  78. }
  79. pollTime[recordId] += 1
  80. requestss := asr.NewDescribeTaskStatusRequest()
  81. log.Println("任务id ", *taskId)
  82. requestss.TaskId = taskId
  83. r, errs := client.DescribeTaskStatus(requestss)
  84. log.Println("查询结果======", r.ToJsonString(), errs)
  85. if _, oks := errs.(*errors.TencentCloudSDKError); oks {
  86. fmt.Printf("An API error has returned: %s", errs)
  87. time.Sleep(time.Duration(cfg.WaitingTime) * time.Second)
  88. continue L
  89. }
  90. switch *r.Response.Data.StatusStr {
  91. case "success":
  92. if updateVoiceRecord(*r.Response.Data.Result, recordId) {
  93. break L
  94. }
  95. case "waiting":
  96. time.Sleep(time.Duration(cfg.WaitingTime) * time.Second)
  97. continue L
  98. case "doing":
  99. time.Sleep(time.Duration(cfg.DoingTime) * time.Second)
  100. continue L
  101. case "failed":
  102. log.Println("转换失败,再次转换")
  103. requestsss := asr.NewCreateRecTaskRequest()
  104. requestsss.EngineModelType = common.StringPtr("8k_zh")
  105. requestsss.ChannelNum = common.Uint64Ptr(1)
  106. requestsss.SpeakerDiarization = common.Int64Ptr(1)
  107. requestsss.SpeakerNumber = common.Int64Ptr(2)
  108. requestsss.ResTextFormat = common.Uint64Ptr(0)
  109. requestsss.SourceType = common.Uint64Ptr(0)
  110. requestsss.Url = common.StringPtr(util.ObjToString(data["MonitorFilename"]))
  111. responses, errss := client.CreateRecTask(requestsss)
  112. if _, ok := errss.(*errors.TencentCloudSDKError); ok {
  113. <-ch
  114. fmt.Printf("An API error has returned: %s", errss)
  115. return
  116. }
  117. taskId = responses.Response.Data.TaskId
  118. log.Println("任务id ", *taskId)
  119. time.Sleep(time.Duration(cfg.WaitingTime) * time.Second)
  120. continue L
  121. }
  122. }
  123. <-ch
  124. }
  125. func updateVoiceRecord(content string, id int64) bool {
  126. return TiDb.UpdateOrDeleteBySql(`UPDATE voice_record SET callText = ? WHERE id = ?`, content, id) > 0
  127. }