|
@@ -14,6 +14,7 @@ import (
|
|
|
"fmt"
|
|
|
"github.com/gogf/gf/v2/util/gconv"
|
|
|
"log"
|
|
|
+ "sync"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
@@ -33,6 +34,14 @@ func SyncDocinInfo(cron config.Cron) {
|
|
|
})
|
|
|
b, err, _ = h.HttpFunc()
|
|
|
lastId = startId
|
|
|
+ setCache = func(cron config.Cron) {
|
|
|
+ //缓存 保存最后一次更新id
|
|
|
+ if b := partner.SetDocsStartId(entity.RedisCode, cron.StartIdKey, cron.StartId, -1); !b {
|
|
|
+ log.Println("同步 Docin 数据 更新最新id缓存异常: ", cron.StartId)
|
|
|
+ } else {
|
|
|
+ log.Println("同步 Docin 数据 更新最新id缓存成功: ", cron.StartId)
|
|
|
+ }
|
|
|
+ }
|
|
|
)
|
|
|
go h.SaveDocinLogger(b, err, "req")
|
|
|
if err == nil {
|
|
@@ -48,6 +57,7 @@ func SyncDocinInfo(cron config.Cron) {
|
|
|
} else if lastId > 0 {
|
|
|
//周期内数据未同步完成
|
|
|
cron.StartId = lastId
|
|
|
+ go setCache(cron)
|
|
|
if cron.Count == et {
|
|
|
if st := Cron.NewDocsList.SleepTime; st > 0 {
|
|
|
time.Sleep(time.Duration(st) * time.Millisecond)
|
|
@@ -80,10 +90,15 @@ func UpdateDocinInfo(cron config.Cron) {
|
|
|
count = cron.Count
|
|
|
startDate = cron.StartDate
|
|
|
endDate = cron.EndDate
|
|
|
+ setCache = func(cron config.Cron) {
|
|
|
+ //缓存数据处理
|
|
|
+ partner.SetUpdateTaskInfo(entity.RedisCode, cron.StartDateKey, endDate, -1)
|
|
|
+ partner.DelDocsStartId(entity.RedisCode, cron.StartIdKey)
|
|
|
+ }
|
|
|
)
|
|
|
log.Println("update Docin info task start :", date.NowFormat(date.Date_Full_Layout))
|
|
|
if endDate == "" {
|
|
|
- endDate = date.NowFormat(date.Date_yyyyMMdd)
|
|
|
+ endDate = time.Now().AddDate(0, 0, 1).Format(date.Date_yyyyMMdd)
|
|
|
}
|
|
|
if startDate == "" || endDate == "" || count == 0 || gconv.Int64(startDate) >= gconv.Int64(endDate) {
|
|
|
go warn.SendMsgByWXURL(fmt.Sprintf("timetask 保存 docin 文档失败--参数异常:startId:%d --startDate:%s --endDate:%s--count:%d", startId, startDate, endDate, count))
|
|
@@ -110,6 +125,7 @@ func UpdateDocinInfo(cron config.Cron) {
|
|
|
go warn.SendMsgByWXURL(fmt.Sprintf("更新豆丁数据到tidb失败:%s,当前任务执行参数:%v", err.Error(), cron))
|
|
|
} else if lastId > 0 {
|
|
|
cron.StartId = lastId
|
|
|
+ go setCache(cron)
|
|
|
//周期内数据未同步完成
|
|
|
if count == et {
|
|
|
if st := Cron.UpdateDocsList.SleepTime; st > 0 {
|
|
@@ -125,9 +141,35 @@ func UpdateDocinInfo(cron config.Cron) {
|
|
|
//缓存数据处理
|
|
|
partner.SetUpdateTaskInfo(entity.RedisCode, cron.StartDateKey, endDate, -1)
|
|
|
partner.DelDocsStartId(entity.RedisCode, cron.StartIdKey)
|
|
|
- //休眠
|
|
|
- if st := Cron.UpdateDocsList.SleepTime; st > 0 {
|
|
|
- time.Sleep(time.Duration(st) * time.Millisecond)
|
|
|
- }
|
|
|
log.Println(fmt.Sprintf("update Docin info task end :%s-当前任务更新 预计数据量:%d 实际数据量:%d", date.NowFormat(date.Date_Full_Layout), entity.UpdateExpectTotal, entity.UpdateActualTotal))
|
|
|
}
|
|
|
+
|
|
|
+// 文档分类文档数据量更新
|
|
|
+func GetDocsClassTask(ctx context.Context) {
|
|
|
+ defer common.Catch()
|
|
|
+ cron.CrontabByTicker(ctx, Cron.UpdateDocsClass, DocsClassUpdate)
|
|
|
+}
|
|
|
+
|
|
|
+func DocsClassUpdate(cron config.Cron) {
|
|
|
+ log.Println("update Docs class count task start :", date.NowFormat(date.Date_Full_Layout), cron)
|
|
|
+ //获取分类详情
|
|
|
+ if res := GetClassInfo(); len(res) > 0 {
|
|
|
+ var (
|
|
|
+ pool = make(chan bool, 6)
|
|
|
+ wg = &sync.WaitGroup{}
|
|
|
+ )
|
|
|
+ for _, cv := range res {
|
|
|
+ pool <- true
|
|
|
+ wg.Add(1)
|
|
|
+ go func(cv *Result) {
|
|
|
+ defer func() {
|
|
|
+ wg.Done()
|
|
|
+ <-pool
|
|
|
+ }()
|
|
|
+ UpdateDocsSize(cv)
|
|
|
+ }(cv)
|
|
|
+ }
|
|
|
+ wg.Wait()
|
|
|
+ }
|
|
|
+ log.Println(fmt.Sprintf("update Docs class count task end :%s", date.NowFormat(date.Date_Full_Layout)))
|
|
|
+}
|