|
@@ -0,0 +1,192 @@
|
|
|
+package entity
|
|
|
+
|
|
|
+import (
|
|
|
+ "log"
|
|
|
+ . "online_datasync/config"
|
|
|
+ . "online_datasync/db"
|
|
|
+ "strings"
|
|
|
+ "sync"
|
|
|
+
|
|
|
+ . "app.yhyue.com/moapp/jybase/common"
|
|
|
+ . "app.yhyue.com/moapp/jybase/date"
|
|
|
+ . "app.yhyue.com/moapp/jybase/mongodb"
|
|
|
+)
|
|
|
+
|
|
|
+var (
|
|
|
+ Bxt_raw_user *bxt_raw_user
|
|
|
+)
|
|
|
+
|
|
|
+type bxt_raw_user struct {
|
|
|
+ User_id string //userid
|
|
|
+ Reg_time string //注册日期
|
|
|
+ Reg_type string //注册方式[手机、微信]
|
|
|
+ Province string //省份
|
|
|
+ City string //城市
|
|
|
+ Device string //常用设备
|
|
|
+ Company string //公司名称
|
|
|
+ Job string //职务
|
|
|
+ Source_module string //来源模块
|
|
|
+ Source_channel string //来源渠道
|
|
|
+ Follow_status int //关注状态 1未取关 2取关
|
|
|
+ Phone string //手机号
|
|
|
+ Open_id string //微信id
|
|
|
+ Channel_id string //例如:如果是推荐人记推荐用户id
|
|
|
+ Email string //
|
|
|
+ Name string //姓名
|
|
|
+ Timestamp string //更新时间
|
|
|
+}
|
|
|
+
|
|
|
+func (b *bxt_raw_user) TableName() string {
|
|
|
+ return "raw_user"
|
|
|
+}
|
|
|
+
|
|
|
+//
|
|
|
+func (b *bxt_raw_user) SaveFields() []string {
|
|
|
+ return []string{"user_id", "reg_time", "province", "city", "reg_type", "device", "company", "job", "source_module", "source_channel", "follow_status", "phone", "openid", "channel_id", "name", "email", "timestamp"}
|
|
|
+}
|
|
|
+
|
|
|
+//
|
|
|
+func (b *bxt_raw_user) selectField() map[string]interface{} {
|
|
|
+ return map[string]interface{}{
|
|
|
+ "_id": 1,
|
|
|
+ "s_phone": 1,
|
|
|
+ "l_registedate": 1,
|
|
|
+ "s_company": 1,
|
|
|
+ "o_member_jy.s_email": 1,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+//
|
|
|
+func (b *bxt_raw_user) Run(start_unix, end_unix int64, start_layout, end_layout string) {
|
|
|
+ log.Println("开始同步标讯通", b.TableName(), "表。。。", start_unix, end_unix)
|
|
|
+ b.add()
|
|
|
+ if start_unix > 0 {
|
|
|
+ b.update(start_unix, end_unix)
|
|
|
+ }
|
|
|
+ log.Println("同步讯通", b.TableName(), "表结束。。。")
|
|
|
+}
|
|
|
+
|
|
|
+//新增
|
|
|
+func (b *bxt_raw_user) add() (id string) {
|
|
|
+ defer Catch()
|
|
|
+ index := 0
|
|
|
+ array := []interface{}{}
|
|
|
+ lock := &sync.Mutex{}
|
|
|
+ pool := make(chan bool, Config.SelectMgoUserPool)
|
|
|
+ wait := &sync.WaitGroup{}
|
|
|
+ sess := Mgo.GetMgoConn()
|
|
|
+ defer Mgo.DestoryMongoConn(sess)
|
|
|
+ q := map[string]interface{}{
|
|
|
+ "i_appid": 2,
|
|
|
+ }
|
|
|
+ if TimeTask.Bxt_user_mgo_mysql_id != "" {
|
|
|
+ q["_id"] = map[string]interface{}{
|
|
|
+ "$gt": StringTOBsonId(TimeTask.Bxt_user_mgo_mysql_id),
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.Println("开始同步新增", b.TableName(), "表。。。", q)
|
|
|
+ it := sess.DB("bxt").C("user").Find(q).Sort("_id").Select(b.selectField()).Iter()
|
|
|
+ for temp := make(map[string]interface{}); it.Next(&temp); {
|
|
|
+ TimeTask.Bxt_user_mgo_mysql_id = BsonIdToSId(temp["_id"])
|
|
|
+ pool <- true
|
|
|
+ wait.Add(1)
|
|
|
+ go func(m map[string]interface{}) {
|
|
|
+ defer Catch()
|
|
|
+ defer func() {
|
|
|
+ <-pool
|
|
|
+ wait.Done()
|
|
|
+ }()
|
|
|
+ ru := new_bxt_raw_user(m, true)
|
|
|
+ lock.Lock()
|
|
|
+ defer lock.Unlock()
|
|
|
+ index++
|
|
|
+ array = append(array, ru.User_id, ru.Reg_time, ru.Province, ru.City, ru.Reg_type, ru.Device, ru.Company, ru.Job, ru.Source_module, ru.Source_channel, ru.Follow_status, ru.Phone, ru.Open_id, ru.Channel_id, ru.Name, ru.Email, ru.Timestamp)
|
|
|
+ if index%Config.InsertBathSize == 0 {
|
|
|
+ log.Println("同步新增", b.TableName(), "表", index)
|
|
|
+ Mysql_Main_Bxt.InsertIgnoreBatch(b.TableName(), b.SaveFields(), array)
|
|
|
+ array = []interface{}{}
|
|
|
+ }
|
|
|
+ }(temp)
|
|
|
+ temp = make(map[string]interface{})
|
|
|
+ }
|
|
|
+ wait.Wait()
|
|
|
+ if len(array) > 0 {
|
|
|
+ Mysql_Main_Bxt.InsertIgnoreBatch(b.TableName(), b.SaveFields(), array)
|
|
|
+ array = []interface{}{}
|
|
|
+ }
|
|
|
+ log.Println("同步新增", b.TableName(), "表结束。。。", index)
|
|
|
+ return
|
|
|
+}
|
|
|
+
|
|
|
+//更新
|
|
|
+func (b *bxt_raw_user) update(start_unix, end_unix int64) {
|
|
|
+ log.Println("开始同步user表,Mgo to Mysql 更新。。。")
|
|
|
+ defer Catch()
|
|
|
+ index := 0
|
|
|
+ fields := []string{"user_id", "province", "city", "reg_type", "device", "company", "job", "source_module", "source_channel", "follow_status", "phone", "openid", "name", "email", "timestamp"}
|
|
|
+ array := [][]interface{}{}
|
|
|
+ lock := &sync.Mutex{}
|
|
|
+ pool := make(chan bool, Config.SelectMgoUserPool)
|
|
|
+ wait := &sync.WaitGroup{}
|
|
|
+ sess := Mgo.GetMgoConn()
|
|
|
+ defer Mgo.DestoryMongoConn(sess)
|
|
|
+ q := map[string]interface{}{
|
|
|
+ "i_appid": 2,
|
|
|
+ "auto_updatetime": map[string]interface{}{
|
|
|
+ "$gte": start_unix,
|
|
|
+ "$lt": end_unix,
|
|
|
+ },
|
|
|
+ }
|
|
|
+ log.Println("同步更新", b.TableName(), "表。。。", q)
|
|
|
+ it := sess.DB("bxt").C("user").Find(q).Sort("_id").Select(b.selectField()).Iter()
|
|
|
+ for temp := make(map[string]interface{}); it.Next(&temp); {
|
|
|
+ pool <- true
|
|
|
+ wait.Add(1)
|
|
|
+ go func(m map[string]interface{}) {
|
|
|
+ defer Catch()
|
|
|
+ defer func() {
|
|
|
+ <-pool
|
|
|
+ wait.Done()
|
|
|
+ }()
|
|
|
+ ru := new_bxt_raw_user(m, false)
|
|
|
+ lock.Lock()
|
|
|
+ defer lock.Unlock()
|
|
|
+ index++
|
|
|
+ array = append(array, []interface{}{ru.User_id, ru.Province, ru.City, ru.Reg_type, ru.Device, ru.Company, ru.Job, ru.Source_module, ru.Source_channel, ru.Follow_status, ru.Phone, ru.Open_id, ru.Name, ru.Email, ru.Timestamp})
|
|
|
+ if index%Config.UpdateBathSize == 0 {
|
|
|
+ log.Println("同步更新", b.TableName(), "表", index)
|
|
|
+ Mysql_Main_Bxt.UpdateBath(b.TableName(), fields, array)
|
|
|
+ array = [][]interface{}{}
|
|
|
+ }
|
|
|
+ }(temp)
|
|
|
+ temp = make(map[string]interface{})
|
|
|
+ }
|
|
|
+ wait.Wait()
|
|
|
+ if len(array) > 0 {
|
|
|
+ Mysql_Main_Bxt.UpdateBath(b.TableName(), fields, array)
|
|
|
+ array = [][]interface{}{}
|
|
|
+ }
|
|
|
+ log.Println("同步更新", b.TableName(), "表结束。。。", index)
|
|
|
+}
|
|
|
+
|
|
|
+//
|
|
|
+func new_bxt_raw_user(m map[string]interface{}, flag bool) *bxt_raw_user {
|
|
|
+ registedate := Int64All(m["l_registedate"])
|
|
|
+ //邮箱
|
|
|
+ email := strings.TrimSpace(ObjToString(m["s_myemail"]))
|
|
|
+ if email == "" {
|
|
|
+ o_member_jy, _ := m["o_member_jy"].(map[string]interface{})
|
|
|
+ email = strings.TrimSpace(ObjToString(o_member_jy["s_email"]))
|
|
|
+ }
|
|
|
+ return &bxt_raw_user{
|
|
|
+ User_id: BsonIdToSId(m["_id"]),
|
|
|
+ Reg_time: FormatDateByInt64(®istedate, Date_Full_Layout),
|
|
|
+ Reg_type: "手机",
|
|
|
+ Company: strings.TrimSpace(ObjToString(m["s_company"])),
|
|
|
+ Source_module: ObjToString(m["s_module"]),
|
|
|
+ Phone: strings.TrimSpace(ObjToString(m["s_phone"])),
|
|
|
+ Email: email,
|
|
|
+ Timestamp: NowFormat(Date_Full_Layout),
|
|
|
+ Name: strings.TrimSpace(ObjToString(m["s_name"])),
|
|
|
+ }
|
|
|
+}
|