package entity

import (
	"log"
	"strings"
	"sync"

	. "online_datasync/config"
	. "online_datasync/db"

	. "app.yhyue.com/moapp/jybase/common"
	. "app.yhyue.com/moapp/jybase/date"
	. "app.yhyue.com/moapp/jybase/mongodb"
)

var (
	// Bxt_raw_user is the package-level handle for the raw_user sync task.
	// It is declared here but not initialized in this file.
	Bxt_raw_user *bxt_raw_user
)

// bxt_raw_user is one row of the MySQL "raw_user" table, built from a
// document of the Mongo "bxt"."user" collection.
type bxt_raw_user struct {
	User_id        string // user id
	Reg_time       string // registration date
	Reg_type       string // registration method [phone, WeChat]
	Province       string // province
	City           string // city
	Device         string // frequently used device
	Company        string // company name
	Job            string // job title
	Source_module  string // source module
	Source_channel string // source channel
	Follow_status  int    // follow status: 1 = not unfollowed, 2 = unfollowed
	Phone          string // phone number
	Open_id        string // WeChat id
	Channel_id     string // e.g. for a referred user, the referrer's user id
	Email          string // email address
	Name           string // name
	Timestamp      string // update time
}

// TableName returns the name of the MySQL table this entity is written to.
func (b *bxt_raw_user) TableName() string {
	return "raw_user"
}

// SaveFields returns the column names used for batch inserts. The order must
// match the order in which add appends values to its flat batch slice.
func (b *bxt_raw_user) SaveFields() []string {
	return []string{
		"user_id", "reg_time", "province", "city", "reg_type", "device",
		"company", "job", "source_module", "source_channel", "follow_status",
		"phone", "openid", "channel_id", "name", "email", "timestamp",
	}
}

// selectField returns the Mongo projection used by add and update.
//
// It must list every key new_bxt_raw_user reads. s_myemail, s_module and
// s_name were previously missing, so the primary email, source module and
// name were never fetched and always came out empty.
func (b *bxt_raw_user) selectField() map[string]interface{} {
	return map[string]interface{}{
		"_id":                 1,
		"s_phone":             1,
		"l_registedate":       1,
		"s_company":           1,
		"s_myemail":           1,
		"s_module":            1,
		"s_name":              1,
		"o_member_jy.s_email": 1,
	}
}

// Run synchronizes the raw_user table: it always inserts newly created users,
// and additionally applies updates for the [start_unix, end_unix) window when
// start_unix is positive. start_layout and end_layout are not used here.
func (b *bxt_raw_user) Run(start_unix, end_unix int64, start_layout, end_layout string) {
	log.Println("开始同步标讯通", b.TableName(), "表。。。", start_unix, end_unix)
	b.add()
	if start_unix > 0 {
		b.update(start_unix, end_unix)
	}
	log.Println("同步讯通", b.TableName(), "表结束。。。")
}

// add inserts users created since the last run.
//
// It reads documents with i_appid == 2 in ascending _id order, resuming after
// TimeTask.Bxt_user_mgo_mysql_id when that checkpoint is set, and advances the
// checkpoint as each document is read. Rows are flushed to MySQL every
// Config.InsertBathSize documents and once more at the end.
//
// The named result id is never assigned, so the function always returns "".
func (b *bxt_raw_user) add() (id string) {
	defer Catch()

	index := 0
	// Flat batch: the values of each row appended back to back, in
	// SaveFields order.
	array := []interface{}{}
	lock := &sync.Mutex{}
	// pool bounds the number of in-flight worker goroutines.
	pool := make(chan bool, Config.SelectMgoUserPool)
	wait := &sync.WaitGroup{}

	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)

	q := map[string]interface{}{
		"i_appid": 2,
	}
	if TimeTask.Bxt_user_mgo_mysql_id != "" {
		q["_id"] = map[string]interface{}{
			"$gt": StringTOBsonId(TimeTask.Bxt_user_mgo_mysql_id),
		}
	}
	log.Println("开始同步新增", b.TableName(), "表。。。", q)

	it := sess.DB("bxt").C("user").Find(q).Sort("_id").Select(b.selectField()).Iter()
	for temp := make(map[string]interface{}); it.Next(&temp); {
		// The checkpoint moves as soon as the document is read, before its
		// row has been written to MySQL.
		TimeTask.Bxt_user_mgo_mysql_id = BsonIdToSId(temp["_id"])
		pool <- true
		wait.Add(1)
		go func(m map[string]interface{}) {
			defer Catch()
			defer func() {
				<-pool
				wait.Done()
			}()
			ru := new_bxt_raw_user(m, true)

			// The lock guards index and array, and is held across the flush.
			lock.Lock()
			defer lock.Unlock()
			index++
			array = append(array,
				ru.User_id, ru.Reg_time, ru.Province, ru.City, ru.Reg_type,
				ru.Device, ru.Company, ru.Job, ru.Source_module,
				ru.Source_channel, ru.Follow_status, ru.Phone, ru.Open_id,
				ru.Channel_id, ru.Name, ru.Email, ru.Timestamp)
			if index%Config.InsertBathSize == 0 {
				log.Println("同步新增", b.TableName(), "表", index)
				Mysql_Main_Bxt.InsertIgnoreBatch(b.TableName(), b.SaveFields(), array)
				array = []interface{}{}
			}
		}(temp)
		// Hand the goroutine its own map; decode the next document into a
		// fresh one.
		temp = make(map[string]interface{})
	}
	wait.Wait()

	// Flush the final partial batch.
	if len(array) > 0 {
		Mysql_Main_Bxt.InsertIgnoreBatch(b.TableName(), b.SaveFields(), array)
	}
	log.Println("同步新增", b.TableName(), "表结束。。。", index)
	return id
}

