job.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package main
  2. import (
  3. util "app.yhyue.com/moapp/jybase/common"
  4. "fmt"
  5. monitor "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/asr/v20190614"
  6. "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common"
  7. "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common/errors"
  8. "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common/profile"
  9. "log"
  10. "time"
  11. )
  12. func job() {
  13. log.Println("语音识别定时任务开始")
  14. credential := common.NewCredential(
  15. "keyid",
  16. "key",
  17. )
  18. cpf := profile.NewClientProfile()
  19. cpf.HttpProfile.Endpoint = "monitor.tencentcloudapi.com"
  20. client, _ = monitor.NewClient(credential, "ap-shanghai", cpf)
  21. sql := `SELECT id,MonitorFilename FROM voice_record WHERE %s MonitorFilename <> '' AND ISNULL(callText) ORDER BY createTime DESC `
  22. addSql := ``
  23. if cfg.VoiceRecordId > 0 {
  24. addSql = fmt.Sprintf("id > %d AND ", cfg.VoiceRecordId)
  25. }
  26. sql = fmt.Sprintf(sql, addSql)
  27. TiDb.SelectByBath(cfg.Bath, func(l *[]map[string]interface{}) bool {
  28. if len(*l) > 0 {
  29. cfg.VoiceRecordId = util.Int64All((*l)[0]["id"])
  30. log.Println(fmt.Sprintf("此次处理数据量:%d,最大id:%d", len(*l), cfg.VoiceRecordId))
  31. for _, v := range *l {
  32. ch <- true
  33. go FormatData(v)
  34. }
  35. }
  36. return true
  37. }, sql) // 需要加where进行增量
  38. log.Println("语音识别定时任务结束,id:", cfg.VoiceRecordId)
  39. }
  40. // FormatData 获取TaskId 并获取录音识别结果
  41. func FormatData(data map[string]interface{}) {
  42. defer util.Catch()
  43. request := monitor.NewCreateRecTaskRequest()
  44. request.EngineModelType = common.StringPtr("8k_zh")
  45. request.ChannelNum = common.Uint64Ptr(1)
  46. request.SourceType = common.Uint64Ptr(0)
  47. request.ResTextFormat = common.Uint64Ptr(0)
  48. request.Url = common.StringPtr(util.ObjToString(data["MonitorFilename"]))
  49. response, err := client.CreateRecTask(request)
  50. if _, ok := err.(*errors.TencentCloudSDKError); ok {
  51. <-ch
  52. fmt.Printf("An API error has returned: %s", err)
  53. return
  54. }
  55. if err != nil {
  56. <-ch
  57. panic(err)
  58. return
  59. }
  60. var (
  61. recordId = util.Int64All(data["id"])
  62. pollTime = map[int64]int{}
  63. //获取 response.Response.Data.TaskId
  64. //查询第二个接口: 查询任务结果 需要轮询
  65. taskId = response.Response.Data.TaskId
  66. requests = monitor.NewDescribeTaskStatusRequest()
  67. )
  68. requests.TaskId = taskId
  69. L:
  70. for {
  71. if pollTime[recordId] > cfg.PollCount {
  72. //轮询大于pollCount,则跳出,防止死循环
  73. break L
  74. }
  75. pollTime[recordId] += 1
  76. r, errs := client.DescribeTaskStatus(requests)
  77. log.Println(r, errs)
  78. if errs != nil {
  79. log.Println("根据taskId 请求语音识别结果异常:", errs.Error())
  80. break L
  81. }
  82. switch *r.Response.Data.StatusStr {
  83. case "success":
  84. if updateVoiceRecord(*r.Response.Data.Result, recordId) {
  85. break L
  86. }
  87. case "waiting":
  88. time.Sleep(time.Duration(cfg.WaitingTime) * time.Second)
  89. continue L
  90. case "doing":
  91. time.Sleep(time.Duration(cfg.DoingTime) * time.Second)
  92. continue L
  93. case "failed":
  94. if updateVoiceRecord(*r.Response.Data.ErrorMsg, recordId) {
  95. break L
  96. }
  97. }
  98. }
  99. <-ch
  100. }
  101. func updateVoiceRecord(content string, id int64) bool {
  102. return TiDb.UpdateOrDeleteBySql(`UPDATE voice_record SET callText = ? WHERE id = ?`, content, id) > 0
  103. }