main.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package main
  2. import (
  3. . "data_analysis/config"
  4. . "data_analysis/db"
  5. . "data_analysis/entity"
  6. "flag"
  7. "fmt"
  8. "log"
  9. "sync"
  10. "time"
  11. . "app.yhyue.com/moapp/jybase/common"
  12. . "app.yhyue.com/moapp/jybase/date"
  13. )
  14. var (
  15. incSyncMap = map[string]Entity{
  16. //jianyu
  17. Raw_user.TableName(): Raw_user,
  18. Nps_user.TableName(): Nps_user,
  19. Ali_pay.TableName(): Ali_pay,
  20. Weixin_pay.TableName(): Weixin_pay,
  21. MoneyCorrection.TableName(): MoneyCorrection,
  22. Message_send_log.TableName(): Message_send_log,
  23. }
  24. fullSyncMap = map[string]Entity{}
  25. incNewSyncMap = map[string]Entity{}
  26. )
  27. func init() {
  28. for _, v := range Config.TableName {
  29. fullSyncMap[v] = NewFinalJob(v, "")
  30. }
  31. for _, v := range Config.IncTask.TableName {
  32. incNewSyncMap[v] = NewFinalJob(v, Config.IncTask.IncField)
  33. }
  34. }
  35. func main() {
  36. m := flag.Int("m", 0, "模式 0:定时任务 1:非定时任务")
  37. t := flag.String("t", "", "表名")
  38. flag.Parse()
  39. if *m == 1 {
  40. if *t == "" {
  41. incSyncRun(incSyncMap)
  42. fullSyncRun(fullSyncMap)
  43. incNewSyncRun(incNewSyncMap)
  44. } else if incSyncMap[*t] != nil {
  45. incSyncRun(map[string]Entity{
  46. *t: incSyncMap[*t],
  47. })
  48. } else if fullSyncMap[*t] != nil {
  49. fullSyncRun(map[string]Entity{
  50. *t: fullSyncMap[*t],
  51. })
  52. } else if incNewSyncMap[*t] != nil {
  53. incNewSyncRun(map[string]Entity{
  54. *t: incNewSyncMap[*t],
  55. })
  56. } else {
  57. log.Fatalln(*t, "表的同步数据任务不存在")
  58. }
  59. } else {
  60. start()
  61. startNew()
  62. select {}
  63. }
  64. }
  65. var isRuning bool
  66. func start() {
  67. go func() {
  68. if isRuning {
  69. SendMail("同步数据任务上轮未结束,请排查!")
  70. return
  71. }
  72. isRuning = true
  73. if err := Mysql_Main.DB.Ping(); err != nil {
  74. log.Println("Mysql_Main Ping", err)
  75. Mysql_Main.Init()
  76. }
  77. incSyncRun(incSyncMap)
  78. fullSyncRun(fullSyncMap)
  79. isRuning = false
  80. }()
  81. now := time.Now()
  82. next := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
  83. next = next.Add(time.Minute * Config.Duration)
  84. time.AfterFunc(next.Sub(now), start)
  85. }
  86. func startNew() {
  87. if err := Mysql_Main.DB.Ping(); err != nil {
  88. log.Println("Mysql_Main Ping", err)
  89. Mysql_Main.Init()
  90. }
  91. incNewSyncRun(incNewSyncMap)
  92. time.AfterFunc(time.Minute*Config.IncTask.Duration, startNew)
  93. }
  94. func fullSyncRun(es map[string]Entity) {
  95. log.Println("全量同步任务开始。。。")
  96. run("", "", es)
  97. log.Println("全量同步任务结束。。。")
  98. }
  99. //
  100. func incSyncRun(es map[string]Entity) {
  101. end_layout := NowFormat(Date_Full_Layout)
  102. log.Println("增量同步任务开始。。。", fmt.Sprintf("%+v", TimeTask))
  103. run(TimeTask.Datetime, end_layout, es)
  104. TimeTask.Datetime = end_layout
  105. WriteSysConfig("./timetask.json", &TimeTask)
  106. log.Println("增量同步任务结束。。。")
  107. }
  108. //
  109. func incNewSyncRun(es map[string]Entity) {
  110. end_layout := NowFormat(Date_Full_Layout)
  111. log.Println("新的增量同步任务开始。。。", fmt.Sprintf("%+v", IncTask))
  112. run(IncTask.Datetime, end_layout, es)
  113. IncTask.Datetime = end_layout
  114. WriteSysConfig("./inctask.json", &IncTask)
  115. log.Println("新的增量同步任务结束。。。")
  116. }
  117. func run(d1, d2 string, es map[string]Entity) {
  118. wait := &sync.WaitGroup{}
  119. pool := make(chan bool, Config.SyncPool)
  120. for _, e := range es {
  121. pool <- true
  122. wait.Add(1)
  123. go func(v Entity) {
  124. defer func() {
  125. <-pool
  126. wait.Done()
  127. }()
  128. v.Run(d1, d2)
  129. }(e)
  130. }
  131. wait.Wait()
  132. }