|
- package entity
- import (
- "log"
- . "online_datasync/config"
- . "online_datasync/db"
- "online_datasync/phonedata"
- "strings"
- "sync"
- . "app.yhyue.com/moapp/jybase/common"
- . "app.yhyue.com/moapp/jybase/date"
- . "app.yhyue.com/moapp/jybase/mongodb"
- )
- var (
- Raw_user *raw_user
- )
// raw_user holds the values of one row of the "raw_user" table, as assembled
// by new_raw_user from a user document plus several lookup sources.
type raw_user struct {
	// User_id is the user id.
	User_id string
	// Reg_time is the registration date.
	Reg_time string
	// Reg_type is the registration method (phone, WeChat).
	Reg_type string
	// Province is the province.
	Province string
	// City is the city.
	City string
	// Device is the commonly used device.
	Device string
	// Company is the company name.
	Company string
	// Job is the job title.
	Job string
	// Source_module is the source module.
	Source_module string
	// Source_channel is the source channel.
	Source_channel string
	// Follow_status is the follow state: 1 = not unfollowed, 2 = unfollowed.
	Follow_status int
	// Phone is the phone number.
	Phone string
	// Open_id is the WeChat id.
	Open_id string
	// Channel_id is, for example, the recommending user's id when the user
	// came through a referrer.
	Channel_id string
	// Email is the e-mail address.
	Email string
	// Name is the person's name.
	Name string
	// Timestamp is the update time.
	Timestamp string
}
- func (r *raw_user) TableName() string {
- return "raw_user"
- }
- //
- func (r *raw_user) SaveFields() []string {
- return []string{"user_id", "reg_time", "province", "city", "reg_type", "device", "company", "job", "source_module", "source_channel", "follow_status", "phone", "openid", "channel_id", "name", "email", "timestamp"}
- }
- //
- func (r *raw_user) selectField() map[string]interface{} {
- return map[string]interface{}{
- "_id": 1,
- "s_phone": 1,
- "s_m_phone": 1,
- "s_m_openid": 1,
- "l_registedate": 1,
- "l_w_registedate": 1,
- "l_a_registedate": 1,
- "a_m_openid": 1,
- "s_province": 1,
- "s_city": 1,
- "s_company": 1,
- "s_appponetype": 1,
- "i_ispush": 1,
- "s_myemail": 1,
- "o_jy.s_email": 1,
- "o_vipjy.s_email": 1,
- "o_member_jy.s_email": 1,
- "s_rsource": 1,
- "s_module": 1,
- "a_collect_phone": 1,
- }
- }
- //
- func (r *raw_user) Run(start_unix, end_unix int64, start_layout, end_layout string) {
- log.Println("开始同步", r.TableName(), "表。。。", start_unix, end_unix)
- r.add()
- if start_unix > 0 {
- r.update(start_unix, end_unix)
- }
- log.Println("同步", r.TableName(), "表结束。。。")
- }
- //新增
- func (r *raw_user) add() (id string) {
- defer Catch()
- index := 0
- array := []interface{}{}
- lock := &sync.Mutex{}
- pool := make(chan bool, Config.SelectMgoUserPool)
- wait := &sync.WaitGroup{}
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- q := map[string]interface{}{
- "i_appid": 2,
- }
- if TimeTask.User_mgo_mysql_id != "" {
- q["_id"] = map[string]interface{}{
- "$gt": StringTOBsonId(TimeTask.User_mgo_mysql_id),
- }
- }
- log.Println("开始同步新增", r.TableName(), "表。。。", q)
- it := sess.DB("qfw").C("user").Find(q).Sort("_id").Select(r.selectField()).Iter()
- for temp := make(map[string]interface{}); it.Next(&temp); {
- TimeTask.User_mgo_mysql_id = BsonIdToSId(temp["_id"])
- pool <- true
- wait.Add(1)
- go func(m map[string]interface{}) {
- defer Catch()
- defer func() {
- <-pool
- wait.Done()
- }()
- ru := new_raw_user(m, true)
- lock.Lock()
- defer lock.Unlock()
- index++
- array = append(array, ru.User_id, ru.Reg_time, ru.Province, ru.City, ru.Reg_type, ru.Device, ru.Company, ru.Job, ru.Source_module, ru.Source_channel, ru.Follow_status, ru.Phone, ru.Open_id, ru.Channel_id, ru.Name, ru.Email, ru.Timestamp)
- if index%Config.InsertBathSize == 0 {
- log.Println("同步新增", r.TableName(), "表", index)
- Mysql_Main.InsertIgnoreBatch(r.TableName(), r.SaveFields(), array)
- array = []interface{}{}
- }
- }(temp)
- temp = make(map[string]interface{})
- }
- wait.Wait()
- if len(array) > 0 {
- Mysql_Main.InsertIgnoreBatch(r.TableName(), r.SaveFields(), array)
- array = []interface{}{}
- }
- log.Println("同步新增", r.TableName(), "表结束。。。", index)
- return
- }
- //更新
- func (r *raw_user) update(start_unix, end_unix int64) {
- log.Println("开始同步user表,Mgo to Mysql 更新。。。")
- defer Catch()
- index := 0
- fields := []string{"user_id", "province", "city", "reg_type", "device", "company", "job", "source_module", "source_channel", "follow_status", "phone", "openid", "name", "email", "timestamp"}
- array := [][]interface{}{}
- lock := &sync.Mutex{}
- pool := make(chan bool, Config.SelectMgoUserPool)
- wait := &sync.WaitGroup{}
- sess := Mgo.GetMgoConn()
- defer Mgo.DestoryMongoConn(sess)
- q := map[string]interface{}{
- "i_appid": 2,
- "auto_updatetime": map[string]interface{}{
- "$gte": start_unix,
- "$lt": end_unix,
- },
- }
- log.Println("同步更新", r.TableName(), "表。。。", q)
- it := sess.DB("qfw").C("user").Find(q).Sort("_id").Select(r.selectField()).Iter()
- for temp := make(map[string]interface{}); it.Next(&temp); {
- pool <- true
- wait.Add(1)
- go func(m map[string]interface{}) {
- defer Catch()
- defer func() {
- <-pool
- wait.Done()
- }()
- ru := new_raw_user(m, false)
- lock.Lock()
- defer lock.Unlock()
- index++
- array = append(array, []interface{}{ru.User_id, ru.Province, ru.City, ru.Reg_type, ru.Device, ru.Company, ru.Job, ru.Source_module, ru.Source_channel, ru.Follow_status, ru.Phone, ru.Open_id, ru.Name, ru.Email, ru.Timestamp})
- if index%Config.UpdateBathSize == 0 {
- log.Println("同步更新", r.TableName(), "表", index)
- Mysql_Main.UpdateBath(r.TableName(), fields, array)
- array = [][]interface{}{}
- }
- }(temp)
- temp = make(map[string]interface{})
- }
- wait.Wait()
- if len(array) > 0 {
- Mysql_Main.UpdateBath(r.TableName(), fields, array)
- array = [][]interface{}{}
- }
- log.Println("同步更新", r.TableName(), "表结束。。。", index)
- }
- //
- func new_raw_user(m map[string]interface{}, flag bool) *raw_user {
- _id := BsonIdToSId(m["_id"])
- s_m_openid := ObjToString(m["s_m_openid"])
- registedate := Int64All(m["l_registedate"])
- if registedate == 0 {
- registedate = Int64All(m["l_w_registedate"])
- }
- if registedate == 0 {
- registedate = Int64All(m["l_a_registedate"])
- }
- reg_type := []string{}
- if s_m_openid != "" || ObjToString(m["a_m_openid"]) != "" {
- reg_type = append(reg_type, "微信")
- }
- if ObjToString(m["s_phone"]) != "" {
- reg_type = append(reg_type, "手机")
- }
- province := ObjToString(m["s_province"])
- city := ObjToString(m["s_city"])
- phone := strings.TrimSpace(ObjToString(m["s_phone"]))
- if phone == "" {
- phone = strings.TrimSpace(ObjToString(m["s_m_phone"]))
- }
- phone_map := map[string]bool{}
- phone_array := []string{}
- if phone != "" {
- phone_map[phone] = true
- phone_array = append(phone_array, phone)
- }
- //邮箱
- email := strings.TrimSpace(ObjToString(m["s_myemail"]))
- if email == "" {
- o_jy, _ := m["o_jy"].(map[string]interface{})
- email = strings.TrimSpace(ObjToString(o_jy["s_email"]))
- }
- if email == "" {
- o_vipjy, _ := m["o_vipjy"].(map[string]interface{})
- email = strings.TrimSpace(ObjToString(o_vipjy["s_email"]))
- }
- if email == "" {
- o_member_jy, _ := m["o_member_jy"].(map[string]interface{})
- email = strings.TrimSpace(ObjToString(o_member_jy["s_email"]))
- }
- name := ""
- company := ""
- //手机号 公司名称 邮箱 姓名 从订单、发票中取
- orders := Mysql_From_Jianyu.SelectBySql(`SELECT a.user_phone,b.apply_phone,b.apply_company,b.apply_realyname,b.email,c.phone,c.mail,c.company_name from dataexport_order a
- LEFT JOIN apply_invoice b on (a.user_id=? and a.id=b.order_id)
- LEFT JOIN invoice c on (a.user_id=? and a.order_code=c.order_code)
- where a.user_id=? and (a.user_phone is not null or b.apply_company is not null or c.company_name is not null) order by a.id desc limit 100`, _id, _id, _id)
- if orders != nil {
- for _, v := range *orders {
- if order_phone := strings.TrimSpace(ObjToString(v["user_phone"])); order_phone != "" && !phone_map[order_phone] {
- phone_map[order_phone] = true
- phone_array = append(phone_array, order_phone)
- }
- if apply_invoice_phone := strings.TrimSpace(ObjToString(v["apply_phone"])); apply_invoice_phone != "" && !phone_map[apply_invoice_phone] {
- phone_map[apply_invoice_phone] = true
- phone_array = append(phone_array, apply_invoice_phone)
- }
- if invoice_phone := strings.TrimSpace(ObjToString(v["phone"])); invoice_phone != "" && !phone_map[invoice_phone] {
- phone_map[invoice_phone] = true
- phone_array = append(phone_array, invoice_phone)
- }
- company = strings.TrimSpace(ObjToString(v["company_name"]))
- if company == "" {
- company = strings.TrimSpace(ObjToString(v["apply_company"]))
- }
- if email == "" {
- email = strings.TrimSpace(ObjToString(v["mail"]))
- }
- if email == "" {
- email = strings.TrimSpace(ObjToString(v["email"]))
- }
- name = strings.TrimSpace(ObjToString(v["apply_realyname"]))
- }
- }
- if phone != "" && (company == "" || name == "") {
- entniche_info := Mysql_From_Jianyu.SelectBySql(`select name,admin from entniche_info where phone=? order by id desc limit 100`, phone)
- if entniche_info != nil {
- for _, v := range *entniche_info {
- if entniche_info_name := strings.TrimSpace(ObjToString(v["name"])); entniche_info_name != "" && company == "" {
- company = entniche_info_name
- }
- if entniche_info_admin := strings.TrimSpace(ObjToString(v["admin"])); entniche_info_admin != "" && name == "" {
- name = entniche_info_admin
- }
- if company != "" && name != "" {
- break
- }
- }
- }
- }
- //
- a_collect_phone, _ := m["a_collect_phone"].([]interface{})
- for _, v := range a_collect_phone {
- if vs := strings.TrimSpace(ObjToString(v)); vs != "" && !phone_map[vs] {
- phone_map[vs] = true
- phone_array = append(phone_array, vs)
- }
- }
- //注册时候的公司名称
- if company == "" {
- company = strings.TrimSpace(ObjToString(m["s_company"]))
- }
- //超级订阅试用
- job := ""
- user_msg, user_msg_ok := Mgo.FindOneByField("user_msg", map[string]interface{}{
- "s_userId": _id,
- }, map[string]interface{}{
- "s_company": 1,
- "s_job": 1,
- "s_name": 1,
- "s_phone": 1,
- })
- if user_msg_ok && user_msg != nil {
- if company == "" {
- company = strings.TrimSpace(ObjToString((*user_msg)["s_company"]))
- }
- if name == "" {
- name = ObjToString((*user_msg)["s_name"])
- }
- job = ObjToString((*user_msg)["s_job"])
- if user_msg_phone := strings.TrimSpace(ObjToString((*user_msg)["s_phone"])); user_msg_phone != "" && !phone_map[user_msg_phone] {
- phone_map[user_msg_phone] = true
- phone_array = append(phone_array, user_msg_phone)
- }
- }
- //开通大会员
- if company == "" {
- member, member_ok := Mgo.FindOneByField("member", map[string]interface{}{
- "userid": _id,
- }, map[string]interface{}{
- "entname": 1,
- })
- if member_ok && member != nil {
- company = strings.TrimSpace(ObjToString((*member)["entname"]))
- }
- }
- //大会员试用
- saleLeads, saleLeads_ok := Mgo.FindOneByField("saleLeads", map[string]interface{}{
- "userid": _id,
- }, map[string]interface{}{
- "company": 1,
- "position": 1,
- "name": 1,
- "phone": 1,
- })
- if saleLeads_ok && saleLeads != nil {
- if company == "" {
- company = strings.TrimSpace(ObjToString((*saleLeads)["company"]))
- }
- if name == "" {
- name = ObjToString((*saleLeads)["name"])
- }
- if job == "" {
- job = ObjToString((*saleLeads)["position"])
- }
- if saleLeads_phone := strings.TrimSpace(ObjToString((*saleLeads)["phone"])); saleLeads_phone != "" && !phone_map[saleLeads_phone] {
- phone_map[saleLeads_phone] = true
- phone_array = append(phone_array, saleLeads_phone)
- }
- }
- //以前申请打开微信推送模板消息,目前入口已关闭
- if flag {
- applysub_user, applysub_user_ok := Mgo.FindOneByField("applysub_user", map[string]interface{}{
- "s_openid": s_m_openid,
- }, map[string]interface{}{
- "s_company": 1,
- "s_phone": 1,
- })
- if applysub_user_ok && applysub_user != nil {
- if company == "" {
- company = strings.TrimSpace(ObjToString((*applysub_user)["s_company"]))
- }
- if applysub_user_phone := strings.TrimSpace(ObjToString((*applysub_user)["s_phone"])); applysub_user_phone != "" && !phone_map[applysub_user_phone] {
- phone_map[applysub_user_phone] = true
- phone_array = append(phone_array, applysub_user_phone)
- }
- }
- }
- //绑定的手机号要放数组第一个,省份、城市从手机号归属地中取
- if len(phone_array) > 0 {
- phoneData, err := phonedata.Find(phone_array[0])
- if err == nil {
- province = phoneData.Province
- city = phoneData.City
- }
- }
- //关注状态
- follow_status := 1
- if s_m_openid != "" && IntAllDef(m["i_ispush"], 1) == 0 {
- follow_status = 2
- }
- //推荐人
- channel_id := ""
- if flag {
- person_invitelink, person_invitelink_ok := Mgo.FindOneByField("person_invitelink", map[string]interface{}{
- "s_target_openid": s_m_openid,
- "$and": []map[string]interface{}{
- map[string]interface{}{"s_source_openid": map[string]interface{}{"$exists": 1}},
- map[string]interface{}{"s_source_openid": map[string]interface{}{"$ne": ""}},
- map[string]interface{}{"s_source_openid": map[string]interface{}{"$ne": s_m_openid}},
- },
- }, map[string]interface{}{
- "s_source_openid": 1,
- })
- if person_invitelink_ok && person_invitelink != nil {
- channel_id = ObjToString((*person_invitelink)["s_source_openid"])
- }
- if channel_id == "" {
- person_activelink, person_activelink_ok := Mgo.FindOneByField("person_activelink", map[string]interface{}{
- "s_target_openid": s_m_openid,
- "$and": []map[string]interface{}{
- map[string]interface{}{"s_source_openid": map[string]interface{}{"$exists": 1}},
- map[string]interface{}{"s_source_openid": map[string]interface{}{"$ne": ""}},
- map[string]interface{}{"s_source_openid": map[string]interface{}{"$ne": s_m_openid}},
- },
- }, map[string]interface{}{
- "s_source_openid": 1,
- })
- if person_activelink_ok && person_activelink != nil {
- channel_id = ObjToString((*person_activelink)["s_source_openid"])
- }
- }
- }
- return &raw_user{
- User_id: _id,
- Open_id: s_m_openid,
- Reg_time: FormatDateByInt64(®istedate, Date_Full_Layout),
- Province: province,
- City: city,
- Reg_type: strings.Join(reg_type, "、"),
- Device: ObjToString(m["s_appponetype"]),
- Company: company,
- Job: job,
- Source_module: ObjToString(m["s_module"]),
- Source_channel: ObjToString(m["s_rsource"]),
- Phone: strings.Join(phone_array, ","),
- Channel_id: channel_id,
- Email: email,
- Timestamp: NowFormat(Date_Full_Layout),
- Name: name,
- Follow_status: follow_status,
- }
- }
|