package service import ( cron "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/crontab/handle" "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/entity" . "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/init" "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/internal/config" "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/service" "app.yhyue.com/moapp/jy_docs/rpc/partnerlib/warn" "app.yhyue.com/moapp/jy_docs/services/partner" "app.yhyue.com/moapp/jybase/common" "app.yhyue.com/moapp/jybase/date" "context" "fmt" "github.com/gogf/gf/v2/util/gconv" "log" "net/url" "sync" "time" ) // 获取豆丁docs基本信息 func GetDocinInfoTask(ctx context.Context) { defer common.Catch() cron.CrontabByTicker(ctx, Cron.NewDocsList, SyncDocinInfo) } func SyncDocinInfo(cron config.Cron) { log.Println("sync Docin info task start :", date.NowFormat(date.Date_Full_Layout)) var ( startId = cron.StartId h = service.NewHH(I.Docin.Name, I.Docin.Host, I.Docin.DocList.Name, I.Docin.DocList.Pathname, I.Docin.DocList.Method, url.Values{ "startId": []string{common.InterfaceToStr(startId)}, "count": []string{common.InterfaceToStr(Cron.NewDocsList.Count)}, }) b, err, _ = h.HttpFunc() lastId = startId setCache = func(cron config.Cron) { //缓存 保存最后一次更新id if b := partner.SetDocsStartId(entity.RedisCode, cron.StartIdKey, cron.StartId, -1); !b { log.Println("同步 Docin 数据 更新最新id缓存异常: ", cron.StartId) } else { log.Println("同步 Docin 数据 更新最新id缓存成功: ", cron.StartId) } } ) go h.SaveDocinLogger(b, err, "req") if err == nil { if len(b) > 0 { var et, at int //豆丁同步数据到mongo go h.SaveDocinLogger(b, err, "res") err, lastId, et, at = InsertDocinInfos(b) entity.SyncExpectTotal = entity.SyncExpectTotal + et entity.SyncActualTotal = entity.SyncActualTotal + at if err != nil { go warn.SendMsgByWXURL(fmt.Sprintf("同步豆丁数据到tidb失败:%s,当前任务执行参数:%v", err.Error(), cron)) } else if lastId > 0 { //周期内数据未同步完成 cron.StartId = lastId setCache(cron) if cron.Count == et { if st := Cron.NewDocsList.SleepTime; st > 0 { time.Sleep(time.Duration(st) * time.Millisecond) } SyncDocinInfo(cron) } } } } else { go warn.SendMsgByWXURL(fmt.Sprintf("获取豆丁数据列表失败:%s,当前任务执行参数:%v", err.Error(), cron)) } //缓存 保存最后一次更新id if b := partner.SetDocsStartId(entity.RedisCode, cron.StartIdKey, cron.StartId, -1); !b { log.Println("同步 Docin 数据 更新最新id缓存异常: ", cron.StartId) } else { log.Println("同步 Docin 数据 更新最新id缓存成功: ", cron.StartId) } log.Println(fmt.Sprintf("sync Docin info task end :%s-当前任务更新 预计数据量:%d 实际数据量:%d", date.NowFormat(date.Date_Full_Layout), entity.SyncExpectTotal, entity.SyncActualTotal)) } // 更新豆丁docs基本信息 func UpdateDocinInfoTask(ctx context.Context) { defer common.Catch() cron.CrontabByTicker(ctx, Cron.UpdateDocsList, UpdateDocinInfo) } func UpdateDocinInfo(cron config.Cron) { var ( startId = cron.StartId count = cron.Count startDate = cron.StartDate endDate = cron.EndDate setCache = func(cron config.Cron) { //缓存数据处理 partner.SetUpdateTaskInfo(entity.RedisCode, cron.StartDateKey, endDate, -1) partner.DelDocsStartId(entity.RedisCode, cron.StartIdKey) } ) log.Println("update Docin info task start :", date.NowFormat(date.Date_Full_Layout)) if endDate == "" { endDate = time.Now().AddDate(0, 0, 1).Format(date.Date_yyyyMMdd) } if startDate == "" || endDate == "" || count == 0 || gconv.Int64(startDate) >= gconv.Int64(endDate) { go warn.SendMsgByWXURL(fmt.Sprintf("timetask 保存 docin 文档失败--参数异常:startId:%d --startDate:%s --endDate:%s--count:%d", startId, startDate, endDate, count)) return } h := service.NewHH(I.Docin.Name, I.Docin.Host, I.Docin.UpdateList.Name, I.Docin.UpdateList.Pathname, I.Docin.UpdateList.Method, url.Values{ "startId": []string{common.InterfaceToStr(startId)}, "count": []string{common.InterfaceToStr(count)}, "startDate": []string{startDate}, "endDate": []string{endDate}, }) b, err, _ := h.HttpFunc() lastId := startId go h.SaveDocinLogger(b, err, "req") if err == nil { if len(b) > 0 { var et, at int //豆丁更新数据到mongo go h.SaveDocinLogger(b, err, "res") err, lastId, et, at = UpdateDocinInfos(b) entity.UpdateExpectTotal = entity.UpdateExpectTotal + et entity.UpdateActualTotal = entity.UpdateActualTotal + at if err != nil { go warn.SendMsgByWXURL(fmt.Sprintf("更新豆丁数据到tidb失败:%s,当前任务执行参数:%v", err.Error(), cron)) } else if lastId > 0 { cron.StartId = lastId setCache(cron) //周期内数据未同步完成 if count == et { if st := Cron.UpdateDocsList.SleepTime; st > 0 { time.Sleep(time.Duration(st) * time.Millisecond) } UpdateDocinInfo(cron) } } } } else { go warn.SendMsgByWXURL(fmt.Sprintf("获取豆丁更新数据失败:%s,当前任务执行参数:%v", err.Error(), cron)) } //缓存数据处理 partner.SetUpdateTaskInfo(entity.RedisCode, cron.StartDateKey, endDate, -1) partner.DelDocsStartId(entity.RedisCode, cron.StartIdKey) log.Println(fmt.Sprintf("update Docin info task end :%s-当前任务更新 预计数据量:%d 实际数据量:%d", date.NowFormat(date.Date_Full_Layout), entity.UpdateExpectTotal, entity.UpdateActualTotal)) } // 文档分类文档数据量更新 func UpdateDocsClassTask(ctx context.Context) { defer common.Catch() cron.CrontabByTicker(ctx, Cron.UpdateDocsClass, DocsClassUpdate) } func DocsClassUpdate(cron config.Cron) { log.Println("update Docs class count task start :", date.NowFormat(date.Date_Full_Layout), cron) //获取分类详情 if res := GetClassInfo(); len(res) > 0 { var ( pool = make(chan bool, 6) wg = &sync.WaitGroup{} ) for _, cv := range res { pool <- true wg.Add(1) go func(cv *Result) { defer func() { wg.Done() <-pool }() UpdateDocsSize(cv) }(cv) } wg.Wait() } log.Println(fmt.Sprintf("update Docs class count task end :%s", date.NowFormat(date.Date_Full_Layout))) } // 合作商会员信息更新,每天一点开始 func UpdateDocsUserMemberTask(ctx context.Context) { defer common.Catch() cron.CrontabByTicker(ctx, Cron.UpdateDocsMember, DocsUserMemberUpdate) } func DocsUserMemberUpdate(cron config.Cron) { log.Println("update docs user member state task start :", date.NowFormat(date.Date_Full_Layout), cron) //直接更新合作商会员状态 if err := DocsUserMember(); err != nil { log.Println("合作商会员信息更新 定时任务异常: ", err.Error()) } log.Println(fmt.Sprintf("update docs user member state task end :%s", date.NowFormat(date.Date_Full_Layout))) }