service.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package service
  2. import (
  3. cron "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/crontab/handle"
  4. "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/entity"
  5. . "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/init"
  6. "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/internal/config"
  7. "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/service"
  8. "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/warn"
  9. "app.yhyue.com/moapp/jy_docs/services/partner"
  10. "app.yhyue.com/moapp/jybase/common"
  11. "app.yhyue.com/moapp/jybase/date"
  12. "context"
  13. "fmt"
  14. "github.com/gogf/gf/v2/util/gconv"
  15. "log"
  16. "time"
  17. )
  18. // 获取豆丁docs基本信息
  19. func GetDocinInfoTask(ctx context.Context) {
  20. defer common.Catch()
  21. cron.CrontabByTicker(ctx, Cron.NewDocsList, SyncDocinInfo)
  22. }
  23. func SyncDocinInfo(cron config.Cron) {
  24. log.Println("sync Docin info task start :", date.NowFormat(date.Date_Full_Layout))
  25. var (
  26. startId = cron.StartId
  27. h = service.NewHH(I.Docin.Name, I.Docin.Host, I.Docin.DocList.Name, I.Docin.DocList.Pathname, I.Docin.DocList.Method, map[string]interface{}{
  28. "startId": startId,
  29. "count": Cron.NewDocsList.Count,
  30. })
  31. b, err, _ = h.HttpFunc()
  32. lastId = startId
  33. )
  34. go h.SaveDocinLogger(b, err, "req")
  35. if err == nil {
  36. if len(b) > 0 {
  37. var et, at int
  38. //豆丁同步数据到mongo
  39. go h.SaveDocinLogger(b, err, "res")
  40. err, lastId, et, at = InsertDocinInfos(b)
  41. entity.SyncExpectTotal = entity.SyncExpectTotal + et
  42. entity.SyncActualTotal = entity.SyncActualTotal + at
  43. if err != nil {
  44. go warn.SendMsgByWXURL(fmt.Sprintf("同步豆丁数据到tidb失败:%s,当前任务执行参数:%v", err.Error(), cron))
  45. } else if lastId > 0 {
  46. //周期内数据未同步完成
  47. cron.StartId = lastId
  48. if cron.Count == et {
  49. if st := Cron.NewDocsList.SleepTime; st > 0 {
  50. time.Sleep(time.Duration(st) * time.Millisecond)
  51. }
  52. SyncDocinInfo(cron)
  53. }
  54. }
  55. }
  56. } else {
  57. go warn.SendMsgByWXURL(fmt.Sprintf("获取豆丁数据列表失败:%s,当前任务执行参数:%v", err.Error(), cron))
  58. }
  59. //缓存 保存最后一次更新id
  60. if b := partner.SetDocsStartId(entity.RedisCode, cron.StartIdKey, cron.StartId, -1); !b {
  61. log.Println("同步 Docin 数据 更新最新id缓存异常: ", cron.StartId)
  62. } else {
  63. log.Println("同步 Docin 数据 更新最新id缓存成功: ", cron.StartId)
  64. }
  65. log.Println(fmt.Sprintf("sync Docin info task end :%s-当前任务更新 预计数据量:%d 实际数据量:%d", date.NowFormat(date.Date_Full_Layout), entity.SyncExpectTotal, entity.SyncActualTotal))
  66. }
  67. // 更新豆丁docs基本信息
  68. func UpdateDocinInfoTask(ctx context.Context) {
  69. defer common.Catch()
  70. cron.CrontabByTicker(ctx, Cron.UpdateDocsList, UpdateDocinInfo)
  71. }
  72. func UpdateDocinInfo(cron config.Cron) {
  73. var (
  74. startId = cron.StartId
  75. count = cron.Count
  76. startDate = cron.StartDate
  77. endDate = cron.EndDate
  78. )
  79. log.Println("update Docin info task start :", date.NowFormat(date.Date_Full_Layout))
  80. if endDate == "" {
  81. endDate = date.NowFormat(date.Date_yyyyMMdd)
  82. }
  83. if startDate == "" || endDate == "" || count == 0 || gconv.Int64(startDate) >= gconv.Int64(endDate) {
  84. go warn.SendMsgByWXURL(fmt.Sprintf("timetask 保存 docin 文档失败--参数异常:startId:%d --startDate:%s --endDate:%s--count:%d", startId, startDate, endDate, count))
  85. return
  86. }
  87. h := service.NewHH(I.Docin.Name, I.Docin.Host, I.Docin.UpdateList.Name, I.Docin.UpdateList.Pathname, I.Docin.UpdateList.Method, map[string]interface{}{
  88. "startId": startId,
  89. "count": count,
  90. "startDate": startDate,
  91. "endDate": endDate,
  92. })
  93. b, err, _ := h.HttpFunc()
  94. lastId := startId
  95. go h.SaveDocinLogger(b, err, "req")
  96. if err == nil {
  97. if len(b) > 0 {
  98. var et, at int
  99. //豆丁更新数据到mongo
  100. go h.SaveDocinLogger(b, err, "res")
  101. err, lastId, et, at = UpdateDocinInfos(b)
  102. entity.UpdateExpectTotal = entity.UpdateExpectTotal + et
  103. entity.UpdateActualTotal = entity.UpdateActualTotal + at
  104. if err != nil {
  105. go warn.SendMsgByWXURL(fmt.Sprintf("更新豆丁数据到tidb失败:%s,当前任务执行参数:%v", err.Error(), cron))
  106. } else if lastId > 0 {
  107. cron.StartId = lastId
  108. //周期内数据未同步完成
  109. if count == et {
  110. if st := Cron.UpdateDocsList.SleepTime; st > 0 {
  111. time.Sleep(time.Duration(st) * time.Millisecond)
  112. }
  113. UpdateDocinInfo(cron)
  114. }
  115. }
  116. }
  117. } else {
  118. go warn.SendMsgByWXURL(fmt.Sprintf("获取豆丁更新数据失败:%s,当前任务执行参数:%v", err.Error(), cron))
  119. }
  120. //缓存数据处理
  121. partner.SetUpdateTaskInfo(entity.RedisCode, cron.StartDateKey, endDate, -1)
  122. partner.DelDocsStartId(entity.RedisCode, cron.StartIdKey)
  123. //休眠
  124. if st := Cron.UpdateDocsList.SleepTime; st > 0 {
  125. time.Sleep(time.Duration(st) * time.Millisecond)
  126. }
  127. log.Println(fmt.Sprintf("update Docin info task end :%s-当前任务更新 预计数据量:%d 实际数据量:%d", date.NowFormat(date.Date_Full_Layout), entity.UpdateExpectTotal, entity.UpdateActualTotal))
  128. }