bxt_raw_user .go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package entity
  2. import (
  3. "log"
  4. . "online_datasync/config"
  5. . "online_datasync/db"
  6. "strings"
  7. "sync"
  8. . "app.yhyue.com/moapp/jybase/common"
  9. . "app.yhyue.com/moapp/jybase/date"
  10. . "app.yhyue.com/moapp/jybase/mongodb"
  11. )
  12. var (
  13. Bxt_raw_user *bxt_raw_user
  14. )
  // bxt_raw_user holds the values of one row of the raw_user table; add and
  // update copy these fields into the columns they write.
  type bxt_raw_user struct {
  	// User id.
  	User_id string
  	// Registration date.
  	Reg_time string
  	// Registration method [phone, WeChat].
  	Reg_type string
  	// Province.
  	Province string
  	// City.
  	City string
  	// Most frequently used device.
  	Device string
  	// Company name.
  	Company string
  	// Job title.
  	Job string
  	// Source module.
  	Source_module string
  	// Source channel.
  	Source_channel string
  	// Follow status: 1 = still following, 2 = unfollowed.
  	Follow_status int
  	// Phone number.
  	Phone string
  	// WeChat id.
  	Open_id string
  	// Channel id; e.g. for a referral it records the referring user's id.
  	Channel_id string
  	// E-mail address.
  	Email string
  	// Name.
  	Name string
  	// Time of the last update.
  	Timestamp string
  }
  34. func (b *bxt_raw_user) TableName() string {
  35. return "raw_user"
  36. }
  37. //
  38. func (b *bxt_raw_user) SaveFields() []string {
  39. return []string{"user_id", "reg_time", "province", "city", "reg_type", "device", "company", "job", "source_module", "source_channel", "follow_status", "phone", "openid", "channel_id", "name", "email", "timestamp"}
  40. }
  41. //
  42. func (b *bxt_raw_user) selectField() map[string]interface{} {
  43. return map[string]interface{}{
  44. "_id": 1,
  45. "s_phone": 1,
  46. "l_registedate": 1,
  47. "s_company": 1,
  48. "o_member_jy.s_email": 1,
  49. }
  50. }
  51. //
  52. func (b *bxt_raw_user) Run(start_unix, end_unix int64, start_layout, end_layout string) {
  53. log.Println("开始同步标讯通", b.TableName(), "表。。。", start_unix, end_unix)
  54. b.add()
  55. if start_unix > 0 {
  56. b.update(start_unix, end_unix)
  57. }
  58. log.Println("同步讯通", b.TableName(), "表结束。。。")
  59. }
  60. //新增
  61. func (b *bxt_raw_user) add() (id string) {
  62. defer Catch()
  63. index := 0
  64. array := []interface{}{}
  65. lock := &sync.Mutex{}
  66. pool := make(chan bool, Config.SelectMgoUserPool)
  67. wait := &sync.WaitGroup{}
  68. sess := Mgo.GetMgoConn()
  69. defer Mgo.DestoryMongoConn(sess)
  70. q := map[string]interface{}{
  71. "i_appid": 2,
  72. }
  73. if TimeTask.Bxt_user_mgo_mysql_id != "" {
  74. q["_id"] = map[string]interface{}{
  75. "$gt": StringTOBsonId(TimeTask.Bxt_user_mgo_mysql_id),
  76. }
  77. }
  78. log.Println("开始同步新增", b.TableName(), "表。。。", q)
  79. it := sess.DB("bxt").C("user").Find(q).Sort("_id").Select(b.selectField()).Iter()
  80. for temp := make(map[string]interface{}); it.Next(&temp); {
  81. TimeTask.Bxt_user_mgo_mysql_id = BsonIdToSId(temp["_id"])
  82. pool <- true
  83. wait.Add(1)
  84. go func(m map[string]interface{}) {
  85. defer Catch()
  86. defer func() {
  87. <-pool
  88. wait.Done()
  89. }()
  90. ru := new_bxt_raw_user(m, true)
  91. lock.Lock()
  92. defer lock.Unlock()
  93. index++
  94. array = append(array, ru.User_id, ru.Reg_time, ru.Province, ru.City, ru.Reg_type, ru.Device, ru.Company, ru.Job, ru.Source_module, ru.Source_channel, ru.Follow_status, ru.Phone, ru.Open_id, ru.Channel_id, ru.Name, ru.Email, ru.Timestamp)
  95. if index%Config.InsertBathSize == 0 {
  96. log.Println("同步新增", b.TableName(), "表", index)
  97. Mysql_Main_Bxt.InsertIgnoreBatch(b.TableName(), b.SaveFields(), array)
  98. array = []interface{}{}
  99. }
  100. }(temp)
  101. temp = make(map[string]interface{})
  102. }
  103. wait.Wait()
  104. if len(array) > 0 {
  105. Mysql_Main_Bxt.InsertIgnoreBatch(b.TableName(), b.SaveFields(), array)
  106. array = []interface{}{}
  107. }
  108. log.Println("同步新增", b.TableName(), "表结束。。。", index)
  109. return
  110. }
  111. //更新
  112. func (b *bxt_raw_user) update(start_unix, end_unix int64) {
  113. log.Println("开始同步user表,Mgo to Mysql 更新。。。")
  114. defer Catch()
  115. index := 0
  116. fields := []string{"user_id", "province", "city", "reg_type", "device", "company", "job", "source_module", "source_channel", "follow_status", "phone", "openid", "name", "email", "timestamp"}
  117. array := [][]interface{}{}
  118. lock := &sync.Mutex{}
  119. pool := make(chan bool, Config.SelectMgoUserPool)
  120. wait := &sync.WaitGroup{}
  121. sess := Mgo.GetMgoConn()
  122. defer Mgo.DestoryMongoConn(sess)
  123. q := map[string]interface{}{
  124. "i_appid": 2,
  125. "auto_updatetime": map[string]interface{}{
  126. "$gte": start_unix,
  127. "$lt": end_unix,
  128. },
  129. }
  130. log.Println("同步更新", b.TableName(), "表。。。", q)
  131. it := sess.DB("bxt").C("user").Find(q).Sort("_id").Select(b.selectField()).Iter()
  132. for temp := make(map[string]interface{}); it.Next(&temp); {
  133. pool <- true
  134. wait.Add(1)
  135. go func(m map[string]interface{}) {
  136. defer Catch()
  137. defer func() {
  138. <-pool
  139. wait.Done()
  140. }()
  141. ru := new_bxt_raw_user(m, false)
  142. lock.Lock()
  143. defer lock.Unlock()
  144. index++
  145. array = append(array, []interface{}{ru.User_id, ru.Province, ru.City, ru.Reg_type, ru.Device, ru.Company, ru.Job, ru.Source_module, ru.Source_channel, ru.Follow_status, ru.Phone, ru.Open_id, ru.Name, ru.Email, ru.Timestamp})
  146. if index%Config.UpdateBathSize == 0 {
  147. log.Println("同步更新", b.TableName(), "表", index)
  148. Mysql_Main_Bxt.UpdateBath(b.TableName(), fields, array)
  149. array = [][]interface{}{}
  150. }
  151. }(temp)
  152. temp = make(map[string]interface{})
  153. }
  154. wait.Wait()
  155. if len(array) > 0 {
  156. Mysql_Main_Bxt.UpdateBath(b.TableName(), fields, array)
  157. array = [][]interface{}{}
  158. }
  159. log.Println("同步更新", b.TableName(), "表结束。。。", index)
  160. }
  161. //
  162. func new_bxt_raw_user(m map[string]interface{}, flag bool) *bxt_raw_user {
  163. registedate := Int64All(m["l_registedate"])
  164. //邮箱
  165. email := strings.TrimSpace(ObjToString(m["s_myemail"]))
  166. if email == "" {
  167. o_member_jy, _ := m["o_member_jy"].(map[string]interface{})
  168. email = strings.TrimSpace(ObjToString(o_member_jy["s_email"]))
  169. }
  170. return &bxt_raw_user{
  171. User_id: BsonIdToSId(m["_id"]),
  172. Reg_time: FormatDateByInt64(&registedate, Date_Full_Layout),
  173. Reg_type: "手机",
  174. Company: strings.TrimSpace(ObjToString(m["s_company"])),
  175. Source_module: ObjToString(m["s_module"]),
  176. Phone: strings.TrimSpace(ObjToString(m["s_phone"])),
  177. Email: email,
  178. Timestamp: NowFormat(Date_Full_Layout),
  179. Name: strings.TrimSpace(ObjToString(m["s_name"])),
  180. }
  181. }