123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- package main
- import (
- . "data_analysis/config"
- . "data_analysis/db"
- . "data_analysis/entity"
- "flag"
- "fmt"
- "log"
- "sync"
- "time"
- . "app.yhyue.com/moapp/jybase/common"
- . "app.yhyue.com/moapp/jybase/date"
- )
- var (
- incSyncMap = map[string]Entity{
- //jianyu
- Raw_user.TableName(): Raw_user,
- Nps_user.TableName(): Nps_user,
- Ali_pay.TableName(): Ali_pay,
- Weixin_pay.TableName(): Weixin_pay,
- MoneyCorrection.TableName(): MoneyCorrection,
- Message_send_log.TableName(): Message_send_log,
- }
- fullSyncMap = map[string]Entity{}
- )
- func init() {
- for _, v := range Config.Jianyu {
- fullSyncMap["jianyu."+v] = NewFinalJob("jianyu." + v)
- }
- }
- func main() {
- m := flag.Int("m", 0, "模式 0:定时任务 1:非定时任务")
- t := flag.String("t", "", "表名")
- flag.Parse()
- inc := []Entity{}
- for _, v := range incSyncMap {
- inc = append(inc, v)
- }
- full := []Entity{}
- for _, v := range fullSyncMap {
- full = append(full, v)
- }
- if *m == 1 {
- if *t == "" {
- incSyncRun(inc...)
- fullSyncRun(full...)
- } else if incSyncMap[*t] != nil {
- incSyncRun(incSyncMap[*t])
- } else if fullSyncMap[*t] != nil {
- fullSyncRun(fullSyncMap[*t])
- } else {
- log.Fatalln(*t, "表的同步数据任务不存在")
- }
- } else {
- start(inc, full)
- select {}
- }
- }
- var isRuning bool
- func start(inc, full []Entity) {
- go func() {
- if isRuning {
- SendMail("同步数据任务上轮未结束,请排查!")
- return
- }
- isRuning = true
- if err := Mysql_Main.DB.Ping(); err != nil {
- log.Println("Mysql_Main Ping", err)
- Mysql_Main.Init()
- }
- Mysql_Main.ExecBySql(`SET session max_execution_time=86400`)
- incSyncRun(inc...)
- fullSyncRun(full...)
- isRuning = false
- }()
- now := time.Now()
- next := time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), 0, 0, 0, time.Local)
- next = next.Add(time.Minute * time.Duration(Config.Duration))
- time.AfterFunc(next.Sub(now), func() {
- start(inc, full)
- })
- }
- func fullSyncRun(es ...Entity) {
- log.Println("全量同步任务开始。。。")
- run("", "", es...)
- log.Println("全量同步任务结束。。。")
- }
- //
- func incSyncRun(es ...Entity) {
- end_layout := NowFormat(Date_Full_Layout)
- log.Println("增量同步任务开始。。。", fmt.Sprintf("%+v", TimeTask))
- run(TimeTask.Datetime, end_layout, es...)
- TimeTask.Datetime = end_layout
- WriteSysConfig("./timetask.json", &TimeTask)
- log.Println("增量同步任务结束。。。")
- }
- func run(d1, d2 string, es ...Entity) {
- wait := &sync.WaitGroup{}
- pool := make(chan bool, Config.SyncPool)
- for _, e := range es {
- pool <- true
- wait.Add(1)
- go func(v Entity) {
- defer func() {
- <-pool
- wait.Done()
- }()
- v.Run(d1, d2)
- }(e)
- }
- wait.Wait()
- }
|