wangchuanjin 4 年之前
父節點
當前提交
e9bedadf8f

+ 3 - 0
online_datasync/config/config.go

@@ -15,6 +15,9 @@ type timeTask struct {
 	User_mgo_mysql_id   string `json:"user_mgo_mysql_id"`
 	Raw_product_id      int64  `json:"raw_product_id"`
 	Dataexport_order_id int64  `json:"dataexport_order_id"`
+	Point_type_id       int64  `json:"point_type_id"`
+	Integral_flow_id    int64  `json:"integral_flow_id"`
+	User_prize_id       int64  `json:"user_prize_id"`
 }
 
 var (

+ 8 - 0
online_datasync/db.json

@@ -31,6 +31,14 @@
 	        "passWord": "Topnet123",
 			"maxOpenConns": 5,
 			"maxIdleConns": 5
+	    },
+		"from_jypoints": {
+	        "dbName": "jypoints",
+	        "address": "192.168.3.11:3366",
+	        "userName": "root",
+	        "passWord": "Topnet123",
+			"maxOpenConns": 5,
+			"maxIdleConns": 5
 	    }
     }
 }

+ 14 - 1
online_datasync/db/db.go

@@ -12,8 +12,8 @@ var (
 	DbConf                  *dbConf
 	Mysql_Main              *mysql.Mysql
 	Mysql_From_Jianyu       *mysql.Mysql
+	Mysql_From_Jypoints     *mysql.Mysql
 	Mysql_From_Jyactivities *mysql.Mysql
-	Mysql_To                *mysql.Mysql
 	Mgo                     MongodbSim
 )
 
@@ -25,6 +25,7 @@ type dbConf struct {
 		Main              *mysqlConf
 		From_jianyu       *mysqlConf
 		From_jyactivities *mysqlConf
+		From_jypoints     *mysqlConf
 	}
 }
 type mgoConf struct {
@@ -94,5 +95,17 @@ func init() {
 			}
 			Mysql_From_Jyactivities.Init()
 		}
+		if DbConf.Mysql.From_jypoints != nil {
+			log.Println("初始化 mysql from jypoints")
+			Mysql_From_Jypoints = &mysql.Mysql{
+				Address:      DbConf.Mysql.From_jypoints.Address,
+				UserName:     DbConf.Mysql.From_jypoints.UserName,
+				PassWord:     DbConf.Mysql.From_jypoints.PassWord,
+				DBName:       DbConf.Mysql.From_jypoints.DbName,
+				MaxOpenConns: DbConf.Mysql.From_jypoints.MaxOpenConns,
+				MaxIdleConns: DbConf.Mysql.From_jypoints.MaxIdleConns,
+			}
+			Mysql_From_Jypoints.Init()
+		}
 	}
 }

+ 38 - 0
online_datasync/entity/entity.go

@@ -1,7 +1,45 @@
 package entity
 
+import (
+	"fmt"
+	"log"
+	. "online_datasync/config"
+	. "online_datasync/db"
+
+	. "app.yhyue.com/moapp/jybase/common"
+	"app.yhyue.com/moapp/jybase/mysql"
+)
+
 type Entity interface {
 	Run(start_unix, end_unix int64, start_layout, end_layout string)
 	TableName() string
 	SaveFields() []string
 }
+
+//
+func sync_add(m *mysql.Mysql, tableName string, saveField []string, id int64, f func(fv map[string]interface{})) (lastId int64) {
+	log.Println("开始同步", tableName, "表。。。")
+	index := 0
+	array := []interface{}{}
+	lastId = id
+	m.SelectByBath(Config.SelectBathSize, func(l *[]map[string]interface{}) {
+		for _, v := range *l {
+			index++
+			lastId = Int64All(v["id"])
+			for _, field := range saveField {
+				array = append(array, v[field])
+			}
+			if index%Config.InsertBathSize == 0 {
+				log.Println("同步", tableName, "表", index)
+				Mysql_Main.InsertIgnoreBatch(tableName, saveField, array)
+				array = []interface{}{}
+			}
+		}
+	}, fmt.Sprintf(`select * from %s where id>? order by id`, tableName), id)
+	if len(array) > 0 {
+		Mysql_Main.InsertIgnoreBatch(tableName, saveField, array)
+		array = []interface{}{}
+	}
+	log.Println("同步", tableName, "表结束。。。", index)
+	return
+}

+ 27 - 0
online_datasync/entity/integral_flow.go

