wangchuanjin 4 жил өмнө
parent
commit
0664e58e8f

+ 2 - 1
online_datasync/config.json

@@ -3,5 +3,6 @@
 	"updateBathSize": 100,
 	"insertBathSize": 200,
 	"selectBathSize": 200,
-	"syncPool":5
+	"syncPool":5,
+	"selectMgoUserPool":30
 }

+ 6 - 5
online_datasync/config/config.go

@@ -3,11 +3,12 @@ package config
 import . "app.yhyue.com/moapp/jybase/common"
 
 type config struct {
-	RunTime        string
-	UpdateBathSize int
-	InsertBathSize int
-	SelectBathSize int
-	SyncPool       int
+	RunTime           string
+	UpdateBathSize    int
+	InsertBathSize    int
+	SelectBathSize    int
+	SyncPool          int
+	SelectMgoUserPool int
 }
 type timeTask struct {
 	Datetime            string `json:"datetime"`

+ 1 - 1
online_datasync/db.json

@@ -2,7 +2,7 @@
 	"mongodb": {
 		"main": {
 			"address": "192.168.3.128:27080",
-	 		"size": 5,
+	 		"size": 35,
 	 		"dbName": "qfw",
 			"replSet": ""
 		}

+ 58 - 29
online_datasync/entity/raw_user.go

@@ -6,6 +6,7 @@ import (
 	. "online_datasync/db"
 	"online_datasync/phonedata"
 	"strings"
+	"sync"
 
 	. "app.yhyue.com/moapp/jybase/common"
 	. "app.yhyue.com/moapp/jybase/date"
@@ -82,6 +83,11 @@ func (r *raw_user) Run(start_unix, end_unix int64, start_layout, end_layout stri
 //新增
 func (r *raw_user) add() (id string) {
 	defer Catch()
+	index := 0
+	array := []interface{}{}
+	lock := &sync.Mutex{}
+	pool := make(chan bool, Config.SelectMgoUserPool)
+	wait := &sync.WaitGroup{}
 	sess := Mgo.GetMgoConn()
 	defer Mgo.DestoryMongoConn(sess)
 	q := map[string]interface{}{
@@ -94,20 +100,30 @@ func (r *raw_user) add() (id string) {
 	}
 	log.Println("开始同步新增", r.TableName(), "表。。。", q)
 	it := sess.DB("qfw").C("user").Find(q).Sort("_id").Select(r.selectField()).Iter()
-	index := 0
-	array := []interface{}{}
-	for m := make(map[string]interface{}); it.Next(&m); {
-		index++
-		ru := new_raw_user(m, true)
-		TimeTask.User_mgo_mysql_id = ru.User_id
-		array = append(array, ru.User_id, ru.Reg_time, ru.Province, ru.City, ru.Reg_type, ru.Device, ru.Company, ru.Job, ru.Source_module, ru.Source_channel, ru.Follow_status, ru.Phone, ru.Open_id, ru.Channel_id, ru.Email, ru.Timestamp)
-		if index%Config.InsertBathSize == 0 {
-			log.Println("同步新增", r.TableName(), "表", index)
-			Mysql_Main.InsertIgnoreBatch(r.TableName(), r.SaveFields(), array)
-			array = []interface{}{}
-		}
-		m = make(map[string]interface{})
+	for temp := make(map[string]interface{}); it.Next(&temp); {
+		TimeTask.User_mgo_mysql_id = BsonIdToSId(temp["_id"])
+		pool <- true
+		wait.Add(1)
+		go func(m map[string]interface{}) {
+			defer Catch()
+			defer func() {
+				<-pool
+				wait.Done()
+			}()
+			ru := new_raw_user(m, true)
+			lock.Lock()
+			defer lock.Unlock()
+			index++
+			array = append(array, ru.User_id, ru.Reg_time, ru.Province, ru.City, ru.Reg_type, ru.Device, ru.Company, ru.Job, ru.Source_module, ru.Source_channel, ru.Follow_status, ru.Phone, ru.Open_id, ru.Channel_id, ru.Email, ru.Timestamp)
+			if index%Config.InsertBathSize == 0 {
+				log.Println("同步新增", r.TableName(), "表", index)
+				Mysql_Main.InsertIgnoreBatch(r.TableName(), r.SaveFields(), array)
+				array = []interface{}{}
+			}
+		}(temp)
+		temp = make(map[string]interface{})
 	}
+	wait.Wait()
 	if len(array) > 0 {
 		Mysql_Main.InsertIgnoreBatch(r.TableName(), r.SaveFields(), array)
 		array = []interface{}{}
@@ -120,33 +136,46 @@ func (r *raw_user) add() (id string) {
 func (r *raw_user) update(start_unix, end_unix int64) {
 	log.Println("开始同步user表,Mgo to Mysql 更新。。。")
 	defer Catch()
+	index := 0
+	fields := []string{"user_id", "province", "city", "reg_type", "device", "company", "job", "source_module", "source_channel", "follow_status", "phone", "openid", "email", "timestamp"}
 	array := [][]interface{}{}
+	lock := &sync.Mutex{}
+	pool := make(chan bool, Config.SelectMgoUserPool)
+	wait := &sync.WaitGroup{}
 	sess := Mgo.GetMgoConn()
 	defer Mgo.DestoryMongoConn(sess)
 	q := map[string]interface{}{
 		"i_appid": 2,
-	}
-	if TimeTask.User_mgo_mysql_id != "" {
-		q["auto_updatetime"] = map[string]interface{}{
+		"auto_updatetime": map[string]interface{}{
 			"$gte": start_unix,
 			"$lt":  end_unix,
-		}
+		},
 	}
 	log.Println("同步更新", r.TableName(), "表。。。", q)
 	it := sess.DB("qfw").C("user").Find(q).Sort("_id").Select(r.selectField()).Iter()
-	index := 0
-	fields := []string{"user_id", "province", "city", "reg_type", "device", "company", "job", "source_module", "source_channel", "follow_status", "phone", "openid", "email", "timestamp"}
-	for m := make(map[string]interface{}); it.Next(&m); {
-		index++
-		ru := new_raw_user(m, false)
-		array = append(array, []interface{}{ru.User_id, ru.Province, ru.City, ru.Reg_type, ru.Device, ru.Company, ru.Job, ru.Source_module, ru.Source_channel, ru.Follow_status, ru.Phone, ru.Open_id, ru.Email, ru.Timestamp})
-		if index%Config.UpdateBathSize == 0 {
-			log.Println("同步更新", r.TableName(), "表", index)
-			Mysql_Main.UpdateBath(r.TableName(), fields, array)
-			array = [][]interface{}{}
-		}
-		m = make(map[string]interface{})
+	for temp := make(map[string]interface{}); it.Next(&temp); {
+		pool <- true
+		wait.Add(1)
+		go func(m map[string]interface{}) {
+			defer Catch()
+			defer func() {
+				<-pool
+				wait.Done()
+			}()
+			ru := new_raw_user(m, false)
+			lock.Lock()
+			defer lock.Unlock()
+			index++
+			array = append(array, []interface{}{ru.User_id, ru.Province, ru.City, ru.Reg_type, ru.Device, ru.Company, ru.Job, ru.Source_module, ru.Source_channel, ru.Follow_status, ru.Phone, ru.Open_id, ru.Email, ru.Timestamp})
+			if index%Config.UpdateBathSize == 0 {
+				log.Println("同步更新", r.TableName(), "表", index)
+				Mysql_Main.UpdateBath(r.TableName(), fields, array)
+				array = [][]interface{}{}
+			}
+		}(temp)
+		temp = make(map[string]interface{})
 	}
+	wait.Wait()
 	if len(array) > 0 {
 		Mysql_Main.UpdateBath(r.TableName(), fields, array)
 		array = [][]interface{}{}