main.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package main
  2. import (
  3. . "data_analysis/config"
  4. . "data_analysis/db"
  5. . "data_analysis/entity"
  6. "flag"
  7. "fmt"
  8. "log"
  9. "sync"
  10. "time"
  11. . "app.yhyue.com/moapp/jybase/common"
  12. . "app.yhyue.com/moapp/jybase/date"
  13. )
  14. var (
  15. incSyncMap = map[string]Entity{
  16. //jianyu
  17. Raw_user.TableName(): Raw_user,
  18. Nps_user.TableName(): Nps_user,
  19. Ali_pay.TableName(): Ali_pay,
  20. Weixin_pay.TableName(): Weixin_pay,
  21. MoneyCorrection.TableName(): MoneyCorrection,
  22. Message_send_log.TableName(): Message_send_log,
  23. }
  24. fullSyncMap = map[string]Entity{}
  25. )
  26. func init() {
  27. for _, v := range Config.Jianyu {
  28. fullSyncMap["jianyu."+v] = NewFinalJob("jianyu." + v)
  29. }
  30. }
  31. func main() {
  32. m := flag.Int("m", 0, "模式 0:定时任务 1:非定时任务")
  33. t := flag.String("t", "", "表名")
  34. flag.Parse()
  35. inc := []Entity{}
  36. for _, v := range incSyncMap {
  37. inc = append(inc, v)
  38. }
  39. full := []Entity{}
  40. for _, v := range fullSyncMap {
  41. full = append(full, v)
  42. }
  43. if *m == 1 {
  44. if *t == "" {
  45. incSyncRun(inc...)
  46. fullSyncRun(full...)
  47. } else if incSyncMap[*t] != nil {
  48. incSyncRun(incSyncMap[*t])
  49. } else if fullSyncMap[*t] != nil {
  50. fullSyncRun(fullSyncMap[*t])
  51. } else {
  52. log.Fatalln(*t, "表的同步数据任务不存在")
  53. }
  54. } else {
  55. start(inc, full)
  56. select {}
  57. }
  58. }
  59. var isRuning bool
  60. func start(inc, full []Entity) {
  61. go func() {
  62. if isRuning {
  63. SendMail("同步数据任务上轮未结束,请排查!")
  64. return
  65. }
  66. isRuning = true
  67. if err := Mysql_Main.DB.Ping(); err != nil {
  68. log.Println("Mysql_Main Ping", err)
  69. Mysql_Main.Init()
  70. }
  71. Mysql_Main.ExecBySql(`SET session max_execution_time=86400`)
  72. incSyncRun(inc...)
  73. fullSyncRun(full...)
  74. isRuning = false
  75. }()
  76. now := time.Now()
  77. next := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
  78. next = next.Add(time.Minute * time.Duration(Config.Duration))
  79. time.AfterFunc(next.Sub(now), func() {
  80. start(inc, full)
  81. })
  82. }
  83. func fullSyncRun(es ...Entity) {
  84. log.Println("全量同步任务开始。。。")
  85. run("", "", es...)
  86. log.Println("全量同步任务结束。。。")
  87. }
  88. //
  89. func incSyncRun(es ...Entity) {
  90. end_layout := NowFormat(Date_Full_Layout)
  91. log.Println("增量同步任务开始。。。", fmt.Sprintf("%+v", TimeTask))
  92. run(TimeTask.Datetime, end_layout, es...)
  93. TimeTask.Datetime = end_layout
  94. WriteSysConfig("./timetask.json", &TimeTask)
  95. log.Println("增量同步任务结束。。。")
  96. }
  97. func run(d1, d2 string, es ...Entity) {
  98. wait := &sync.WaitGroup{}
  99. pool := make(chan bool, Config.SyncPool)
  100. for _, e := range es {
  101. pool <- true
  102. wait.Add(1)
  103. go func(v Entity) {
  104. defer func() {
  105. <-pool
  106. wait.Done()
  107. }()
  108. v.Run(d1, d2)
  109. }(e)
  110. }
  111. wait.Wait()
  112. }