@@ -0,0 +1,27 @@
+package entity
+
+import (
+	. "online_datasync/config"
+	. "online_datasync/db"
+)
+
+var (
+	Integral_flow *integral_flow
+)
+
+type integral_flow struct {
+}
+
+func (i *integral_flow) TableName() string {
+	return "integral_flow"
+}
+
+//
+func (i *integral_flow) SaveFields() []string {
+	return []string{"id", "userId", "pointType", "SourceId", "sourceType", "point", "createTime", "endDate", "appId", "sort", "abstract", "operationType", "serialNumber"}
+}
+
+//
+func (i *integral_flow) Run(start_unix, end_unix int64, start_layout, end_layout string) {
+	TimeTask.Integral_flow_id = sync_add(Mysql_From_Jypoints, i.TableName(), i.SaveFields(), TimeTask.Integral_flow_id, nil)
+}

+ 27 - 0
online_datasync/entity/point_type.go

@@ -0,0 +1,27 @@
+package entity
+
+import (
+	. "online_datasync/config"
+	. "online_datasync/db"
+)
+
+var (
+	Point_type *point_type
+)
+
+type point_type struct {
+}
+
+func (p *point_type) TableName() string {
+	return "point_type"
+}
+
+//
+func (p *point_type) SaveFields() []string {
+	return []string{"id", "code", "name"}
+}
+
+//
+func (p *point_type) Run(start_unix, end_unix int64, start_layout, end_layout string) {
+	TimeTask.Point_type_id = sync_add(Mysql_From_Jypoints, p.TableName(), p.SaveFields(), TimeTask.Point_type_id, nil)
+}

+ 1 - 23
online_datasync/entity/raw_product.go

