package entity import ( "log" . "online_datasync/config" . "online_datasync/db" "online_datasync/phonedata" "strings" "sync" . "app.yhyue.com/moapp/jybase/common" . "app.yhyue.com/moapp/jybase/date" . "app.yhyue.com/moapp/jybase/mongodb" ) var ( Raw_user *raw_user ) type raw_user struct { User_id string //userid Reg_time string //注册日期 Reg_type string //注册方式[手机、微信] Province string //省份 City string //城市 Device string //常用设备 Company string //公司名称 Job string //职务 Source_module string //来源模块 Source_channel string //来源渠道 Follow_status int //关注状态 1未取关 2取关 Phone string //手机号 Open_id string //微信id Channel_id string //例如:如果是推荐人记推荐用户id Email string // Timestamp string //更新时间 } func (r *raw_user) TableName() string { return "raw_user" } // func (r *raw_user) SaveFields() []string { return []string{"user_id", "reg_time", "province", "city", "reg_type", "device", "company", "job", "source_module", "source_channel", "follow_status", "phone", "openid", "channel_id", "email", "timestamp"} } // func (r *raw_user) selectField() map[string]interface{} { return map[string]interface{}{ "_id": 1, "s_phone": 1, "s_m_phone": 1, "s_m_openid": 1, "l_registedate": 1, "l_w_registedate": 1, "l_a_registedate": 1, "a_m_openid": 1, "s_province": 1, "s_city": 1, "s_company": 1, "s_appponetype": 1, "i_ispush": 1, "s_myemail": 1, "o_jy.s_email": 1, "o_vipjy.s_email": 1, "o_member_jy.s_email": 1, "s_rsource": 1, "s_module": 1, } } // func (r *raw_user) Run(start_unix, end_unix int64, start_layout, end_layout string) { log.Println("开始同步", r.TableName(), "表。。。", start_unix, end_unix) r.add() if start_unix > 0 { r.update(start_unix, end_unix) } log.Println("同步", r.TableName(), "表结束。。。") } //新增 func (r *raw_user) add() (id string) { defer Catch() index := 0 array := []interface{}{} lock := &sync.Mutex{} pool := make(chan bool, Config.SelectMgoUserPool) wait := &sync.WaitGroup{} sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) q := map[string]interface{}{ "i_appid": 2, } if TimeTask.User_mgo_mysql_id != "" { q["_id"] = map[string]interface{}{ "$gt": StringTOBsonId(TimeTask.User_mgo_mysql_id), } } log.Println("开始同步新增", r.TableName(), "表。。。", q) it := sess.DB("qfw").C("user").Find(q).Sort("_id").Select(r.selectField()).Iter() for temp := make(map[string]interface{}); it.Next(&temp); { TimeTask.User_mgo_mysql_id = BsonIdToSId(temp["_id"]) pool <- true wait.Add(1) go func(m map[string]interface{}) { defer Catch() defer func() { <-pool wait.Done() }() ru := new_raw_user(m, true) lock.Lock() defer lock.Unlock() index++ array = append(array, ru.User_id, ru.Reg_time, ru.Province, ru.City, ru.Reg_type, ru.Device, ru.Company, ru.Job, ru.Source_module, ru.Source_channel, ru.Follow_status, ru.Phone, ru.Open_id, ru.Channel_id, ru.Email, ru.Timestamp) if index%Config.InsertBathSize == 0 { log.Println("同步新增", r.TableName(), "表", index) Mysql_Main.InsertIgnoreBatch(r.TableName(), r.SaveFields(), array) array = []interface{}{} } }(temp) temp = make(map[string]interface{}) } wait.Wait() if len(array) > 0 { Mysql_Main.InsertIgnoreBatch(r.TableName(), r.SaveFields(), array) array = []interface{}{} } log.Println("同步新增", r.TableName(), "表结束。。。", index) return } //更新 func (r *raw_user) update(start_unix, end_unix int64) { log.Println("开始同步user表,Mgo to Mysql 更新。。。") defer Catch() index := 0 fields := []string{"user_id", "province", "city", "reg_type", "device", "company", "job", "source_module", "source_channel", "follow_status", "phone", "openid", "email", "timestamp"} array := [][]interface{}{} lock := &sync.Mutex{} pool := make(chan bool, Config.SelectMgoUserPool) wait := &sync.WaitGroup{} sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) q := map[string]interface{}{ "i_appid": 2, "auto_updatetime": map[string]interface{}{ "$gte": start_unix, "$lt": end_unix, }, } log.Println("同步更新", r.TableName(), "表。。。", q) it := sess.DB("qfw").C("user").Find(q).Sort("_id").Select(r.selectField()).Iter() for temp := make(map[string]interface{}); it.Next(&temp); { pool <- true wait.Add(1) go func(m map[string]interface{}) { defer Catch() defer func() { <-pool wait.Done() }() ru := new_raw_user(m, false) lock.Lock() defer lock.Unlock() index++ array = append(array, []interface{}{ru.User_id, ru.Province, ru.City, ru.Reg_type, ru.Device, ru.Company, ru.Job, ru.Source_module, ru.Source_channel, ru.Follow_status, ru.Phone, ru.Open_id, ru.Email, ru.Timestamp}) if index%Config.UpdateBathSize == 0 { log.Println("同步更新", r.TableName(), "表", index) Mysql_Main.UpdateBath(r.TableName(), fields, array) array = [][]interface{}{} } }(temp) temp = make(map[string]interface{}) } wait.Wait() if len(array) > 0 { Mysql_Main.UpdateBath(r.TableName(), fields, array) array = [][]interface{}{} } log.Println("同步更新", r.TableName(), "表结束。。。", index) } // func new_raw_user(m map[string]interface{}, flag bool) *raw_user { _id := BsonIdToSId(m["_id"]) s_m_openid := ObjToString(m["s_m_openid"]) phone := ObjToString(m["s_phone"]) if phone == "" { phone = ObjToString(m["s_m_phone"]) } registedate := Int64All(m["l_registedate"]) if registedate == 0 { registedate = Int64All(m["l_w_registedate"]) } if registedate == 0 { registedate = Int64All(m["l_a_registedate"]) } reg_type := []string{} if s_m_openid != "" || ObjToString(m["a_m_openid"]) != "" { reg_type = append(reg_type, "微信") } if ObjToString(m["s_phone"]) != "" { reg_type = append(reg_type, "手机") } province := ObjToString(m["s_province"]) city := ObjToString(m["s_city"]) if phone != "" { phoneData, err := phonedata.Find(phone) if err == nil { province = phoneData.Province city = phoneData.City } } company := ObjToString(m["s_company"]) job := "" user_msg, user_msg_ok := Mgo.FindOneByField("user_msg", map[string]interface{}{ "s_userId": _id, }, map[string]interface{}{ "s_company": 1, "s_job": 1, }) if user_msg_ok && user_msg != nil { if company == "" { company = ObjToString((*user_msg)["s_company"]) } job = ObjToString((*user_msg)["s_job"]) } if company == "" { member, member_ok := Mgo.FindOneByField("member", map[string]interface{}{ "userid": _id, }, map[string]interface{}{ "entname": 1, }) if member_ok && member != nil { company = ObjToString((*member)["company"]) } } if company == "" { applysub_user, applysub_user_ok := Mgo.FindOneByField("applysub_user", map[string]interface{}{ "s_openid": s_m_openid, }, map[string]interface{}{ "s_company": 1, }) if applysub_user_ok && applysub_user != nil { company = ObjToString((*applysub_user)["s_company"]) } } follow_status := 1 if s_m_openid != "" && IntAllDef(m["i_ispush"], 1) == 0 { follow_status = 2 } channel_id := "" if flag { person_invitelink, person_invitelink_ok := Mgo.FindOneByField("person_invitelink", map[string]interface{}{ "s_target_openid": s_m_openid, "$and": []map[string]interface{}{ map[string]interface{}{"s_source_openid": map[string]interface{}{"$exists": 1}}, map[string]interface{}{"s_source_openid": map[string]interface{}{"$ne": ""}}, map[string]interface{}{"s_source_openid": map[string]interface{}{"$ne": s_m_openid}}, }, }, map[string]interface{}{ "s_source_openid": 1, }) if person_invitelink_ok && person_invitelink != nil { channel_id = ObjToString((*person_invitelink)["s_source_openid"]) } if channel_id == "" { person_activelink, person_activelink_ok := Mgo.FindOneByField("person_activelink", map[string]interface{}{ "s_target_openid": s_m_openid, "$and": []map[string]interface{}{ map[string]interface{}{"s_source_openid": map[string]interface{}{"$exists": 1}}, map[string]interface{}{"s_source_openid": map[string]interface{}{"$ne": ""}}, map[string]interface{}{"s_source_openid": map[string]interface{}{"$ne": s_m_openid}}, }, }, map[string]interface{}{ "s_source_openid": 1, }) if person_activelink_ok && person_activelink != nil { channel_id = ObjToString((*person_activelink)["s_source_openid"]) } } } email := ObjToString(m["s_myemail"]) if email == "" { o_jy, _ := m["o_jy"].(map[string]interface{}) email = ObjToString(o_jy["s_email"]) } if email == "" { o_vipjy, _ := m["o_vipjy"].(map[string]interface{}) email = ObjToString(o_vipjy["s_email"]) } if email == "" { o_member_jy, _ := m["o_member_jy"].(map[string]interface{}) email = ObjToString(o_member_jy["s_email"]) } return &raw_user{ User_id: _id, Open_id: s_m_openid, Reg_time: FormatDateByInt64(®istedate, Date_Full_Layout), Province: province, City: city, Reg_type: strings.Join(reg_type, "、"), Device: ObjToString(m["s_appponetype"]), Company: company, Job: job, Source_module: ObjToString(m["s_module"]), Source_channel: ObjToString(m["s_rsource"]), Phone: phone, Channel_id: channel_id, Email: email, Timestamp: NowFormat(Date_Full_Layout), Follow_status: follow_status, } }