service.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. package service
  2. import (
  3. cron "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/crontab/handle"
  4. "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/entity"
  5. . "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/init"
  6. "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/internal/config"
  7. "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/service"
  8. "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/warn"
  9. "app.yhyue.com/moapp/jy_docs/services/partner"
  10. "app.yhyue.com/moapp/jybase/common"
  11. "app.yhyue.com/moapp/jybase/date"
  12. "context"
  13. "fmt"
  14. "github.com/gogf/gf/v2/util/gconv"
  15. "log"
  16. "net/url"
  17. "sync"
  18. "time"
  19. )
  20. // 获取豆丁docs基本信息
  21. func GetDocinInfoTask(ctx context.Context) {
  22. defer common.Catch()
  23. cron.CrontabByTicker(ctx, Cron.NewDocsList, SyncDocinInfo)
  24. }
  25. func SyncDocinInfo(cron config.Cron) {
  26. log.Println("sync Docin info task start :", date.NowFormat(date.Date_Full_Layout))
  27. var (
  28. startId = cron.StartId
  29. h = service.NewHH(I.Docin.Name, I.Docin.Host, I.Docin.DocList.Name, I.Docin.DocList.Pathname, I.Docin.DocList.Method, url.Values{
  30. "startId": []string{common.InterfaceToStr(startId)},
  31. "count": []string{common.InterfaceToStr(Cron.NewDocsList.Count)},
  32. })
  33. b, err, _ = h.HttpFunc()
  34. lastId = startId
  35. setCache = func(cron config.Cron) {
  36. //缓存 保存最后一次更新id
  37. if b := partner.SetDocsStartId(entity.RedisCode, cron.StartIdKey, cron.StartId, -1); !b {
  38. log.Println("同步 Docin 数据 更新最新id缓存异常: ", cron.StartId)
  39. } else {
  40. log.Println("同步 Docin 数据 更新最新id缓存成功: ", cron.StartId)
  41. }
  42. }
  43. )
  44. go h.SaveDocinLogger(b, err, "req")
  45. if err == nil {
  46. if len(b) > 0 {
  47. var et, at int
  48. //豆丁同步数据到mongo
  49. go h.SaveDocinLogger(b, err, "res")
  50. err, lastId, et, at = InsertDocinInfos(b)
  51. entity.SyncExpectTotal = entity.SyncExpectTotal + et
  52. entity.SyncActualTotal = entity.SyncActualTotal + at
  53. if err != nil {
  54. go warn.SendMsgByWXURL(fmt.Sprintf("同步豆丁数据到tidb失败:%s,当前任务执行参数:%v", err.Error(), cron))
  55. } else if lastId > 0 {
  56. //周期内数据未同步完成
  57. cron.StartId = lastId
  58. setCache(cron)
  59. if cron.Count == et {
  60. if st := Cron.NewDocsList.SleepTime; st > 0 {
  61. time.Sleep(time.Duration(st) * time.Millisecond)
  62. }
  63. SyncDocinInfo(cron)
  64. }
  65. }
  66. }
  67. } else {
  68. go warn.SendMsgByWXURL(fmt.Sprintf("获取豆丁数据列表失败:%s,当前任务执行参数:%v", err.Error(), cron))
  69. }
  70. //缓存 保存最后一次更新id
  71. if b := partner.SetDocsStartId(entity.RedisCode, cron.StartIdKey, cron.StartId, -1); !b {
  72. log.Println("同步 Docin 数据 更新最新id缓存异常: ", cron.StartId)
  73. } else {
  74. log.Println("同步 Docin 数据 更新最新id缓存成功: ", cron.StartId)
  75. }
  76. log.Println(fmt.Sprintf("sync Docin info task end :%s-当前任务更新 预计数据量:%d 实际数据量:%d", date.NowFormat(date.Date_Full_Layout), entity.SyncExpectTotal, entity.SyncActualTotal))
  77. }
  78. // 更新豆丁docs基本信息
  79. func UpdateDocinInfoTask(ctx context.Context) {
  80. defer common.Catch()
  81. cron.CrontabByTicker(ctx, Cron.UpdateDocsList, UpdateDocinInfo)
  82. }
  83. func UpdateDocinInfo(cron config.Cron) {
  84. var (
  85. startId = cron.StartId
  86. count = cron.Count
  87. startDate = cron.StartDate
  88. endDate = cron.EndDate
  89. setCache = func(cron config.Cron) {
  90. //缓存数据处理
  91. partner.SetUpdateTaskInfo(entity.RedisCode, cron.StartDateKey, endDate, -1)
  92. partner.DelDocsStartId(entity.RedisCode, cron.StartIdKey)
  93. }
  94. )
  95. log.Println("update Docin info task start :", date.NowFormat(date.Date_Full_Layout))
  96. if endDate == "" {
  97. endDate = time.Now().AddDate(0, 0, 1).Format(date.Date_yyyyMMdd)
  98. }
  99. if startDate == "" || endDate == "" || count == 0 || gconv.Int64(startDate) >= gconv.Int64(endDate) {
  100. go warn.SendMsgByWXURL(fmt.Sprintf("timetask 保存 docin 文档失败--参数异常:startId:%d --startDate:%s --endDate:%s--count:%d", startId, startDate, endDate, count))
  101. return
  102. }
  103. h := service.NewHH(I.Docin.Name, I.Docin.Host, I.Docin.UpdateList.Name, I.Docin.UpdateList.Pathname, I.Docin.UpdateList.Method, url.Values{
  104. "startId": []string{common.InterfaceToStr(startId)},
  105. "count": []string{common.InterfaceToStr(count)},
  106. "startDate": []string{startDate},
  107. "endDate": []string{endDate},
  108. })
  109. b, err, _ := h.HttpFunc()
  110. lastId := startId
  111. go h.SaveDocinLogger(b, err, "req")
  112. if err == nil {
  113. if len(b) > 0 {
  114. var et, at int
  115. //豆丁更新数据到mongo
  116. go h.SaveDocinLogger(b, err, "res")
  117. err, lastId, et, at = UpdateDocinInfos(b)
  118. entity.UpdateExpectTotal = entity.UpdateExpectTotal + et
  119. entity.UpdateActualTotal = entity.UpdateActualTotal + at
  120. if err != nil {
  121. go warn.SendMsgByWXURL(fmt.Sprintf("更新豆丁数据到tidb失败:%s,当前任务执行参数:%v", err.Error(), cron))
  122. } else if lastId > 0 {
  123. cron.StartId = lastId
  124. setCache(cron)
  125. //周期内数据未同步完成
  126. if count == et {
  127. if st := Cron.UpdateDocsList.SleepTime; st > 0 {
  128. time.Sleep(time.Duration(st) * time.Millisecond)
  129. }
  130. UpdateDocinInfo(cron)
  131. }
  132. }
  133. }
  134. } else {
  135. go warn.SendMsgByWXURL(fmt.Sprintf("获取豆丁更新数据失败:%s,当前任务执行参数:%v", err.Error(), cron))
  136. }
  137. //缓存数据处理
  138. partner.SetUpdateTaskInfo(entity.RedisCode, cron.StartDateKey, endDate, -1)
  139. partner.DelDocsStartId(entity.RedisCode, cron.StartIdKey)
  140. log.Println(fmt.Sprintf("update Docin info task end :%s-当前任务更新 预计数据量:%d 实际数据量:%d", date.NowFormat(date.Date_Full_Layout), entity.UpdateExpectTotal, entity.UpdateActualTotal))
  141. }
  142. // 文档分类文档数据量更新
  143. func UpdateDocsClassTask(ctx context.Context) {
  144. defer common.Catch()
  145. cron.CrontabByTicker(ctx, Cron.UpdateDocsClass, DocsClassUpdate)
  146. }
  147. func DocsClassUpdate(cron config.Cron) {
  148. log.Println("update Docs class count task start :", date.NowFormat(date.Date_Full_Layout), cron)
  149. //获取分类详情
  150. if res := GetClassInfo(); len(res) > 0 {
  151. var (
  152. pool = make(chan bool, 6)
  153. wg = &sync.WaitGroup{}
  154. )
  155. for _, cv := range res {
  156. pool <- true
  157. wg.Add(1)
  158. go func(cv *Result) {
  159. defer func() {
  160. wg.Done()
  161. <-pool
  162. }()
  163. UpdateDocsSize(cv)
  164. }(cv)
  165. }
  166. wg.Wait()
  167. }
  168. log.Println(fmt.Sprintf("update Docs class count task end :%s", date.NowFormat(date.Date_Full_Layout)))
  169. }
  170. // 合作商会员信息更新,每天一点开始
  171. func UpdateDocsUserMemberTask(ctx context.Context) {
  172. defer common.Catch()
  173. cron.CrontabByTicker(ctx, Cron.UpdateDocsMember, DocsUserMemberUpdate)
  174. }
  175. func DocsUserMemberUpdate(cron config.Cron) {
  176. log.Println("update docs user member state task start :", date.NowFormat(date.Date_Full_Layout), cron)
  177. //直接更新合作商会员状态
  178. if err := DocsUserMember(); err != nil {
  179. log.Println("合作商会员信息更新 定时任务异常: ", err.Error())
  180. }
  181. log.Println(fmt.Sprintf("update docs user member state task end :%s", date.NowFormat(date.Date_Full_Layout)))
  182. }