@@ -1,11 +1,8 @@
 package entity
 
 import (
-	"log"
 	. "online_datasync/config"
 	. "online_datasync/db"
-
-	. "app.yhyue.com/moapp/jybase/common"
 )
 
 var (
@@ -26,24 +23,5 @@ func (r *raw_product) SaveFields() []string {
 
 //
 func (r *raw_product) Run(start_unix, end_unix int64, start_layout, end_layout string) {
-	log.Println("开始同步", r.TableName(), "表。。。")
-	index := 0
-	array := []interface{}{}
-	Mysql_From_Jyactivities.SelectByBath(Config.SelectBathSize, func(l *[]map[string]interface{}) {
-		for _, v := range *l {
-			index++
-			TimeTask.Raw_product_id = Int64All(v["id"])
-			array = append(array, v["id"], v["productName"], v["productCode"], v["parentCode"], 1, 1)
-			if index%Config.InsertBathSize == 0 {
-				log.Println("同步", r.TableName(), "表", index)
-				Mysql_Main.InsertIgnoreBatch(r.TableName(), r.SaveFields(), array)
-				array = []interface{}{}
-			}
-		}
-	}, `select id,productName,productCode,parentCode from product where id>? order by id`, TimeTask.Raw_product_id)
-	if len(array) > 0 {
-		Mysql_Main.InsertIgnoreBatch(r.TableName(), r.SaveFields(), array)
-		array = []interface{}{}
-	}
-	log.Println("同步", r.TableName(), "表结束。。。", index)
+	TimeTask.Raw_product_id = sync_add(Mysql_From_Jyactivities, r.TableName(), r.SaveFields(), TimeTask.Raw_product_id, nil)
 }

+ 57 - 0
online_datasync/entity/user_prize.go

@@ -0,0 +1,57 @@
+package entity
+
+import (
+	"log"
+	. "online_datasync/config"
+	. "online_datasync/db"
+
+	. "app.yhyue.com/moapp/jybase/date"
+)
+
+var (
+	User_prize *user_prize
+)
+
+type user_prize struct {
+}
+
+func (u *user_prize) TableName() string {
+	return "user_prize"
+}
+
+//
+func (u *user_prize) SaveFields() []string {
+	return []string{"id", "userId", "prizeId", "validityDates", "beginDate", "endDate", "useDate", "prizeType", "appId", "lotteryId", "createTime", "name", "full", "reduce", "userName", "orderCode", "timestamp"}
+}
+
+//
+func (u *user_prize) Run(start_unix, end_unix int64, start_layout, end_layout string) {
+	TimeTask.User_prize_id = sync_add(Mysql_From_Jyactivities, u.TableName(), u.SaveFields(), TimeTask.User_prize_id, nil)
+	if start_unix > 0 {
+		u.update(start_layout, end_layout)
+	}
+}
+
+//
+func (u *user_prize) update(start_layout, end_layout string) {
+	log.Println("开始同步更新", u.TableName(), "表 。。。")
+	index := 0
+	array := [][]interface{}{}
+	fields := []string{"prizeType", "timestamp"}
+	Mysql_From_Jyactivities.SelectByBath(Config.SelectBathSize, func(l *[]map[string]interface{}) {
+		for _, v := range *l {
+			index++
+			array = append(array, []interface{}{v["prizeType"], NowFormat(Date_Full_Layout)})
+			if index%Config.UpdateBathSize == 0 {
+				log.Println("同步更新", u.TableName(), "表", index)
+				Mysql_Main.UpdateBath(u.TableName(), fields, array)
+				array = [][]interface{}{}
+			}
+		}
+	}, `select prizeType from user_prize where useDate>=? and useDate<?`, start_layout, end_layout)
+	if len(array) > 0 {
+		Mysql_Main.UpdateBath(u.TableName(), fields, array)
+		array = [][]interface{}{}
+	}
+	log.Println("同步更新", u.TableName(), "表结束。。。", index)
+}

+ 21 - 12
online_datasync/main.go

@@ -14,26 +14,35 @@ import (
 )
 
 var (
-	wait = &sync.WaitGroup{}
-	pool = make(chan bool, Config.SyncPool)
+	wait   = &sync.WaitGroup{}
+	pool   = make(chan bool, Config.SyncPool)
+	allMap = map[string]Entity{
+		"raw_user":              Raw_user,
+		"raw_product":           Raw_product,
+		"action_order":          Action_order,
+		"action_order_spec":     Action_order,
+		"action_product_record": Action_product_record,
+		"user_prize":            User_prize,
+		"integral_flow":         Integral_flow,
+		"point_type":            Point_type,
+	}
 )
 
 func main() {
 	m := flag.Int("m", 0, "模式 0:定时任务 1:非定时任务")
 	t := flag.String("t", "", "表名")
 	flag.Parse()
-	all := []Entity{Raw_user, Raw_product, Action_order, Action_product_record}
+	all := []Entity{}
+	for _, v := range allMap {
+		all = append(all, v)
+	}
 	if *m == 1 {
-		if *t == "raw_user" {
-			run(Raw_user)
-		} else if *t == "raw_product" {
-			run(Raw_product)
-		} else if *t == "action_order" || *t == "action_order_spec" {
-			run(Action_order)
-		} else if *t == "action_product_record" {
-			run(Action_order, Action_product_record)
-		} else if *t == "" {
+		if *t == "" {
 			run(all...)
+		} else if allMap[*t] != nil {
+			run(allMap[*t])
+		} else {
+			log.Fatalln(*t, "表的同步数据任务不存在")
 		}
 	} else {
 		SimpleCrontab(false, Config.RunTime, func() {

+ 1 - 1
online_datasync/timetask.json

@@ -1 +1 @@
-{"datetime":"2021-04-26 10:48:14","user_mgo_mysql_id":"60852b753d1ddb654ce27b0f","raw_product_id":15,"dataexport_order_id":33015}
+{"datetime":"2021-05-06 11:33:13","user_mgo_mysql_id":"608a16e0cea9cb967ab1551c","raw_product_id":15,"dataexport_order_id":33015,"point_type_id":12,"integral_flow_id":2601,"user_prize_id":699}

+ 3 - 0
online_syncto_offline/config/config.go

@@ -22,6 +22,9 @@ type timeTask struct {
 	Raw_page_id              int64  `json:"raw_page_id"`
 	Raw_product_id           int64  `json:"raw_product_id"`
 	Raw_user_id              int64  `json:"raw_user_id"`
+	Point_type_id            int64  `json:"point_type_id"`
+	Integral_flow_id         int64  `json:"integral_flow_id"`
+	User_prize_id            int64  `json:"user_prize_id"`
 }
 
 var (

+ 1 - 1
online_syncto_offline/entity/entity.go

@@ -42,7 +42,7 @@ func sync_add(tableName string, saveField []string, id int64, f func(fv map[stri
 		Mysql_To.InsertIgnoreBatch(tableName, saveField, array)
 		array = []interface{}{}
 	}
-	log.Println("同步", tableName, "表结束。。。")
+	log.Println("同步", tableName, "表结束。。。", index)
 	return
 }
 

+ 26 - 0
online_syncto_offline/entity/integral_flow.go

@@ -0,0 +1,26 @@
+package entity
+
+import (
+	. "online_syncto_offline/config"
+)
+
+var (
+	Integral_flow *integral_flow
+)
+
+type integral_flow struct {
+}
+
+func (i *integral_flow) TableName() string {
+	return "integral_flow"
+}
+
+//
+func (i *integral_flow) SaveFields() []string {
+	return []string{"id", "userId", "pointType", "SourceId", "sourceType", "point", "createTime", "endDate", "appId", "sort", "abstract", "operationType", "serialNumber"}
+}
+
+//
+func (i *integral_flow) Run(start_layout, end_layout string) {
+	TimeTask.Integral_flow_id = sync_add(i.TableName(), i.SaveFields(), TimeTask.Integral_flow_id, nil)
+}

+ 26 - 0
online_syncto_offline/entity/point_type.go

@@ -0,0 +1,26 @@
+package entity
+
+import (
+	. "online_syncto_offline/config"
+)
+
+var (
+	Point_type *point_type
+)
+
+type point_type struct {
+}
+
+func (p *point_type) TableName() string {
+	return "point_type"
+}
+
+//
+func (p *point_type) SaveFields() []string {
+	return []string{"id", "code", "name"}
+}
+
+//
+func (p *point_type) Run(start_layout, end_layout string) {
+	TimeTask.Point_type_id = sync_add(p.TableName(), p.SaveFields(), TimeTask.Point_type_id, nil)
+}

+ 29 - 0
online_syncto_offline/entity/user_prize.go

@@ -0,0 +1,29 @@
+package entity
+
+import (
+	. "online_syncto_offline/config"
+)
+
+var (
+	User_prize *user_prize
+)
+
+type user_prize struct {
+}
+
+func (u *user_prize) TableName() string {
+	return "user_prize"
+}
+
+//
+func (u *user_prize) SaveFields() []string {
+	return []string{"id", "userId", "prizeId", "validityDates", "beginDate", "endDate", "useDate", "prizeType", "appId", "lotteryId", "createTime", "name", "full", "reduce", "userName", "orderCode", "timestamp"}
+}
+
+//
+func (u *user_prize) Run(start_layout, end_layout string) {
+	if start_layout != "" {
+		sync_update(u.TableName(), start_layout, end_layout, u.SaveFields(), TimeTask.User_prize_id)
+	}
+	TimeTask.User_prize_id = sync_add(u.TableName(), u.SaveFields(), TimeTask.User_prize_id, nil)
+}

+ 27 - 26
online_syncto_offline/main.go

@@ -13,40 +13,41 @@ import (
 )
 
 var (
-	wait = &sync.WaitGroup{}
-	pool = make(chan bool, Config.SyncPool)
+	wait   = &sync.WaitGroup{}
+	pool   = make(chan bool, Config.SyncPool)
+	allMap = map[string]Entity{
+		"raw_user":              Raw_user,
+		"raw_product":           Raw_product,
+		"raw_page":              Raw_page,
+		"raw_element":           Raw_element,
+		"raw_event":             Raw_event,
+		"action_login":          Action_login,
+		"action_page":           Action_page,
+		"action_event":          Action_event,
+		"action_order":          Action_order,
+		"action_order_spec":     Action_order_spec,
+		"action_product_record": Action_product_record,
+		"user_prize":            User_prize,
+		"integral_flow":         Integral_flow,
+		"point_type":            Point_type,
+	}
 )
 
 func main() {
 	m := flag.Int("m", 0, "模式 0:定时任务 1:非定时任务")
 	t := flag.String("t", "", "表名")
 	flag.Parse()
-	all := []Entity{Raw_user, Raw_product, Raw_page, Raw_element, Raw_event, Action_login, Action_page, Action_event, Action_order, Action_order_spec, Action_product_record}
+	all := []Entity{}
+	for _, v := range allMap {
+		all = append(all, v)
+	}
 	if *m == 1 {
-		if *t == "raw_user" {
-			run(Raw_user)
-		} else if *t == "raw_product" {
-			run(Raw_product)
-		} else if *t == "raw_page" {
-			run(Raw_page)
-		} else if *t == "raw_element" {
-			run(Raw_element)
-		} else if *t == "raw_event" {
-			run(Raw_event)
-		} else if *t == "action_login" {
-			run(Action_login)
-		} else if *t == "action_page" {
-			run(Action_page)
-		} else if *t == "action_event" {
-			run(Action_event)
-		} else if *t == "action_order" {
-			run(Action_order)
-		} else if *t == "action_order_spec" {
-			run(Action_order_spec)
-		} else if *t == "action_product_record" {
-			run(Action_product_record)
-		} else if *t == "" {
+		if *t == "" {
 			run(all...)
+		} else if allMap[*t] != nil {
+			run(allMap[*t])
+		} else {
+			log.Fatalln(*t, "表的同步数据任务不存在")
 		}
 	} else {
 		SimpleCrontab(false, Config.RunTime, func() {

+ 1 - 1
online_syncto_offline/timetask.json

@@ -1 +1 @@
-{"datetime":"2021-04-26 10:48:20","action_event_id":213662781,"action_login_id":75144,"action_order_id":517131,"action_order_spec_id":157183,"action_page_id":213662872,"action_product_record_id":157183,"raw_element_id":16,"raw_event_id":5,"raw_page_id":327,"raw_product_id":0,"raw_user_id":1933484}
+{"datetime":"2021-05-06 11:55:12","action_event_id":213662781,"action_login_id":75312,"action_order_id":517131,"action_order_spec_id":157183,"action_page_id":213662872,"action_product_record_id":157183,"raw_element_id":1851,"raw_event_id":5,"raw_page_id":327,"raw_product_id":0,"raw_user_id":1933484,"point_type_id":12,"integral_flow_id":2601,"user_prize_id":699}