// update re-synchronizes users whose auto_updatetime falls in
// [start_unix, end_unix).
//
// Unlike add it sends neither reg_time nor channel_id, and it collects one
// value slice per row. Rows are flushed every Config.UpdateBathSize documents
// and once more at the end.
func (b *bxt_raw_user) update(start_unix, end_unix int64) {
	log.Println("开始同步user表,Mgo to Mysql 更新。。。")
	defer Catch()

	index := 0
	// Column order must match the per-row value order appended below.
	fields := []string{
		"user_id", "province", "city", "reg_type", "device", "company", "job",
		"source_module", "source_channel", "follow_status", "phone", "openid",
		"name", "email", "timestamp",
	}
	array := [][]interface{}{}
	lock := &sync.Mutex{}
	// pool bounds the number of in-flight worker goroutines.
	pool := make(chan bool, Config.SelectMgoUserPool)
	wait := &sync.WaitGroup{}

	sess := Mgo.GetMgoConn()
	defer Mgo.DestoryMongoConn(sess)

	q := map[string]interface{}{
		"i_appid": 2,
		"auto_updatetime": map[string]interface{}{
			"$gte": start_unix,
			"$lt":  end_unix,
		},
	}
	log.Println("同步更新", b.TableName(), "表。。。", q)

	it := sess.DB("bxt").C("user").Find(q).Sort("_id").Select(b.selectField()).Iter()
	for temp := make(map[string]interface{}); it.Next(&temp); {
		pool <- true
		wait.Add(1)
		go func(m map[string]interface{}) {
			defer Catch()
			defer func() {
				<-pool
				wait.Done()
			}()
			ru := new_bxt_raw_user(m, false)

			// The lock guards index and array, and is held across the flush.
			lock.Lock()
			defer lock.Unlock()
			index++
			array = append(array, []interface{}{
				ru.User_id, ru.Province, ru.City, ru.Reg_type, ru.Device,
				ru.Company, ru.Job, ru.Source_module, ru.Source_channel,
				ru.Follow_status, ru.Phone, ru.Open_id, ru.Name, ru.Email,
				ru.Timestamp,
			})
			if index%Config.UpdateBathSize == 0 {
				log.Println("同步更新", b.TableName(), "表", index)
				Mysql_Main_Bxt.UpdateBath(b.TableName(), fields, array)
				array = [][]interface{}{}
			}
		}(temp)
		// Hand the goroutine its own map; decode the next document into a
		// fresh one.
		temp = make(map[string]interface{})
	}
	wait.Wait()

	// Flush the final partial batch.
	if len(array) > 0 {
		Mysql_Main_Bxt.UpdateBath(b.TableName(), fields, array)
	}
	log.Println("同步更新", b.TableName(), "表结束。。。", index)
}

// new_bxt_raw_user maps one Mongo user document to a raw_user row.
//
// The email comes from s_myemail, falling back to o_member_jy.s_email when
// that is blank. Reg_type is always "手机" and Timestamp is the current time.
// Fields not set here keep their zero values. flag is accepted for the
// callers' sake (true from add, false from update) but is not used.
func new_bxt_raw_user(m map[string]interface{}, flag bool) *bxt_raw_user {
	registedate := Int64All(m["l_registedate"])

	// Email: prefer the user's own address, then the member record.
	email := strings.TrimSpace(ObjToString(m["s_myemail"]))
	if email == "" {
		// A failed assertion leaves a nil map, and indexing it yields nil.
		o_member_jy, _ := m["o_member_jy"].(map[string]interface{})
		email = strings.TrimSpace(ObjToString(o_member_jy["s_email"]))
	}

	return &bxt_raw_user{
		User_id:       BsonIdToSId(m["_id"]),
		Reg_time:      FormatDateByInt64(&registedate, Date_Full_Layout),
		Reg_type:      "手机",
		Company:       strings.TrimSpace(ObjToString(m["s_company"])),
		Source_module: ObjToString(m["s_module"]),
		Phone:         strings.TrimSpace(ObjToString(m["s_phone"])),
		Email:         email,
		Timestamp:     NowFormat(Date_Full_Layout),
		Name:          strings.TrimSpace(ObjToString(m["s_name"])),
	}
}