123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- package main
- import (
- "flag"
- "fmt"
- "log"
- . "online_syncto_offline/config"
- . "online_syncto_offline/db"
- . "online_syncto_offline/entity"
- "sync"
- "time"
- . "app.yhyue.com/moapp/jybase/common"
- . "app.yhyue.com/moapp/jybase/date"
- )
- var (
- incSyncMap = map[string]Entity{
- //jianyu
- "raw_user": Raw_user,
- "action_login": Action_login,
- "integral_flow": Integral_flow,
- "nps_user": Nps_user,
- "ali_pay": Ali_pay,
- "weixin_pay": Weixin_pay,
- "moneyCorrection": MoneyCorrection,
- //jyactivities
- "user_prize": User_prize,
- //bxt
- "bxt_raw_user": Bxt_raw_user,
- }
- fullSyncMap = map[string]Entity{}
- )
- func init() {
- for _, v := range Config.Jianyu {
- fullSyncMap["jianyu."+v] = NewFinalJob(v, Mysql_From_Jianyu, Mysql_To_Jianyu)
- }
- for _, v := range Config.Bxt {
- fullSyncMap["bxt."+v] = NewFinalJob(v, Mysql_From_Bxt, Mysql_To_Bxt)
- }
- for _, v := range Config.Jypoints {
- fullSyncMap["jypoints."+v] = NewFinalJob(v, Mysql_From_Jypoints, Mysql_To_Jianyu)
- }
- for _, v := range Config.Jyactivities {
- fullSyncMap["jyactivities."+v] = NewFinalJob(v, Mysql_From_Jyactivities, Mysql_To_Jianyu)
- }
- }
- func main() {
- m := flag.Int("m", 0, "模式 0:定时任务 1:非定时任务")
- t := flag.String("t", "", "表名")
- flag.Parse()
- inc := []Entity{}
- for _, v := range incSyncMap {
- inc = append(inc, v)
- }
- full := []Entity{}
- for _, v := range fullSyncMap {
- full = append(full, v)
- }
- if *m == 1 {
- if *t == "" {
- incSyncRun(inc...)
- fullSyncRun(full...)
- } else if incSyncMap[*t] != nil {
- incSyncRun(incSyncMap[*t])
- } else if fullSyncMap[*t] != nil {
- fullSyncRun(fullSyncMap[*t])
- } else {
- log.Fatalln(*t, "表的同步数据任务不存在")
- }
- } else {
- start(inc, full)
- select {}
- }
- }
- var isRuning bool
- func start(inc, full []Entity) {
- go func() {
- if isRuning {
- SendMail("同步数据任务上轮未结束,请排查!")
- return
- }
- isRuning = true
- if err := Mysql_To_Jianyu.DB.Ping(); err != nil {
- log.Println("Mysql_To_Jianyu Ping", err)
- Mysql_To_Jianyu.Init()
- }
- Mysql_To_Jianyu.ExecBySql(`SET session max_execution_time=86400`)
- if err := Mysql_To_Bxt.DB.Ping(); err != nil {
- log.Println("Mysql_To_Bxt Ping", err)
- Mysql_To_Bxt.Init()
- }
- Mysql_To_Bxt.ExecBySql(`SET session max_execution_time=86400`)
- incSyncRun(inc...)
- fullSyncRun(full...)
- isRuning = false
- }()
- now := time.Now()
- next := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
- next = next.Add(time.Minute * time.Duration(Config.Duration))
- time.AfterFunc(next.Sub(now), func() {
- start(inc, full)
- })
- }
- func fullSyncRun(es ...Entity) {
- log.Println("全量同步任务开始。。。", fmt.Sprintf("%+v", TimeTask))
- run("", "", es...)
- log.Println("全量同步任务结束。。。")
- }
- //
- func incSyncRun(es ...Entity) {
- end_layout := NowFormat(Date_Full_Layout)
- log.Println("增量同步任务开始。。。", fmt.Sprintf("%+v", TimeTask))
- run(TimeTask.Datetime, end_layout, es...)
- TimeTask.Datetime = end_layout
- WriteSysConfig("./timetask.json", &TimeTask)
- log.Println("增量同步任务结束。。。")
- }
- func run(d1, d2 string, es ...Entity) {
- wait := &sync.WaitGroup{}
- pool := make(chan bool, Config.SyncPool)
- for _, e := range es {
- pool <- true
- wait.Add(1)
- go func(v Entity) {
- defer func() {
- <-pool
- wait.Done()
- }()
- v.Run(d1, d2)
- }(e)
- }
- wait.Wait()
- }
|