123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- package main
- import (
- elastic "app.yhyue.com/moapp/jybase/es"
- "context"
- "fmt"
- _ "github.com/gogf/gf/contrib/drivers/clickhouse/v2"
- "github.com/gogf/gf/v2/frame/g"
- "github.com/gogf/gf/v2/os/gcron"
- "time"
- "workTasks/sendBidToSftp/cfile"
- "workTasks/sendBidToSftp/data"
- "workTasks/sendBidToSftp/send"
- )
- var (
- runJob = func(ctx context.Context, st, et time.Time) {
- //查询当日标讯数据
- var (
- fileNum int
- dataShow = st.Format(time.DateOnly)
- dir = fmt.Sprintf("out/%s", dataShow)
- )
- count, err := data.RangeBidding(ctx, st, et, func(maps []g.Map) {
- //生成csv文件
- if len(maps) <= 0 {
- return
- }
- err := cfile.CreateFile(ctx, maps, fmt.Sprintf("%s/%s_%03d.csv", dir, dataShow, fileNum+1))
- if err != nil {
- g.Log().Errorf(ctx, "生成文件异常 %v", err.Error())
- return
- }
- fileNum++
- })
- if err != nil {
- g.Log().Errorf(ctx, "获取 %s-%s 数据异常 %v", st.Format(time.DateTime), et.Format(time.DateTime), err)
- return
- }
- g.Log().Infof(ctx, "%s共有增量数据%d条 %d个文件", dataShow, count, fileNum)
- var (
- successNum int
- failFiles []string
- )
- if count > 0 && fileNum > 0 {
- successNum, failFiles = send.UpLoadFilesToSftp(ctx, dir)
- }
- g.Log().Infof(ctx, "%s上传文件 成功%d个 失败%d个", dataShow, successNum, len(failFiles))
- if qwRobotNotice := g.Cfg().MustGet(ctx, "qwRobotNotice").String(); qwRobotNotice != "" {
- var content string
- content = fmt.Sprintf("任务[%s]\n共%d条标讯数据\n生成%d个文件", dataShow, count, fileNum)
- if len(failFiles) > 0 {
- content += fmt.Sprintf("\n成功上传%d个文件,失败%d个文件\n失败文件列表:%v", successNum, len(failFiles), failFiles)
- } else {
- content += "\b全部上传完成"
- }
- g.Log().Infof(ctx, content)
- if err := send.SendSimpleMsg2ChatBot(content); err != nil {
- g.Log().Errorf(ctx, "发送企业微信消息异常 %s", err.Error())
- }
- }
- }
- )
- func main() {
- mainCtx := context.Background()
- fmt.Println(elastic.Count("bidding", "bidding", "{}"))
- //执行一次任务
- if g.Cfg().MustGet(mainCtx, "runOnce.isRun", false).Bool() {
- g.Log().Infof(mainCtx, "开始执行一次性任务")
- var (
- startDateStr = g.Cfg().MustGet(mainCtx, "runOnce.startDate").String()
- endDateStr = g.Cfg().MustGet(mainCtx, "runOnce.endDate").String()
- )
- st, st_err := time.ParseInLocation(time.DateTime, startDateStr, time.Local)
- et, et_err := time.ParseInLocation(time.DateTime, endDateStr, time.Local)
- if st_err != nil || et_err != nil {
- g.Log().Panicf(mainCtx, "开始执行时间异常%v %v", st_err, et_err)
- }
- runJob(mainCtx, st, et)
- g.Log().Infof(mainCtx, "一次性任务执行完成")
- return
- }
- //定时执行
- _, cronErr := gcron.AddSingleton(mainCtx, g.Cfg().MustGet(mainCtx, "runCron", "# # * * * *").String(), func(ctx context.Context) {
- g.Log().Infof(mainCtx, "开始执行定时任务")
- var (
- now = time.Now()
- timeEnd = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
- timeStart = timeEnd.AddDate(0, 0, -1)
- )
- runJob(ctx, timeStart, timeEnd)
- g.Log().Infof(mainCtx, "定时任务执行完成")
- })
- if cronErr != nil {
- g.Log().Panicf(mainCtx, "创建定时任务异常 %v", cronErr)
- }
- select {}
- }
|