package service import ( cron "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/crontab/handle" "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/entity" . "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/init" "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/internal/config" "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/service" "app.yhyue.com/moapp/jy_docs/services/partner" "app.yhyue.com/moapp/jybase/common" "app.yhyue.com/moapp/jybase/date" "context" "fmt" "github.com/gogf/gf/v2/util/gconv" "log" "time" ) // 获取豆丁docs基本信息 func GetDocinInfoTask(ctx context.Context) { defer common.Catch() cron.CrontabByTicker(ctx, Cron.NewDocsList, SyncDocinInfo) } func SyncDocinInfo(cron config.Cron) { log.Println("sync Docin info task start :", date.NowFormat(date.Date_Full_Layout)) var ( startId = cron.StartId h = service.NewHH(I.Docin.Name, I.Docin.Host, I.Docin.DocList.Name, I.Docin.DocList.Pathname, I.Docin.DocList.Method, map[string]interface{}{ "startId": startId, "count": Cron.NewDocsList.Count, }) b, err = h.HttpFunc() lastId = startId ) go h.SaveRequestLogger(b, err) if err == nil { if len(b) > 0 { var et, at int //豆丁同步数据到mongo go h.SaveResponseData(b, err) err, lastId, et, at = InsertDocinInfos(b) entity.SyncExpectTotal = entity.SyncExpectTotal + et entity.SyncActualTotal = entity.SyncActualTotal + at if err != nil { log.Println("保存docin 文档失败:", err.Error()) //todo 发送异常信号 } else if lastId > 0 { //周期内数据未同步完成 cron.StartId = lastId if cron.Count == et { if st := Cron.NewDocsList.SleepTime; st > 0 { time.Sleep(time.Duration(st) * time.Millisecond) } SyncDocinInfo(cron) } } } } else { log.Println("获取doc文库列表失败:", err.Error()) //todo 发送异常信号 } //缓存 保存最后一次更新id if b := partner.SetDocsStartId(entity.RedisCode, cron.StartIdKey, cron.StartId, -1); !b { log.Println("同步 Docin 数据 更新最新id缓存异常: ", cron.StartId) } else { log.Println("同步 Docin 数据 更新最新id缓存成功: ", cron.StartId) } log.Println(fmt.Sprintf("sync Docin info task end :%s-当前任务更新 预计数据量:%d 实际数据量:%d", date.NowFormat(date.Date_Full_Layout), entity.SyncExpectTotal, entity.SyncActualTotal)) } // 更新豆丁docs基本信息 func UpdateDocinInfoTask(ctx context.Context) { defer common.Catch() cron.CrontabByTicker(ctx, Cron.UpdateDocsList, UpdateDocinInfo) } func UpdateDocinInfo(cron config.Cron) { var ( startId = cron.StartId count = cron.Count startDate = cron.StartDate endDate = cron.EndDate ) log.Println("update Docin info task start :", date.NowFormat(date.Date_Full_Layout)) if endDate == "" { endDate = date.NowFormat(date.Date_yyyyMMdd) } if startDate == "" || endDate == "" || count == 0 || gconv.Int64(startDate) >= gconv.Int64(endDate) { log.Println("timetask 保存 docin 文档失败--参数异常:startId:", startId, "--startDate:", startDate, "--endDate:", endDate, "--count:", count) //todo 发送异常信号 return } h := service.NewHH(I.Docin.Name, I.Docin.Host, I.Docin.UpdateList.Name, I.Docin.UpdateList.Pathname, I.Docin.UpdateList.Method, map[string]interface{}{ "startId": startId, "count": count, "startDate": startDate, "endDate": endDate, }) b, err := h.HttpFunc() lastId := startId go h.SaveRequestLogger(b, err) if err == nil { if len(b) > 0 { var et, at int //豆丁更新数据到mongo go h.SaveResponseData(b, err) err, lastId, et, at = UpdateDocinInfos(b) entity.UpdateExpectTotal = entity.UpdateExpectTotal + et entity.UpdateActualTotal = entity.UpdateActualTotal + at if err != nil { log.Println("更新 docin 文档失败:", err.Error()) //todo 发送异常信号 } else if lastId > 0 { cron.StartId = lastId //周期内数据未同步完成 if count == et { if st := Cron.UpdateDocsList.SleepTime; st > 0 { time.Sleep(time.Duration(st) * time.Millisecond) } UpdateDocinInfo(cron) } } } } else { log.Println("更新 doc文库列表失败:", err.Error()) //todo 发送异常信号 } //缓存数据处理 partner.SetUpdateTaskInfo(entity.RedisCode, cron.StartDateKey, endDate, -1) partner.DelDocsStartId(entity.RedisCode, cron.StartIdKey) //休眠 if st := Cron.UpdateDocsList.SleepTime; st > 0 { time.Sleep(time.Duration(st) * time.Millisecond) } log.Println(fmt.Sprintf("update Docin info task end :%s-当前任务更新 预计数据量:%d 实际数据量:%d", date.NowFormat(date.Date_Full_Layout), entity.UpdateExpectTotal, entity.UpdateActualTotal)) }