|
@@ -2,13 +2,18 @@ package entity
|
|
|
|
|
|
import (
|
|
import (
|
|
. "data_analysis/config"
|
|
. "data_analysis/config"
|
|
|
|
+ . "data_analysis/db"
|
|
"fmt"
|
|
"fmt"
|
|
"log"
|
|
"log"
|
|
|
|
+ "net/http"
|
|
"strings"
|
|
"strings"
|
|
"time"
|
|
"time"
|
|
|
|
|
|
. "app.yhyue.com/moapp/jybase/common"
|
|
. "app.yhyue.com/moapp/jybase/common"
|
|
- "app.yhyue.com/moapp/jybase/mysql"
|
|
|
|
|
|
+)
|
|
|
|
+
|
|
|
|
+const (
|
|
|
|
+ ToDbName = "data_analysis"
|
|
)
|
|
)
|
|
|
|
|
|
type Entity interface {
|
|
type Entity interface {
|
|
@@ -18,18 +23,19 @@ type Entity interface {
|
|
}
|
|
}
|
|
|
|
|
|
//
|
|
//
|
|
-func sync_add(main, to *mysql.Mysql, tableName string, saveField []string, id int64, f func(fv map[string]interface{})) (lastId int64) {
|
|
|
|
- log.Println("开始同步", tableName, "表。。。")
|
|
|
|
|
|
+func sync_add(fromTable string, saveField []string, id int64, f func(fv map[string]interface{})) (lastId int64) {
|
|
|
|
+ log.Println("开始同步", fromTable, "表。。。")
|
|
|
|
+ toTable := GetToTable(fromTable)
|
|
index := 0
|
|
index := 0
|
|
array := []interface{}{}
|
|
array := []interface{}{}
|
|
lastId = id
|
|
lastId = id
|
|
q := ""
|
|
q := ""
|
|
if id < 0 {
|
|
if id < 0 {
|
|
- q = fmt.Sprintf(`select * from %s`, tableName)
|
|
|
|
|
|
+ q = fmt.Sprintf(`select * from %s`, fromTable)
|
|
} else {
|
|
} else {
|
|
- q = fmt.Sprintf(`select * from %s where id>%d order by id`, tableName, id)
|
|
|
|
|
|
+ q = fmt.Sprintf(`select * from %s where id>%d order by id`, fromTable, id)
|
|
}
|
|
}
|
|
- main.SelectByBath(Config.SelectBathSize, func(l *[]map[string]interface{}) {
|
|
|
|
|
|
+ Mysql_Main.SelectByBath(Config.SelectBathSize, func(l *[]map[string]interface{}) {
|
|
for _, v := range *l {
|
|
for _, v := range *l {
|
|
index++
|
|
index++
|
|
lastId = Int64All(v["id"])
|
|
lastId = Int64All(v["id"])
|
|
@@ -40,26 +46,27 @@ func sync_add(main, to *mysql.Mysql, tableName string, saveField []string, id in
|
|
array = append(array, v[field])
|
|
array = append(array, v[field])
|
|
}
|
|
}
|
|
if index%Config.InsertBathSize == 0 {
|
|
if index%Config.InsertBathSize == 0 {
|
|
- log.Println("同步", tableName, "表", index)
|
|
|
|
- to.InsertIgnoreBatch(tableName, saveField, array)
|
|
|
|
|
|
+ log.Println("同步", fromTable, "表", index)
|
|
|
|
+ Mysql_Main.InsertBatch(toTable, saveField, array)
|
|
array = []interface{}{}
|
|
array = []interface{}{}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}, q)
|
|
}, q)
|
|
if len(array) > 0 {
|
|
if len(array) > 0 {
|
|
- to.InsertIgnoreBatch(tableName, saveField, array)
|
|
|
|
|
|
+ Mysql_Main.InsertBatch(toTable, saveField, array)
|
|
array = []interface{}{}
|
|
array = []interface{}{}
|
|
}
|
|
}
|
|
- log.Println("同步", tableName, "表结束。。。", index)
|
|
|
|
|
|
+ log.Println("同步", fromTable, "表结束。。。", index)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
//
|
|
//
|
|
-func sync_update(main, to *mysql.Mysql, tableName, start_layout, end_layout string, saveFields []string, id int64, updateTimeField string) {
|
|
|
|
- log.Println("开始同步更新", tableName, "表 。。。")
|
|
|
|
|
|
+func sync_update(fromTable, start_layout, end_layout string, saveFields []string, id int64, updateTimeField string) {
|
|
|
|
+ toTable := GetToTable(fromTable)
|
|
|
|
+ log.Println("开始同步更新", toTable, "表 。。。")
|
|
index := 0
|
|
index := 0
|
|
array := [][]interface{}{}
|
|
array := [][]interface{}{}
|
|
- main.SelectByBath(Config.SelectBathSize, func(l *[]map[string]interface{}) {
|
|
|
|
|
|
+ Mysql_Main.SelectByBath(Config.SelectBathSize, func(l *[]map[string]interface{}) {
|
|
for _, v := range *l {
|
|
for _, v := range *l {
|
|
index++
|
|
index++
|
|
datas := []interface{}{}
|
|
datas := []interface{}{}
|
|
@@ -68,68 +75,90 @@ func sync_update(main, to *mysql.Mysql, tableName, start_layout, end_layout stri
|
|
}
|
|
}
|
|
array = append(array, datas)
|
|
array = append(array, datas)
|
|
if index%Config.UpdateBathSize == 0 {
|
|
if index%Config.UpdateBathSize == 0 {
|
|
- log.Println("同步更新", tableName, "表", index)
|
|
|
|
- to.UpdateBath(tableName, saveFields, array)
|
|
|
|
|
|
+ log.Println("同步更新", toTable, "表", index)
|
|
|
|
+ Mysql_Main.UpdateBath(toTable, saveFields, array)
|
|
array = [][]interface{}{}
|
|
array = [][]interface{}{}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- }, fmt.Sprintf(`select * from %s where id<? and `+updateTimeField+`>=? and `+updateTimeField+`<?`, tableName), id, start_layout, end_layout)
|
|
|
|
|
|
+ }, fmt.Sprintf(`select * from %s where id<? and `+updateTimeField+`>=? and `+updateTimeField+`<?`, fromTable), id, start_layout, end_layout)
|
|
if len(array) > 0 {
|
|
if len(array) > 0 {
|
|
- to.UpdateBath(tableName, saveFields, array)
|
|
|
|
|
|
+ Mysql_Main.UpdateBath(toTable, saveFields, array)
|
|
array = [][]interface{}{}
|
|
array = [][]interface{}{}
|
|
}
|
|
}
|
|
- log.Println("同步更新", tableName, "表结束。。。", index)
|
|
|
|
|
|
+ log.Println("同步更新", toTable, "表结束。。。", index)
|
|
}
|
|
}
|
|
|
|
|
|
//全量同步
|
|
//全量同步
|
|
-func sync_full(e Entity, from, to *mysql.Mysql) bool {
|
|
|
|
- if to.Address != "172.17.145.164:14000" || to.DBName != "data_analysis" {
|
|
|
|
- log.Println("数据库配置错误")
|
|
|
|
|
|
+func sync_full(e Entity) bool {
|
|
|
|
+ if Mysql_Main.Address != "172.17.145.164:14000" || Mysql_Main.DBName != ToDbName {
|
|
|
|
+ log.Fatalln("数据库配置错误")
|
|
return false
|
|
return false
|
|
}
|
|
}
|
|
- list := from.SelectBySql(`SHOW CREATE TABLE ` + e.TableName())
|
|
|
|
|
|
+ toTable := GetToTable(e.TableName())
|
|
|
|
+ list := Mysql_Main.SelectBySql(`SHOW CREATE TABLE ` + e.TableName())
|
|
if list != nil && len(*list) == 1 {
|
|
if list != nil && len(*list) == 1 {
|
|
- if _, err := to.DB.Exec(`drop table ` + e.TableName()); err != nil {
|
|
|
|
- log.Println(e.TableName(), "清空表出错", err)
|
|
|
|
|
|
+ if _, err := Mysql_Main.DB.Exec(`drop table ` + toTable); err != nil {
|
|
|
|
+ log.Println(toTable, "清空表出错", err)
|
|
return false
|
|
return false
|
|
} else {
|
|
} else {
|
|
- log.Println(e.TableName(), "清空表完成")
|
|
|
|
|
|
+ log.Println(toTable, "清空表完成")
|
|
}
|
|
}
|
|
createTable := (*list)[0]["Create Table"].(string)
|
|
createTable := (*list)[0]["Create Table"].(string)
|
|
- log.Println(e.TableName(), "建表语句", createTable)
|
|
|
|
- if _, err := to.DB.Exec(createTable); err != nil {
|
|
|
|
- log.Println(e.TableName(), "创建表错误", err)
|
|
|
|
|
|
+ log.Println(toTable, "建表语句", createTable)
|
|
|
|
+ if _, err := Mysql_Main.DB.Exec(createTable); err != nil {
|
|
|
|
+ log.Println(toTable, "创建表错误", err)
|
|
return false
|
|
return false
|
|
} else {
|
|
} else {
|
|
- log.Println(e.TableName(), "表创建成功")
|
|
|
|
|
|
+ log.Println(toTable, "表创建成功")
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
log.Println("没有", e.TableName(), "的建表语句")
|
|
log.Println("没有", e.TableName(), "的建表语句")
|
|
return false
|
|
return false
|
|
}
|
|
}
|
|
- array := from.SelectBySql(`select id from ` + e.TableName() + ` order by id desc limit 1`)
|
|
|
|
|
|
+ array := Mysql_Main.SelectBySql(`select id from ` + e.TableName() + ` order by id desc limit 1`)
|
|
if array == nil || len(*array) == 0 {
|
|
if array == nil || len(*array) == 0 {
|
|
log.Println(e.TableName(), "表没有数据")
|
|
log.Println(e.TableName(), "表没有数据")
|
|
return false
|
|
return false
|
|
}
|
|
}
|
|
lastId := (*array)[0]["id"].(int64)
|
|
lastId := (*array)[0]["id"].(int64)
|
|
- fields := getFields(from, e.TableName())
|
|
|
|
|
|
+ fields := getFields(strings.Split(e.TableName(), ".")[0], strings.Split(e.TableName(), ".")[1])
|
|
|
|
+ var startId int64 = -1
|
|
|
|
+ sleepMin := 1
|
|
for {
|
|
for {
|
|
- id := sync_add(from, to, e.TableName(), fields, -1, nil)
|
|
|
|
|
|
+ id := sync_add(e.TableName(), fields, startId, nil)
|
|
if id >= lastId {
|
|
if id >= lastId {
|
|
break
|
|
break
|
|
}
|
|
}
|
|
- log.Println("同步", e.TableName(), "数量不够,一分钟后重试")
|
|
|
|
- time.Sleep(time.Minute)
|
|
|
|
|
|
+ startId = id
|
|
|
|
+ if sleepMin > 3 {
|
|
|
|
+ SendMail("重试5次,全量同步" + e.TableName() + "失败,请排查")
|
|
|
|
+ break
|
|
|
|
+ }
|
|
|
|
+ slm := time.Duration(sleepMin) * 2 * time.Minute
|
|
|
|
+ SendMail("全量同步" + e.TableName() + "数量不够," + fmt.Sprint(slm.Minutes()) + "分钟后重试")
|
|
|
|
+ time.Sleep(slm)
|
|
|
|
+ sleepMin++
|
|
}
|
|
}
|
|
return true
|
|
return true
|
|
}
|
|
}
|
|
|
|
|
|
-func getFields(m *mysql.Mysql, tableName string) []string {
|
|
|
|
- list := m.SelectBySql(`select group_concat(COLUMN_NAME separator ',') as fields from information_schema.COLUMNS where table_schema=? and table_name=?`, m.DBName, tableName)
|
|
|
|
|
|
+func getFields(dbName, tableName string) []string {
|
|
|
|
+ list := Mysql_Main.SelectBySql(`select group_concat(COLUMN_NAME separator ',') as fields from information_schema.COLUMNS where table_schema=? and table_name=?`, dbName, tableName)
|
|
if list == nil || len(*list) == 0 || (*list)[0]["fields"] == nil {
|
|
if list == nil || len(*list) == 0 || (*list)[0]["fields"] == nil {
|
|
- log.Println("没有找到", m.DBName, "库", tableName, "表的字段")
|
|
|
|
|
|
+ log.Println("没有找到", dbName, "库", tableName, "表的字段")
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|
|
return strings.Split((*list)[0]["fields"].(string), ",")
|
|
return strings.Split((*list)[0]["fields"].(string), ",")
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+func SendMail(msg string) {
|
|
|
|
+ log.Println(msg)
|
|
|
|
+ if _, err := http.Get(fmt.Sprintf(Config.TimeoutWarn, msg)); err != nil {
|
|
|
|
+ log.Println(msg, "发送告警邮件异常", err)
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+//
|
|
|
|
+func GetToTable(fromTable string) string {
|
|
|
|
+ return ToDbName + "." + strings.Split(fromTable, ".")[1]
|
|
|
|
+}
|