// Command sendBidToSftp exports bidding records for a time window to CSV
// files, uploads the generated files over SFTP and, when configured, reports
// the outcome to a WeCom (enterprise WeChat) chat bot. It either runs once for
// a configured window or registers a cron job and runs forever.
package main

import (
	"context"
	"fmt"
	"time"

	elastic "app.yhyue.com/moapp/jybase/es"
	_ "github.com/gogf/gf/contrib/drivers/clickhouse/v2"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/os/gcron"

	"workTasks/sendBidToSftp/cfile"
	"workTasks/sendBidToSftp/data"
	"workTasks/sendBidToSftp/send"
)

var (
	// runJob processes the bidding data between st and et (boundary handling
	// is whatever data.RangeBidding implements):
	//
	//  1. every non-empty batch delivered by data.RangeBidding is written to
	//     out/<date>/<date>_<NNN>.csv, where <date> is st formatted as
	//     time.DateOnly and NNN is a 1-based, zero-padded file counter;
	//  2. if any record and any file were produced, the output directory is
	//     handed to send.UpLoadFilesToSftp;
	//  3. if the "qwRobotNotice" config value is non-empty, a summary message
	//     is logged and sent through send.SendSimpleMsg2ChatBot.
	//
	// Errors are logged rather than returned; a failure of data.RangeBidding
	// aborts the job before upload and notification.
	runJob = func(ctx context.Context, st, et time.Time) {
		var (
			fileNum  int
			dataShow = st.Format(time.DateOnly)
			dir      = fmt.Sprintf("out/%s", dataShow)
		)

		// Query the bidding data for the window; each batch becomes one CSV file.
		count, err := data.RangeBidding(ctx, st, et, func(maps []g.Map) {
			if len(maps) <= 0 {
				return
			}
			err := cfile.CreateFile(ctx, maps, fmt.Sprintf("%s/%s_%03d.csv", dir, dataShow, fileNum+1))
			if err != nil {
				g.Log().Errorf(ctx, "生成文件异常 %v", err.Error())
				return
			}
			// Only advance the counter on success so file numbers stay contiguous.
			fileNum++
		})
		if err != nil {
			g.Log().Errorf(ctx, "获取 %s-%s 数据异常 %v", st.Format(time.DateTime), et.Format(time.DateTime), err)
			return
		}
		g.Log().Infof(ctx, "%s共有增量数据%d条 %d个文件", dataShow, count, fileNum)

		var (
			successNum int
			failFiles  []string
		)
		if count > 0 && fileNum > 0 {
			successNum, failFiles = send.UpLoadFilesToSftp(ctx, dir)
		}
		g.Log().Infof(ctx, "%s上传文件 成功%d个 失败%d个", dataShow, successNum, len(failFiles))

		// The notification is optional: it is skipped when qwRobotNotice is empty.
		if qwRobotNotice := g.Cfg().MustGet(ctx, "qwRobotNotice").String(); qwRobotNotice != "" {
			content := fmt.Sprintf("任务[%s]\n共%d条标讯数据\n生成%d个文件", dataShow, count, fileNum)
			if len(failFiles) > 0 {
				content += fmt.Sprintf("\n成功上传%d个文件,失败%d个文件\n失败文件列表:%v", successNum, len(failFiles), failFiles)
			} else {
				// Was "\b" (backspace), which injected a control character
				// instead of starting a new line like the failure branch does.
				content += "\n全部上传完成"
			}
			// content is data, not a format string: file names containing '%'
			// must not be interpreted as formatting verbs.
			g.Log().Info(ctx, content)
			if err := send.SendSimpleMsg2ChatBot(content); err != nil {
				g.Log().Errorf(ctx, "发送企业微信消息异常 %s", err.Error())
			}
		}
	}
)

// main runs the export either once or on a schedule.
//
// When the config value "runOnce.isRun" is true, "runOnce.startDate" and
// "runOnce.endDate" are parsed as local time.DateTime values, runJob is
// executed for that window and the process exits. Otherwise a singleton cron
// job is registered using the "runCron" pattern and the process blocks forever.
func main() {
	mainCtx := context.Background()

	// NOTE(review): prints the result of an Elasticsearch count at startup;
	// this looks like a connectivity/debug check — confirm it is still wanted.
	fmt.Println(elastic.Count("bidding", "bidding", "{}"))

	// One-off execution for an explicitly configured window.
	if g.Cfg().MustGet(mainCtx, "runOnce.isRun", false).Bool() {
		g.Log().Infof(mainCtx, "开始执行一次性任务")
		var (
			startDateStr = g.Cfg().MustGet(mainCtx, "runOnce.startDate").String()
			endDateStr   = g.Cfg().MustGet(mainCtx, "runOnce.endDate").String()
		)
		st, stErr := time.ParseInLocation(time.DateTime, startDateStr, time.Local)
		et, etErr := time.ParseInLocation(time.DateTime, endDateStr, time.Local)
		if stErr != nil || etErr != nil {
			g.Log().Panicf(mainCtx, "开始执行时间异常%v %v", stErr, etErr)
		}
		runJob(mainCtx, st, et)
		g.Log().Infof(mainCtx, "一次性任务执行完成")
		return
	}

	// Scheduled execution.
	// NOTE(review): the fallback pattern "# # * * * *" is kept as-is; verify
	// that gcron accepts it, otherwise a missing "runCron" value panics below.
	_, cronErr := gcron.AddSingleton(mainCtx, g.Cfg().MustGet(mainCtx, "runCron", "# # * * * *").String(), func(ctx context.Context) {
		g.Log().Infof(mainCtx, "开始执行定时任务")
		// The window is the 24 hours ending at the start of the current hour.
		var (
			now       = time.Now()
			timeEnd   = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
			timeStart = timeEnd.AddDate(0, 0, -1)
		)
		runJob(ctx, timeStart, timeEnd)
		g.Log().Infof(mainCtx, "定时任务执行完成")
	})
	if cronErr != nil {
		g.Log().Panicf(mainCtx, "创建定时任务异常 %v", cronErr)
	}

	// Block forever so the cron scheduler keeps running.
	select {}
}