package entity import ( "log" . "online_datasync/config" . "online_datasync/db" "online_datasync/phonedata" "strings" "sync" . "app.yhyue.com/moapp/jybase/common" . "app.yhyue.com/moapp/jybase/date" . "app.yhyue.com/moapp/jybase/mongodb" ) var ( Raw_user *raw_user ) type raw_user struct { User_id string //userid Reg_time string //注册日期 Reg_type string //注册方式[手机、微信] Province string //省份 City string //城市 Device string //常用设备 Company string //公司名称 Job string //职务 Source_module string //来源模块 Source_channel string //来源渠道 Follow_status int //关注状态 1未取关 2取关 Phone string //手机号 Open_id string //微信id Channel_id string //例如:如果是推荐人记推荐用户id Email string // Name string //姓名 Timestamp string //更新时间 } func (r *raw_user) TableName() string { return "raw_user" } // func (r *raw_user) SaveFields() []string { return []string{"user_id", "reg_time", "province", "city", "reg_type", "device", "company", "job", "source_module", "source_channel", "follow_status", "phone", "openid", "channel_id", "name", "email", "timestamp"} } // func (r *raw_user) selectField() map[string]interface{} { return map[string]interface{}{ "_id": 1, "s_phone": 1, "s_m_phone": 1, "s_m_openid": 1, "l_registedate": 1, "l_w_registedate": 1, "l_a_registedate": 1, "a_m_openid": 1, "s_province": 1, "s_city": 1, "s_company": 1, "s_appponetype": 1, "i_ispush": 1, "s_myemail": 1, "o_jy.s_email": 1, "o_vipjy.s_email": 1, "o_member_jy.s_email": 1, "s_rsource": 1, "s_module": 1, "a_collect_phone": 1, } } // func (r *raw_user) Run(start_unix, end_unix int64, start_layout, end_layout string) { log.Println("开始同步", r.TableName(), "表。。。", start_unix, end_unix) r.add() if start_unix > 0 { r.update(start_unix, end_unix) } log.Println("同步", r.TableName(), "表结束。。。") } //新增 func (r *raw_user) add() (id string) { defer Catch() index := 0 array := []interface{}{} lock := &sync.Mutex{} pool := make(chan bool, Config.SelectMgoUserPool) wait := &sync.WaitGroup{} sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) q := map[string]interface{}{ "i_appid": 2, } if TimeTask.User_mgo_mysql_id != "" { q["_id"] = map[string]interface{}{ "$gt": StringTOBsonId(TimeTask.User_mgo_mysql_id), } } log.Println("开始同步新增", r.TableName(), "表。。。", q) it := sess.DB("qfw").C("user").Find(q).Sort("_id").Select(r.selectField()).Iter() for temp := make(map[string]interface{}); it.Next(&temp); { TimeTask.User_mgo_mysql_id = BsonIdToSId(temp["_id"]) pool <- true wait.Add(1) go func(m map[string]interface{}) { defer Catch() defer func() { <-pool wait.Done() }() ru := new_raw_user(m, true) lock.Lock() defer lock.Unlock() index++ array = append(array, ru.User_id, ru.Reg_time, ru.Province, ru.City, ru.Reg_type, ru.Device, ru.Company, ru.Job, ru.Source_module, ru.Source_channel, ru.Follow_status, ru.Phone, ru.Open_id, ru.Channel_id, ru.Name, ru.Email, ru.Timestamp) if index%Config.InsertBathSize == 0 { log.Println("同步新增", r.TableName(), "表", index) Mysql_Main.InsertIgnoreBatch(r.TableName(), r.SaveFields(), array) array = []interface{}{} } }(temp) temp = make(map[string]interface{}) } wait.Wait() if len(array) > 0 { Mysql_Main.InsertIgnoreBatch(r.TableName(), r.SaveFields(), array) array = []interface{}{} } log.Println("同步新增", r.TableName(), "表结束。。。", index) return } //更新 func (r *raw_user) update(start_unix, end_unix int64) { log.Println("开始同步user表,Mgo to Mysql 更新。。。") defer Catch() index := 0 fields := []string{"user_id", "province", "city", "reg_type", "device", "company", "job", "source_module", "source_channel", "follow_status", "phone", "openid", "name", "email", "timestamp"} array := [][]interface{}{} lock := &sync.Mutex{} pool := make(chan bool, Config.SelectMgoUserPool) wait := &sync.WaitGroup{} sess := Mgo.GetMgoConn() defer Mgo.DestoryMongoConn(sess) q := map[string]interface{}{ "i_appid": 2, "auto_updatetime": map[string]interface{}{ "$gte": start_unix, "$lt": end_unix, }, } log.Println("同步更新", r.TableName(), "表。。。", q) it := sess.DB("qfw").C("user").Find(q).Sort("_id").Select(r.selectField()).Iter() for temp := make(map[string]interface{}); it.Next(&temp); { pool <- true wait.Add(1) go func(m map[string]interface{}) { defer Catch() defer func() { <-pool wait.Done() }() ru := new_raw_user(m, false) lock.Lock() defer lock.Unlock() index++ array = append(array, []interface{}{ru.User_id, ru.Province, ru.City, ru.Reg_type, ru.Device, ru.Company, ru.Job, ru.Source_module, ru.Source_channel, ru.Follow_status, ru.Phone, ru.Open_id, ru.Name, ru.Email, ru.Timestamp}) if index%Config.UpdateBathSize == 0 { log.Println("同步更新", r.TableName(), "表", index) Mysql_Main.UpdateBath(r.TableName(), fields, array) array = [][]interface{}{} } }(temp) temp = make(map[string]interface{}) } wait.Wait() if len(array) > 0 { Mysql_Main.UpdateBath(r.TableName(), fields, array) array = [][]interface{}{} } log.Println("同步更新", r.TableName(), "表结束。。。", index) } // func new_raw_user(m map[string]interface{}, flag bool) *raw_user { _id := BsonIdToSId(m["_id"]) s_m_openid := ObjToString(m["s_m_openid"]) registedate := Int64All(m["l_registedate"]) if registedate == 0 { registedate = Int64All(m["l_w_registedate"]) } if registedate == 0 { registedate = Int64All(m["l_a_registedate"]) } reg_type := []string{} if s_m_openid != "" || ObjToString(m["a_m_openid"]) != "" { reg_type = append(reg_type, "微信") } if ObjToString(m["s_phone"]) != "" { reg_type = append(reg_type, "手机") } province := ObjToString(m["s_province"]) city := ObjToString(m["s_city"]) phone := strings.TrimSpace(ObjToString(m["s_phone"])) if phone == "" { phone = strings.TrimSpace(ObjToString(m["s_m_phone"])) } phone_map := map[string]bool{} phone_array := []string{} if phone != "" { phone_map[phone] = true phone_array = append(phone_array, phone) } //邮箱 email := strings.TrimSpace(ObjToString(m["s_myemail"])) if email == "" { o_jy, _ := m["o_jy"].(map[string]interface{}) email = strings.TrimSpace(ObjToString(o_jy["s_email"])) } if email == "" { o_vipjy, _ := m["o_vipjy"].(map[string]interface{}) email = strings.TrimSpace(ObjToString(o_vipjy["s_email"])) } if email == "" { o_member_jy, _ := m["o_member_jy"].(map[string]interface{}) email = strings.TrimSpace(ObjToString(o_member_jy["s_email"])) } name := "" company := "" //手机号 公司名称 邮箱 姓名 从订单、发票中取 orders := Mysql_From_Jianyu.SelectBySql(`SELECT a.user_phone,b.apply_phone,b.apply_company,b.apply_realyname,b.email,c.phone,c.mail,c.company_name from dataexport_order a LEFT JOIN apply_invoice b on (a.user_id=? and a.id=b.order_id) LEFT JOIN invoice c on (a.user_id=? and a.order_code=c.order_code) where a.user_id=? and (a.user_phone is not null or b.apply_company is not null or c.company_name is not null) order by a.id desc limit 100`, _id, _id, _id) if orders != nil { for _, v := range *orders { if order_phone := strings.TrimSpace(ObjToString(v["user_phone"])); order_phone != "" && !phone_map[order_phone] { phone_map[order_phone] = true phone_array = append(phone_array, order_phone) } if apply_invoice_phone := strings.TrimSpace(ObjToString(v["apply_phone"])); apply_invoice_phone != "" && !phone_map[apply_invoice_phone] { phone_map[apply_invoice_phone] = true phone_array = append(phone_array, apply_invoice_phone) } if invoice_phone := strings.TrimSpace(ObjToString(v["phone"])); invoice_phone != "" && !phone_map[invoice_phone] { phone_map[invoice_phone] = true phone_array = append(phone_array, invoice_phone) } company = strings.TrimSpace(ObjToString(v["company_name"])) if company == "" { company = strings.TrimSpace(ObjToString(v["apply_company"])) } if email == "" { email = strings.TrimSpace(ObjToString(v["mail"])) } if email == "" { email = strings.TrimSpace(ObjToString(v["email"])) } name = strings.TrimSpace(ObjToString(v["apply_realyname"])) } } if phone != "" && (company == "" || name == "") { entniche_info := Mysql_From_Jianyu.SelectBySql(`select name,admin from entniche_info where phone=? order by id desc limit 100`, phone) if entniche_info != nil { for _, v := range *entniche_info { if entniche_info_name := strings.TrimSpace(ObjToString(v["name"])); entniche_info_name != "" && company == "" { company = entniche_info_name } if entniche_info_admin := strings.TrimSpace(ObjToString(v["admin"])); entniche_info_admin != "" && name == "" { name = entniche_info_admin } if company != "" && name != "" { break } } } } // a_collect_phone, _ := m["a_collect_phone"].([]interface{}) for _, v := range a_collect_phone { if vs := strings.TrimSpace(ObjToString(v)); vs != "" && !phone_map[vs] { phone_map[vs] = true phone_array = append(phone_array, vs) } } //注册时候的公司名称 if company == "" { company = strings.TrimSpace(ObjToString(m["s_company"])) } //超级订阅试用 job := "" user_msg, user_msg_ok := Mgo.FindOneByField("user_msg", map[string]interface{}{ "s_userId": _id, }, map[string]interface{}{ "s_company": 1, "s_job": 1, "s_name": 1, "s_phone": 1, }) if user_msg_ok && user_msg != nil { if company == "" { company = strings.TrimSpace(ObjToString((*user_msg)["s_company"])) } if name == "" { name = ObjToString((*user_msg)["s_name"]) } job = ObjToString((*user_msg)["s_job"]) if user_msg_phone := strings.TrimSpace(ObjToString((*user_msg)["s_phone"])); user_msg_phone != "" && !phone_map[user_msg_phone] { phone_map[user_msg_phone] = true phone_array = append(phone_array, user_msg_phone) } } //开通大会员 if company == "" { member, member_ok := Mgo.FindOneByField("member", map[string]interface{}{ "userid": _id, }, map[string]interface{}{ "entname": 1, }) if member_ok && member != nil { company = strings.TrimSpace(ObjToString((*member)["entname"])) } } //大会员试用 saleLeads, saleLeads_ok := Mgo.FindOneByField("saleLeads", map[string]interface{}{ "userid": _id, }, map[string]interface{}{ "company": 1, "position": 1, "name": 1, "phone": 1, }) if saleLeads_ok && saleLeads != nil { if company == "" { company = strings.TrimSpace(ObjToString((*saleLeads)["company"])) } if name == "" { name = ObjToString((*saleLeads)["name"]) } if job == "" { job = ObjToString((*saleLeads)["position"]) } if saleLeads_phone := strings.TrimSpace(ObjToString((*saleLeads)["phone"])); saleLeads_phone != "" && !phone_map[saleLeads_phone] { phone_map[saleLeads_phone] = true phone_array = append(phone_array, saleLeads_phone) } } //以前申请打开微信推送模板消息,目前入口已关闭 if flag { applysub_user, applysub_user_ok := Mgo.FindOneByField("applysub_user", map[string]interface{}{ "s_openid": s_m_openid, }, map[string]interface{}{ "s_company": 1, "s_phone": 1, }) if applysub_user_ok && applysub_user != nil { if company == "" { company = strings.TrimSpace(ObjToString((*applysub_user)["s_company"])) } if applysub_user_phone := strings.TrimSpace(ObjToString((*applysub_user)["s_phone"])); applysub_user_phone != "" && !phone_map[applysub_user_phone] { phone_map[applysub_user_phone] = true phone_array = append(phone_array, applysub_user_phone) } } } //绑定的手机号要放数组第一个,省份、城市从手机号归属地中取 if len(phone_array) > 0 { phoneData, err := phonedata.Find(phone_array[0]) if err == nil { province = phoneData.Province city = phoneData.City } } //关注状态 follow_status := 1 if s_m_openid != "" && IntAllDef(m["i_ispush"], 1) == 0 { follow_status = 2 } //推荐人 channel_id := "" if flag { person_invitelink, person_invitelink_ok := Mgo.FindOneByField("person_invitelink", map[string]interface{}{ "s_target_openid": s_m_openid, "$and": []map[string]interface{}{ map[string]interface{}{"s_source_openid": map[string]interface{}{"$exists": 1}}, map[string]interface{}{"s_source_openid": map[string]interface{}{"$ne": ""}}, map[string]interface{}{"s_source_openid": map[string]interface{}{"$ne": s_m_openid}}, }, }, map[string]interface{}{ "s_source_openid": 1, }) if person_invitelink_ok && person_invitelink != nil { channel_id = ObjToString((*person_invitelink)["s_source_openid"]) } if channel_id == "" { person_activelink, person_activelink_ok := Mgo.FindOneByField("person_activelink", map[string]interface{}{ "s_target_openid": s_m_openid, "$and": []map[string]interface{}{ map[string]interface{}{"s_source_openid": map[string]interface{}{"$exists": 1}}, map[string]interface{}{"s_source_openid": map[string]interface{}{"$ne": ""}}, map[string]interface{}{"s_source_openid": map[string]interface{}{"$ne": s_m_openid}}, }, }, map[string]interface{}{ "s_source_openid": 1, }) if person_activelink_ok && person_activelink != nil { channel_id = ObjToString((*person_activelink)["s_source_openid"]) } } } return &raw_user{ User_id: _id, Open_id: s_m_openid, Reg_time: FormatDateByInt64(®istedate, Date_Full_Layout), Province: province, City: city, Reg_type: strings.Join(reg_type, "、"), Device: ObjToString(m["s_appponetype"]), Company: company, Job: job, Source_module: ObjToString(m["s_module"]), Source_channel: ObjToString(m["s_rsource"]), Phone: strings.Join(phone_array, ","), Channel_id: channel_id, Email: email, Timestamp: NowFormat(Date_Full_Layout), Name: name, Follow_status: follow_status, } }