service.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. package service
  2. import (
  3. cron "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/crontab/handle"
  4. "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/entity"
  5. . "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/init"
  6. "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/internal/config"
  7. "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/service"
  8. "app.yhyue.com/moapp/jy_docs/services/partner"
  9. "app.yhyue.com/moapp/jybase/common"
  10. "app.yhyue.com/moapp/jybase/date"
  11. "context"
  12. "fmt"
  13. "github.com/gogf/gf/v2/util/gconv"
  14. "log"
  15. "time"
  16. )
  17. // 获取豆丁docs基本信息
  18. func GetDocinInfoTask(ctx context.Context) {
  19. defer common.Catch()
  20. cron.CrontabByTicker(ctx, Cron.NewDocsList, SyncDocinInfo)
  21. }
  22. func SyncDocinInfo(cron config.Cron) {
  23. log.Println("sync Docin info task start :", date.NowFormat(date.Date_Full_Layout))
  24. var (
  25. startId = cron.StartId
  26. h = service.NewHH(I.Docin.Name, I.Docin.Host, I.Docin.DocList.Name, I.Docin.DocList.Pathname, I.Docin.DocList.Method, map[string]interface{}{
  27. "startId": startId,
  28. "count": Cron.NewDocsList.Count,
  29. })
  30. b, err = h.HttpFunc()
  31. lastId = startId
  32. )
  33. go h.SaveRequestLogger(b, err)
  34. if err == nil {
  35. if len(b) > 0 {
  36. var et, at int
  37. //豆丁同步数据到mongo
  38. go h.SaveResponseData(b, err)
  39. err, lastId, et, at = InsertDocinInfos(b)
  40. entity.SyncExpectTotal = entity.SyncExpectTotal + et
  41. entity.SyncActualTotal = entity.SyncActualTotal + at
  42. if err != nil {
  43. log.Println("保存docin 文档失败:", err.Error()) //todo 发送异常信号
  44. } else if lastId > 0 {
  45. //周期内数据未同步完成
  46. cron.StartId = lastId
  47. if cron.Count == et {
  48. if st := Cron.NewDocsList.SleepTime; st > 0 {
  49. time.Sleep(time.Duration(st) * time.Millisecond)
  50. }
  51. SyncDocinInfo(cron)
  52. }
  53. }
  54. }
  55. } else {
  56. log.Println("获取doc文库列表失败:", err.Error()) //todo 发送异常信号
  57. }
  58. //缓存 保存最后一次更新id
  59. if b := partner.SetDocsStartId(entity.RedisCode, cron.StartIdKey, cron.StartId, -1); !b {
  60. log.Println("同步 Docin 数据 更新最新id缓存异常: ", cron.StartId)
  61. } else {
  62. log.Println("同步 Docin 数据 更新最新id缓存成功: ", cron.StartId)
  63. }
  64. log.Println(fmt.Sprintf("sync Docin info task end :%s-当前任务更新 预计数据量:%d 实际数据量:%d", date.NowFormat(date.Date_Full_Layout), entity.SyncExpectTotal, entity.SyncActualTotal))
  65. }
  66. // 更新豆丁docs基本信息
  67. func UpdateDocinInfoTask(ctx context.Context) {
  68. defer common.Catch()
  69. cron.CrontabByTicker(ctx, Cron.UpdateDocsList, UpdateDocinInfo)
  70. }
  71. func UpdateDocinInfo(cron config.Cron) {
  72. var (
  73. startId = cron.StartId
  74. count = cron.Count
  75. startDate = cron.StartDate
  76. endDate = cron.EndDate
  77. )
  78. log.Println("update Docin info task start :", date.NowFormat(date.Date_Full_Layout))
  79. if endDate == "" {
  80. endDate = date.NowFormat(date.Date_yyyyMMdd)
  81. }
  82. if startDate == "" || endDate == "" || count == 0 || gconv.Int64(startDate) >= gconv.Int64(endDate) {
  83. log.Println("timetask 保存 docin 文档失败--参数异常:startId:", startId, "--startDate:", startDate, "--endDate:", endDate, "--count:", count) //todo 发送异常信号
  84. return
  85. }
  86. h := service.NewHH(I.Docin.Name, I.Docin.Host, I.Docin.UpdateList.Name, I.Docin.UpdateList.Pathname, I.Docin.UpdateList.Method, map[string]interface{}{
  87. "startId": startId,
  88. "count": count,
  89. "startDate": startDate,
  90. "endDate": endDate,
  91. })
  92. b, err := h.HttpFunc()
  93. lastId := startId
  94. go h.SaveRequestLogger(b, err)
  95. if err == nil {
  96. if len(b) > 0 {
  97. var et, at int
  98. //豆丁更新数据到mongo
  99. go h.SaveResponseData(b, err)
  100. err, lastId, et, at = UpdateDocinInfos(b)
  101. entity.UpdateExpectTotal = entity.UpdateExpectTotal + et
  102. entity.UpdateActualTotal = entity.UpdateActualTotal + at
  103. if err != nil {
  104. log.Println("更新 docin 文档失败:", err.Error()) //todo 发送异常信号
  105. } else if lastId > 0 {
  106. cron.StartId = lastId
  107. //周期内数据未同步完成
  108. if count == et {
  109. if st := Cron.UpdateDocsList.SleepTime; st > 0 {
  110. time.Sleep(time.Duration(st) * time.Millisecond)
  111. }
  112. UpdateDocinInfo(cron)
  113. }
  114. }
  115. }
  116. } else {
  117. log.Println("更新 doc文库列表失败:", err.Error()) //todo 发送异常信号
  118. }
  119. //缓存数据处理
  120. partner.SetUpdateTaskInfo(entity.RedisCode, cron.StartDateKey, endDate, -1)
  121. partner.DelDocsStartId(entity.RedisCode, cron.StartIdKey)
  122. //休眠
  123. if st := Cron.UpdateDocsList.SleepTime; st > 0 {
  124. time.Sleep(time.Duration(st) * time.Millisecond)
  125. }
  126. log.Println(fmt.Sprintf("update Docin info task end :%s-当前任务更新 预计数据量:%d 实际数据量:%d", date.NowFormat(date.Date_Full_Layout), entity.UpdateExpectTotal, entity.UpdateActualTotal))
  127. }