service.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. package service
  2. import (
  3. cron "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/crontab/handle"
  4. "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/entity"
  5. . "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/init"
  6. "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/internal/config"
  7. "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/service"
  8. "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/warn"
  9. "app.yhyue.com/moapp/jy_docs/services/partner"
  10. "app.yhyue.com/moapp/jybase/common"
  11. "app.yhyue.com/moapp/jybase/date"
  12. "context"
  13. "fmt"
  14. "github.com/gogf/gf/v2/util/gconv"
  15. "log"
  16. "sync"
  17. "time"
  18. )
  19. // 获取豆丁docs基本信息
  20. func GetDocinInfoTask(ctx context.Context) {
  21. defer common.Catch()
  22. cron.CrontabByTicker(ctx, Cron.NewDocsList, SyncDocinInfo)
  23. }
  24. func SyncDocinInfo(cron config.Cron) {
  25. log.Println("sync Docin info task start :", date.NowFormat(date.Date_Full_Layout))
  26. var (
  27. startId = cron.StartId
  28. h = service.NewHH(I.Docin.Name, I.Docin.Host, I.Docin.DocList.Name, I.Docin.DocList.Pathname, I.Docin.DocList.Method, map[string]interface{}{
  29. "startId": startId,
  30. "count": Cron.NewDocsList.Count,
  31. })
  32. b, err, _ = h.HttpFunc()
  33. lastId = startId
  34. setCache = func(cron config.Cron) {
  35. //缓存 保存最后一次更新id
  36. if b := partner.SetDocsStartId(entity.RedisCode, cron.StartIdKey, cron.StartId, -1); !b {
  37. log.Println("同步 Docin 数据 更新最新id缓存异常: ", cron.StartId)
  38. } else {
  39. log.Println("同步 Docin 数据 更新最新id缓存成功: ", cron.StartId)
  40. }
  41. }
  42. )
  43. go h.SaveDocinLogger(b, err, "req")
  44. if err == nil {
  45. if len(b) > 0 {
  46. var et, at int
  47. //豆丁同步数据到mongo
  48. go h.SaveDocinLogger(b, err, "res")
  49. err, lastId, et, at = InsertDocinInfos(b)
  50. entity.SyncExpectTotal = entity.SyncExpectTotal + et
  51. entity.SyncActualTotal = entity.SyncActualTotal + at
  52. if err != nil {
  53. go warn.SendMsgByWXURL(fmt.Sprintf("同步豆丁数据到tidb失败:%s,当前任务执行参数:%v", err.Error(), cron))
  54. } else if lastId > 0 {
  55. //周期内数据未同步完成
  56. cron.StartId = lastId
  57. setCache(cron)
  58. if cron.Count == et {
  59. if st := Cron.NewDocsList.SleepTime; st > 0 {
  60. time.Sleep(time.Duration(st) * time.Millisecond)
  61. }
  62. SyncDocinInfo(cron)
  63. }
  64. }
  65. }
  66. } else {
  67. go warn.SendMsgByWXURL(fmt.Sprintf("获取豆丁数据列表失败:%s,当前任务执行参数:%v", err.Error(), cron))
  68. }
  69. //缓存 保存最后一次更新id
  70. if b := partner.SetDocsStartId(entity.RedisCode, cron.StartIdKey, cron.StartId, -1); !b {
  71. log.Println("同步 Docin 数据 更新最新id缓存异常: ", cron.StartId)
  72. } else {
  73. log.Println("同步 Docin 数据 更新最新id缓存成功: ", cron.StartId)
  74. }
  75. log.Println(fmt.Sprintf("sync Docin info task end :%s-当前任务更新 预计数据量:%d 实际数据量:%d", date.NowFormat(date.Date_Full_Layout), entity.SyncExpectTotal, entity.SyncActualTotal))
  76. }
  77. // 更新豆丁docs基本信息
  78. func UpdateDocinInfoTask(ctx context.Context) {
  79. defer common.Catch()
  80. cron.CrontabByTicker(ctx, Cron.UpdateDocsList, UpdateDocinInfo)
  81. }
  82. func UpdateDocinInfo(cron config.Cron) {
  83. var (
  84. startId = cron.StartId
  85. count = cron.Count
  86. startDate = cron.StartDate
  87. endDate = cron.EndDate
  88. setCache = func(cron config.Cron) {
  89. //缓存数据处理
  90. partner.SetUpdateTaskInfo(entity.RedisCode, cron.StartDateKey, endDate, -1)
  91. partner.DelDocsStartId(entity.RedisCode, cron.StartIdKey)
  92. }
  93. )
  94. log.Println("update Docin info task start :", date.NowFormat(date.Date_Full_Layout))
  95. if endDate == "" {
  96. endDate = time.Now().AddDate(0, 0, 1).Format(date.Date_yyyyMMdd)
  97. }
  98. if startDate == "" || endDate == "" || count == 0 || gconv.Int64(startDate) >= gconv.Int64(endDate) {
  99. go warn.SendMsgByWXURL(fmt.Sprintf("timetask 保存 docin 文档失败--参数异常:startId:%d --startDate:%s --endDate:%s--count:%d", startId, startDate, endDate, count))
  100. return
  101. }
  102. h := service.NewHH(I.Docin.Name, I.Docin.Host, I.Docin.UpdateList.Name, I.Docin.UpdateList.Pathname, I.Docin.UpdateList.Method, map[string]interface{}{
  103. "startId": startId,
  104. "count": count,
  105. "startDate": startDate,
  106. "endDate": endDate,
  107. })
  108. b, err, _ := h.HttpFunc()
  109. lastId := startId
  110. go h.SaveDocinLogger(b, err, "req")
  111. if err == nil {
  112. if len(b) > 0 {
  113. var et, at int
  114. //豆丁更新数据到mongo
  115. go h.SaveDocinLogger(b, err, "res")
  116. err, lastId, et, at = UpdateDocinInfos(b)
  117. entity.UpdateExpectTotal = entity.UpdateExpectTotal + et
  118. entity.UpdateActualTotal = entity.UpdateActualTotal + at
  119. if err != nil {
  120. go warn.SendMsgByWXURL(fmt.Sprintf("更新豆丁数据到tidb失败:%s,当前任务执行参数:%v", err.Error(), cron))
  121. } else if lastId > 0 {
  122. cron.StartId = lastId
  123. setCache(cron)
  124. //周期内数据未同步完成
  125. if count == et {
  126. if st := Cron.UpdateDocsList.SleepTime; st > 0 {
  127. time.Sleep(time.Duration(st) * time.Millisecond)
  128. }
  129. UpdateDocinInfo(cron)
  130. }
  131. }
  132. }
  133. } else {
  134. go warn.SendMsgByWXURL(fmt.Sprintf("获取豆丁更新数据失败:%s,当前任务执行参数:%v", err.Error(), cron))
  135. }
  136. //缓存数据处理
  137. partner.SetUpdateTaskInfo(entity.RedisCode, cron.StartDateKey, endDate, -1)
  138. partner.DelDocsStartId(entity.RedisCode, cron.StartIdKey)
  139. log.Println(fmt.Sprintf("update Docin info task end :%s-当前任务更新 预计数据量:%d 实际数据量:%d", date.NowFormat(date.Date_Full_Layout), entity.UpdateExpectTotal, entity.UpdateActualTotal))
  140. }
  141. // 文档分类文档数据量更新
  142. func UpdateDocsClassTask(ctx context.Context) {
  143. defer common.Catch()
  144. cron.CrontabByTicker(ctx, Cron.UpdateDocsClass, DocsClassUpdate)
  145. }
  146. func DocsClassUpdate(cron config.Cron) {
  147. log.Println("update Docs class count task start :", date.NowFormat(date.Date_Full_Layout), cron)
  148. //获取分类详情
  149. if res := GetClassInfo(); len(res) > 0 {
  150. var (
  151. pool = make(chan bool, 6)
  152. wg = &sync.WaitGroup{}
  153. )
  154. for _, cv := range res {
  155. pool <- true
  156. wg.Add(1)
  157. go func(cv *Result) {
  158. defer func() {
  159. wg.Done()
  160. <-pool
  161. }()
  162. UpdateDocsSize(cv)
  163. }(cv)
  164. }
  165. wg.Wait()
  166. }
  167. log.Println(fmt.Sprintf("update Docs class count task end :%s", date.NowFormat(date.Date_Full_Layout)))
  168. }
  169. // 合作商会员信息更新,每天一点开始
  170. func UpdateDocsUserMemberTask(ctx context.Context) {
  171. defer common.Catch()
  172. cron.CrontabByTicker(ctx, Cron.UpdateDocsMember, DocsUserMemberUpdate)
  173. }
  174. func DocsUserMemberUpdate(cron config.Cron) {
  175. log.Println("update docs user member state task start :", date.NowFormat(date.Date_Full_Layout), cron)
  176. //直接更新合作商会员状态
  177. if err := DocsUserMember(); err != nil {
  178. log.Println("合作商会员信息更新 定时任务异常: ", err.Error())
  179. }
  180. log.Println(fmt.Sprintf("update docs user member state task end :%s", date.NowFormat(date.Date_Full_Layout)))
  181. }