main.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "log"
  6. . "online_syncto_offline/config"
  7. . "online_syncto_offline/db"
  8. . "online_syncto_offline/entity"
  9. "sync"
  10. "time"
  11. . "app.yhyue.com/moapp/jybase/common"
  12. . "app.yhyue.com/moapp/jybase/date"
  13. )
  14. var (
  15. incSyncMap = map[string]Entity{
  16. //jianyu
  17. "raw_user": Raw_user,
  18. "action_login": Action_login,
  19. "integral_flow": Integral_flow,
  20. "nps_user": Nps_user,
  21. "ali_pay": Ali_pay,
  22. "weixin_pay": Weixin_pay,
  23. "moneyCorrection": MoneyCorrection,
  24. //jyactivities
  25. "user_prize": User_prize,
  26. //bxt
  27. "bxt_raw_user": Bxt_raw_user,
  28. }
  29. fullSyncMap = map[string]Entity{}
  30. )
  31. func init() {
  32. for _, v := range Config.Jianyu {
  33. fullSyncMap["jianyu."+v] = NewFinalJob(v, Mysql_From_Jianyu, Mysql_To_Jianyu)
  34. }
  35. for _, v := range Config.Bxt {
  36. fullSyncMap["bxt."+v] = NewFinalJob(v, Mysql_From_Bxt, Mysql_To_Bxt)
  37. }
  38. for _, v := range Config.Jypoints {
  39. fullSyncMap["jypoints."+v] = NewFinalJob(v, Mysql_From_Jypoints, Mysql_To_Jianyu)
  40. }
  41. for _, v := range Config.Jyactivities {
  42. fullSyncMap["jyactivities."+v] = NewFinalJob(v, Mysql_From_Jyactivities, Mysql_To_Jianyu)
  43. }
  44. }
  45. func main() {
  46. m := flag.Int("m", 0, "模式 0:定时任务 1:非定时任务")
  47. t := flag.String("t", "", "表名")
  48. flag.Parse()
  49. inc := []Entity{}
  50. for _, v := range incSyncMap {
  51. inc = append(inc, v)
  52. }
  53. full := []Entity{}
  54. for _, v := range fullSyncMap {
  55. full = append(full, v)
  56. }
  57. if *m == 1 {
  58. if *t == "" {
  59. incSyncRun(inc...)
  60. fullSyncRun(full...)
  61. } else if incSyncMap[*t] != nil {
  62. incSyncRun(incSyncMap[*t])
  63. } else if fullSyncMap[*t] != nil {
  64. fullSyncRun(fullSyncMap[*t])
  65. } else {
  66. log.Fatalln(*t, "表的同步数据任务不存在")
  67. }
  68. } else {
  69. start(inc, full)
  70. select {}
  71. }
  72. }
  73. var isRuning bool
  74. func start(inc, full []Entity) {
  75. go func() {
  76. if isRuning {
  77. SendMail("同步数据任务上轮未结束,请排查!")
  78. return
  79. }
  80. isRuning = true
  81. if err := Mysql_To_Jianyu.DB.Ping(); err != nil {
  82. log.Println("Mysql_To_Jianyu Ping", err)
  83. Mysql_To_Jianyu.Init()
  84. }
  85. Mysql_To_Jianyu.ExecBySql(`SET session max_execution_time=86400`)
  86. if err := Mysql_To_Bxt.DB.Ping(); err != nil {
  87. log.Println("Mysql_To_Bxt Ping", err)
  88. Mysql_To_Bxt.Init()
  89. }
  90. Mysql_To_Bxt.ExecBySql(`SET session max_execution_time=86400`)
  91. incSyncRun(inc...)
  92. fullSyncRun(full...)
  93. isRuning = false
  94. }()
  95. now := time.Now()
  96. next := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
  97. next = next.Add(time.Minute * time.Duration(Config.Duration))
  98. time.AfterFunc(next.Sub(now), func() {
  99. start(inc, full)
  100. })
  101. }
  102. func fullSyncRun(es ...Entity) {
  103. log.Println("全量同步任务开始。。。", fmt.Sprintf("%+v", TimeTask))
  104. run("", "", es...)
  105. log.Println("全量同步任务结束。。。")
  106. }
  107. //
  108. func incSyncRun(es ...Entity) {
  109. end_layout := NowFormat(Date_Full_Layout)
  110. log.Println("增量同步任务开始。。。", fmt.Sprintf("%+v", TimeTask))
  111. run(TimeTask.Datetime, end_layout, es...)
  112. TimeTask.Datetime = end_layout
  113. WriteSysConfig("./timetask.json", &TimeTask)
  114. log.Println("增量同步任务结束。。。")
  115. }
  116. func run(d1, d2 string, es ...Entity) {
  117. wait := &sync.WaitGroup{}
  118. pool := make(chan bool, Config.SyncPool)
  119. for _, e := range es {
  120. pool <- true
  121. wait.Add(1)
  122. go func(v Entity) {
  123. defer func() {
  124. <-pool
  125. wait.Done()
  126. }()
  127. v.Run(d1, d2)
  128. }(e)
  129. }
  130. wait.Wait()
  131. }