123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192 |
- package entity
- import (
- "log"
- . "online_datasync/config"
- . "online_datasync/db"
- "strings"
- "sync"
- . "app.yhyue.com/moapp/jybase/common"
- . "app.yhyue.com/moapp/jybase/date"
- . "app.yhyue.com/moapp/jybase/mongodb"
- )
- var (
- Bxt_raw_user *bxt_raw_user
- )
- type bxt_raw_user struct {
- User_id string //userid
- Reg_time string //注册日期
- Reg_type string //注册方式[手机、微信]
- Province string //省份
- City string //城市
- Device string //常用设备
- Company string //公司名称
- Job string //职务
- Source_module string //来源模块
- Source_channel string //来源渠道
- Follow_status int //关注状态 1未取关 2取关
- Phone string //手机号
- Open_id string //微信id
- Channel_id string //例如:如果是推荐人记推荐用户id
- Email string //
- Name string //姓名
- Timestamp string //更新时间
- }
- func (b *bxt_raw_user) TableName() string {
- return "raw_user"
- }
- //
- func (b *bxt_raw_user) SaveFields() []string {
- return []string{"user_id", "reg_time", "province", "city", "reg_type", "device", "company", "job", "source_module", "source_channel", "follow_status", "phone", "openid", "channel_id", "name", "email", "timestamp"}
- }
- //
- func (b *bxt_raw_user) selectField() map[string]interface{} {
- return map[string]interface{}{
- "_id": 1,
- "s_phone": 1,
- "l_registedate": 1,
- "s_company": 1,
- "o_member_jy.s_email": 1,
- }
- }
- //
- func (b *bxt_raw_user) Run(start_unix, end_unix int64, start_layout, end_layout string) {
- log.Println("开始同步标讯通", b.TableName(), "表。。。", start_unix, end_unix)
- b.add()
- if start_unix > 0 {
- b.update(start_unix, end_unix)
- }
- log.Println("同步讯通", b.TableName(), "表结束。。。")
- }
- //新增
- func (b *bxt_raw_user) add() (id string) {
- defer Catch()
- index := 0
- array := []interface{}{}
- lock := &sync.Mutex{}
- pool := make(chan bool, Config.SelectMgoUserPool)
- wait := &sync.WaitGroup{}
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- q := map[string]interface{}{
- "i_appid": 2,
- }
- if TimeTask.Bxt_user_mgo_mysql_id != "" {
- q["_id"] = map[string]interface{}{
- "$gt": StringTOBsonId(TimeTask.Bxt_user_mgo_mysql_id),
- }
- }
- log.Println("开始同步新增", b.TableName(), "表。。。", q)
- it := sess.DB("bxt").C("user").Find(q).Sort("_id").Select(b.selectField()).Iter()
- for temp := make(map[string]interface{}); it.Next(&temp); {
- TimeTask.Bxt_user_mgo_mysql_id = BsonIdToSId(temp["_id"])
- pool <- true
- wait.Add(1)
- go func(m map[string]interface{}) {
- defer Catch()
- defer func() {
- <-pool
- wait.Done()
- }()
- ru := new_bxt_raw_user(m, true)
- lock.Lock()
- defer lock.Unlock()
- index++
- array = append(array, ru.User_id, ru.Reg_time, ru.Province, ru.City, ru.Reg_type, ru.Device, ru.Company, ru.Job, ru.Source_module, ru.Source_channel, ru.Follow_status, ru.Phone, ru.Open_id, ru.Channel_id, ru.Name, ru.Email, ru.Timestamp)
- if index%Config.InsertBathSize == 0 {
- log.Println("同步新增", b.TableName(), "表", index)
- Mysql_Main_Bxt.InsertIgnoreBatch(b.TableName(), b.SaveFields(), array)
- array = []interface{}{}
- }
- }(temp)
- temp = make(map[string]interface{})
- }
- wait.Wait()
- if len(array) > 0 {
- Mysql_Main_Bxt.InsertIgnoreBatch(b.TableName(), b.SaveFields(), array)
- array = []interface{}{}
- }
- log.Println("同步新增", b.TableName(), "表结束。。。", index)
- return
- }
- //更新
- func (b *bxt_raw_user) update(start_unix, end_unix int64) {
- log.Println("开始同步user表,Mgo to Mysql 更新。。。")
- defer Catch()
- index := 0
- fields := []string{"user_id", "province", "city", "reg_type", "device", "company", "job", "source_module", "source_channel", "follow_status", "phone", "openid", "name", "email", "timestamp"}
- array := [][]interface{}{}
- lock := &sync.Mutex{}
- pool := make(chan bool, Config.SelectMgoUserPool)
- wait := &sync.WaitGroup{}
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- q := map[string]interface{}{
- "i_appid": 2,
- "auto_updatetime": map[string]interface{}{
- "$gte": start_unix,
- "$lt": end_unix,
- },
- }
- log.Println("同步更新", b.TableName(), "表。。。", q)
- it := sess.DB("bxt").C("user").Find(q).Sort("_id").Select(b.selectField()).Iter()
- for temp := make(map[string]interface{}); it.Next(&temp); {
- pool <- true
- wait.Add(1)
- go func(m map[string]interface{}) {
- defer Catch()
- defer func() {
- <-pool
- wait.Done()
- }()
- ru := new_bxt_raw_user(m, false)
- lock.Lock()
- defer lock.Unlock()
- index++
- array = append(array, []interface{}{ru.User_id, ru.Province, ru.City, ru.Reg_type, ru.Device, ru.Company, ru.Job, ru.Source_module, ru.Source_channel, ru.Follow_status, ru.Phone, ru.Open_id, ru.Name, ru.Email, ru.Timestamp})
- if index%Config.UpdateBathSize == 0 {
- log.Println("同步更新", b.TableName(), "表", index)
- Mysql_Main_Bxt.UpdateBath(b.TableName(), fields, array)
- array = [][]interface{}{}
- }
- }(temp)
- temp = make(map[string]interface{})
- }
- wait.Wait()
- if len(array) > 0 {
- Mysql_Main_Bxt.UpdateBath(b.TableName(), fields, array)
- array = [][]interface{}{}
- }
- log.Println("同步更新", b.TableName(), "表结束。。。", index)
- }
- //
- func new_bxt_raw_user(m map[string]interface{}, flag bool) *bxt_raw_user {
- registedate := Int64All(m["l_registedate"])
- //邮箱
- email := strings.TrimSpace(ObjToString(m["s_myemail"]))
- if email == "" {
- o_member_jy, _ := m["o_member_jy"].(map[string]interface{})
- email = strings.TrimSpace(ObjToString(o_member_jy["s_email"]))
- }
- return &bxt_raw_user{
- User_id: BsonIdToSId(m["_id"]),
- Reg_time: FormatDateByInt64(®istedate, Date_Full_Layout),
- Reg_type: "手机",
- Company: strings.TrimSpace(ObjToString(m["s_company"])),
- Source_module: ObjToString(m["s_module"]),
- Phone: strings.TrimSpace(ObjToString(m["s_phone"])),
- Email: email,
- Timestamp: NowFormat(Date_Full_Layout),
- Name: strings.TrimSpace(ObjToString(m["s_name"])),
- }
- }
|