main.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. package main
  2. import (
  3. elastic "app.yhyue.com/moapp/jybase/es"
  4. "context"
  5. "fmt"
  6. _ "github.com/gogf/gf/contrib/drivers/clickhouse/v2"
  7. "github.com/gogf/gf/v2/frame/g"
  8. "github.com/gogf/gf/v2/os/gcron"
  9. "time"
  10. "workTasks/sendBidToSftp/cfile"
  11. "workTasks/sendBidToSftp/data"
  12. "workTasks/sendBidToSftp/send"
  13. )
  14. var (
  15. runJob = func(ctx context.Context, st, et time.Time) {
  16. //查询当日标讯数据
  17. var (
  18. fileNum int
  19. dataShow = st.Format(time.DateOnly)
  20. dir = fmt.Sprintf("out/%s", dataShow)
  21. )
  22. count, err := data.RangeBidding(ctx, st, et, func(maps []g.Map) {
  23. //生成csv文件
  24. if len(maps) <= 0 {
  25. return
  26. }
  27. err := cfile.CreateFile(ctx, maps, fmt.Sprintf("%s/%s_%03d.csv", dir, dataShow, fileNum+1))
  28. if err != nil {
  29. g.Log().Errorf(ctx, "生成文件异常 %v", err.Error())
  30. return
  31. }
  32. fileNum++
  33. })
  34. if err != nil {
  35. g.Log().Errorf(ctx, "获取 %s-%s 数据异常 %v", st.Format(time.DateTime), et.Format(time.DateTime), err)
  36. return
  37. }
  38. g.Log().Infof(ctx, "%s共有增量数据%d条 %d个文件", dataShow, count, fileNum)
  39. var (
  40. successNum int
  41. failFiles []string
  42. )
  43. if count > 0 && fileNum > 0 {
  44. successNum, failFiles = send.UpLoadFilesToSftp(ctx, dir)
  45. }
  46. g.Log().Infof(ctx, "%s上传文件 成功%d个 失败%d个", dataShow, successNum, len(failFiles))
  47. if qwRobotNotice := g.Cfg().MustGet(ctx, "qwRobotNotice").String(); qwRobotNotice != "" {
  48. var content string
  49. content = fmt.Sprintf("任务[%s]\n共%d条标讯数据\n生成%d个文件", dataShow, count, fileNum)
  50. if len(failFiles) > 0 {
  51. content += fmt.Sprintf("\n成功上传%d个文件,失败%d个文件\n失败文件列表:%v", successNum, len(failFiles), failFiles)
  52. } else {
  53. content += "\b全部上传完成"
  54. }
  55. g.Log().Infof(ctx, content)
  56. if err := send.SendSimpleMsg2ChatBot(content); err != nil {
  57. g.Log().Errorf(ctx, "发送企业微信消息异常 %s", err.Error())
  58. }
  59. }
  60. }
  61. )
  62. func main() {
  63. mainCtx := context.Background()
  64. fmt.Println(elastic.Count("bidding", "bidding", "{}"))
  65. //执行一次任务
  66. if g.Cfg().MustGet(mainCtx, "runOnce.isRun", false).Bool() {
  67. g.Log().Infof(mainCtx, "开始执行一次性任务")
  68. var (
  69. startDateStr = g.Cfg().MustGet(mainCtx, "runOnce.startDate").String()
  70. endDateStr = g.Cfg().MustGet(mainCtx, "runOnce.endDate").String()
  71. )
  72. st, st_err := time.ParseInLocation(time.DateTime, startDateStr, time.Local)
  73. et, et_err := time.ParseInLocation(time.DateTime, endDateStr, time.Local)
  74. if st_err != nil || et_err != nil {
  75. g.Log().Panicf(mainCtx, "开始执行时间异常%v %v", st_err, et_err)
  76. }
  77. runJob(mainCtx, st, et)
  78. g.Log().Infof(mainCtx, "一次性任务执行完成")
  79. return
  80. }
  81. //定时执行
  82. _, cronErr := gcron.AddSingleton(mainCtx, g.Cfg().MustGet(mainCtx, "runCron", "# # * * * *").String(), func(ctx context.Context) {
  83. g.Log().Infof(mainCtx, "开始执行定时任务")
  84. var (
  85. now = time.Now()
  86. timeEnd = time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
  87. timeStart = timeEnd.AddDate(0, 0, -1)
  88. )
  89. runJob(ctx, timeStart, timeEnd)
  90. g.Log().Infof(mainCtx, "定时任务执行完成")
  91. })
  92. if cronErr != nil {
  93. g.Log().Panicf(mainCtx, "创建定时任务异常 %v", cronErr)
  94. }
  95. select {}
  96. }