123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- package main
- import (
- "context"
- "fmt"
- _ "github.com/gogf/gf/contrib/drivers/clickhouse/v2"
- "github.com/gogf/gf/v2/frame/g"
- "github.com/gogf/gf/v2/os/gcron"
- "github.com/gogf/gf/v2/os/gfile"
- "time"
- "workTasks/sendBidToSftp/cfile"
- "workTasks/sendBidToSftp/data"
- "workTasks/sendBidToSftp/send"
- )
- var (
- running = false
- dataFormat = "2006-01-02-15"
- flagFilePath = "last.txt"
- runJob = func(ctx context.Context, st, et time.Time) {
- if running {
- return
- }
- running = true
- defer func() {
- running = false
- }()
- //查询当日标讯数据
- var (
- dataShow = st.Format("20060102")
- fileName = st.Format("20060102_15")
- dir = fmt.Sprintf("out/%s", dataShow)
- fullName = fmt.Sprintf("%s/%s.csv", dir, fileName)
- )
- count, err := data.RangeBidding(ctx, st, et, func(maps []g.Map) {
- //生成csv文件
- if len(maps) <= 0 {
- return
- }
- err := cfile.CreateFile(ctx, maps, fullName)
- if err != nil {
- g.Log().Errorf(ctx, "生成文件异常 %v", err.Error())
- return
- }
- })
- if err != nil {
- g.Log().Errorf(ctx, "获取 %s-%s 数据异常 %v", st.Format(time.DateTime), et.Format(time.DateTime), err)
- return
- }
- g.Log().Infof(ctx, "%s共有增量数据%d条", dataShow, count)
- var (
- startDateStr = g.Cfg().MustGet(ctx, "sendTime.start").String()
- endDateStr = g.Cfg().MustGet(ctx, "sendTime.end").String()
- isSend = false
- now = time.Now()
- )
- ss, ss_err := time.ParseInLocation(time.DateTime, startDateStr, time.Local)
- se, se_err := time.ParseInLocation(time.DateTime, endDateStr, time.Local)
- if se_err != nil || ss_err != nil {
- g.Log().Errorf(ctx, "格式化发送时间异常 %v %v", ss_err, se_err)
- } else {
- if now.After(ss) && now.Before(se) {
- isSend = true
- }
- }
- g.Log().Infof(ctx, "isSend:%v", isSend)
- if isSend {
- sendFail := send.UpLoadFilesToSftp(ctx, fullName)
- if sendFail != nil {
- g.Log().Infof(ctx, "文件传输异常 %v", sendFail.Error())
- }
- if qwRobotNotice := g.Cfg().MustGet(ctx, "qwRobotNotice").String(); qwRobotNotice != "" {
- var content string
- content = fmt.Sprintf("任务[%s]\n共%d条标讯数据", dataShow, count)
- if sendFail != nil {
- content += fmt.Sprintf("\n成功上传失败:%v", sendFail)
- }
- g.Log().Infof(ctx, content)
- if err := send.SendSimpleMsg2ChatBot(content); err != nil {
- g.Log().Errorf(ctx, "发送企业微信消息异常 %s", err.Error())
- }
- }
- }
- }
- )
- func main() {
- mainCtx := context.Background()
- //执行一次任务
- if g.Cfg().MustGet(mainCtx, "runOnce.isRun", false).Bool() {
- g.Log().Infof(mainCtx, "开始执行一次性任务")
- var (
- startDateStr = g.Cfg().MustGet(mainCtx, "runOnce.startDate").String()
- endDateStr = g.Cfg().MustGet(mainCtx, "runOnce.endDate").String()
- )
- st, st_err := time.ParseInLocation(time.DateTime, startDateStr, time.Local)
- et, et_err := time.ParseInLocation(time.DateTime, endDateStr, time.Local)
- if st_err != nil || et_err != nil {
- g.Log().Panicf(mainCtx, "开始执行时间异常%v %v", st_err, et_err)
- }
- runJob(mainCtx, st, et)
- g.Log().Infof(mainCtx, "一次性任务执行完成")
- return
- }
- //定时执行
- _, cronErr := gcron.AddSingleton(mainCtx, g.Cfg().MustGet(mainCtx, "runCron", "# # * * * *").String(), func(ctx context.Context) {
- g.Log().Infof(mainCtx, "开始执行定时任务")
- var (
- dataStr = gfile.GetContents(flagFilePath)
- now = time.Now()
- timeEnd = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
- )
- timeStart, err := time.ParseInLocation(dataFormat, dataStr, time.Local)
- if err != nil {
- g.Log().Errorf(ctx, "加载日期异常 %v", err)
- return
- }
- runJob(ctx, timeStart, timeEnd)
- if err := gfile.PutContents("last.txt", timeEnd.Format(dataFormat)); err != nil {
- g.Log().Errorf(ctx, "存储时间异常 %v", err)
- }
- g.Log().Infof(mainCtx, "定时任务执行完成")
- })
- if cronErr != nil {
- g.Log().Panicf(mainCtx, "创建定时任务异常 %v", cronErr)
- }
- select {}
- }
|