package main import ( "context" "fmt" _ "github.com/gogf/gf/contrib/drivers/clickhouse/v2" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/gcron" "github.com/gogf/gf/v2/os/gfile" "time" "workTasks/sendBidToSftp/cfile" "workTasks/sendBidToSftp/data" "workTasks/sendBidToSftp/send" ) var ( running = false dataFormat = "2006-01-02-15" flagFilePath = "last.txt" runJob = func(ctx context.Context, st, et time.Time) { if running { return } running = true defer func() { running = false }() //查询当日标讯数据 var ( dataShow = st.Format("20060102") fileName = st.Format("20060102_15") dir = fmt.Sprintf("out/%s", dataShow) fullName = fmt.Sprintf("%s/%s.csv", dir, fileName) ) count, err := data.RangeBidding(ctx, st, et, func(maps []g.Map) { //生成csv文件 if len(maps) <= 0 { return } err := cfile.CreateFile(ctx, maps, fullName) if err != nil { g.Log().Errorf(ctx, "生成文件异常 %v", err.Error()) return } }) if err != nil { g.Log().Errorf(ctx, "获取 %s-%s 数据异常 %v", st.Format(time.DateTime), et.Format(time.DateTime), err) return } g.Log().Infof(ctx, "%s共有增量数据%d条", dataShow, count) var ( startDateStr = g.Cfg().MustGet(ctx, "sendTime.start").String() endDateStr = g.Cfg().MustGet(ctx, "sendTime.end").String() isSend = false now = time.Now() ) ss, ss_err := time.ParseInLocation(time.DateTime, startDateStr, time.Local) se, se_err := time.ParseInLocation(time.DateTime, endDateStr, time.Local) if se_err != nil || ss_err != nil { g.Log().Errorf(ctx, "格式化发送时间异常 %v %v", ss_err, se_err) } else { if now.After(ss) && now.Before(se) { isSend = true } } g.Log().Infof(ctx, "isSend:%v", isSend) if isSend { sendFail := send.UpLoadFilesToSftp(ctx, fullName) if sendFail != nil { g.Log().Infof(ctx, "文件传输异常 %v", sendFail.Error()) } if qwRobotNotice := g.Cfg().MustGet(ctx, "qwRobotNotice").String(); qwRobotNotice != "" { var content string content = fmt.Sprintf("任务[%s]\n共%d条标讯数据", dataShow, count) if sendFail != nil { content += fmt.Sprintf("\n成功上传失败:%v", sendFail) } g.Log().Infof(ctx, content) if err := send.SendSimpleMsg2ChatBot(content); err != nil { g.Log().Errorf(ctx, "发送企业微信消息异常 %s", err.Error()) } } } } ) func main() { mainCtx := context.Background() //执行一次任务 if g.Cfg().MustGet(mainCtx, "runOnce.isRun", false).Bool() { g.Log().Infof(mainCtx, "开始执行一次性任务") var ( startDateStr = g.Cfg().MustGet(mainCtx, "runOnce.startDate").String() endDateStr = g.Cfg().MustGet(mainCtx, "runOnce.endDate").String() ) st, st_err := time.ParseInLocation(time.DateTime, startDateStr, time.Local) et, et_err := time.ParseInLocation(time.DateTime, endDateStr, time.Local) if st_err != nil || et_err != nil { g.Log().Panicf(mainCtx, "开始执行时间异常%v %v", st_err, et_err) } runJob(mainCtx, st, et) g.Log().Infof(mainCtx, "一次性任务执行完成") return } //定时执行 _, cronErr := gcron.AddSingleton(mainCtx, g.Cfg().MustGet(mainCtx, "runCron", "# # * * * *").String(), func(ctx context.Context) { g.Log().Infof(mainCtx, "开始执行定时任务") var ( dataStr = gfile.GetContents(flagFilePath) now = time.Now() timeEnd = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local) ) timeStart, err := time.ParseInLocation(dataFormat, dataStr, time.Local) if err != nil { g.Log().Errorf(ctx, "加载日期异常 %v", err) return } runJob(ctx, timeStart, timeEnd) if err := gfile.PutContents("last.txt", timeEnd.Format(dataFormat)); err != nil { g.Log().Errorf(ctx, "存储时间异常 %v", err) } g.Log().Infof(mainCtx, "定时任务执行完成") }) if cronErr != nil { g.Log().Panicf(mainCtx, "创建定时任务异常 %v", cronErr) } select {} }