raw_user.go 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. package entity
  2. import (
  3. "log"
  4. . "online_datasync/config"
  5. . "online_datasync/db"
  6. "online_datasync/phonedata"
  7. "strings"
  8. "sync"
  9. . "app.yhyue.com/moapp/jybase/common"
  10. . "app.yhyue.com/moapp/jybase/date"
  11. . "app.yhyue.com/moapp/jybase/mongodb"
  12. )
  13. var (
  14. Raw_user *raw_user
  15. )
  16. type raw_user struct {
  17. User_id string //userid
  18. Reg_time string //注册日期
  19. Reg_type string //注册方式[手机、微信]
  20. Province string //省份
  21. City string //城市
  22. Device string //常用设备
  23. Company string //公司名称
  24. Job string //职务
  25. Source_module string //来源模块
  26. Source_channel string //来源渠道
  27. Follow_status int //关注状态 1未取关 2取关
  28. Phone string //手机号
  29. Open_id string //微信id
  30. Channel_id string //例如:如果是推荐人记推荐用户id
  31. Email string //
  32. Timestamp string //更新时间
  33. }
  34. func (r *raw_user) TableName() string {
  35. return "raw_user"
  36. }
  37. //
  38. func (r *raw_user) SaveFields() []string {
  39. return []string{"user_id", "reg_time", "province", "city", "reg_type", "device", "company", "job", "source_module", "source_channel", "follow_status", "phone", "openid", "channel_id", "email", "timestamp"}
  40. }
  41. //
  42. func (r *raw_user) selectField() map[string]interface{} {
  43. return map[string]interface{}{
  44. "_id": 1,
  45. "s_phone": 1,
  46. "s_m_phone": 1,
  47. "s_m_openid": 1,
  48. "l_registedate": 1,
  49. "l_w_registedate": 1,
  50. "l_a_registedate": 1,
  51. "a_m_openid": 1,
  52. "s_province": 1,
  53. "s_city": 1,
  54. "s_company": 1,
  55. "s_appponetype": 1,
  56. "i_ispush": 1,
  57. "s_myemail": 1,
  58. "o_jy.s_email": 1,
  59. "o_vipjy.s_email": 1,
  60. "o_member_jy.s_email": 1,
  61. "s_rsource": 1,
  62. "s_module": 1,
  63. }
  64. }
  65. //
  66. func (r *raw_user) Run(start_unix, end_unix int64, start_layout, end_layout string) {
  67. log.Println("开始同步", r.TableName(), "表。。。", start_unix, end_unix)
  68. r.add()
  69. if start_unix > 0 {
  70. r.update(start_unix, end_unix)
  71. }
  72. log.Println("同步", r.TableName(), "表结束。。。")
  73. }
  74. //新增
  75. func (r *raw_user) add() (id string) {
  76. defer Catch()
  77. index := 0
  78. array := []interface{}{}
  79. lock := &sync.Mutex{}
  80. pool := make(chan bool, Config.SelectMgoUserPool)
  81. wait := &sync.WaitGroup{}
  82. sess := Mgo.GetMgoConn()
  83. defer Mgo.DestoryMongoConn(sess)
  84. q := map[string]interface{}{
  85. "i_appid": 2,
  86. }
  87. if TimeTask.User_mgo_mysql_id != "" {
  88. q["_id"] = map[string]interface{}{
  89. "$gt": StringTOBsonId(TimeTask.User_mgo_mysql_id),
  90. }
  91. }
  92. log.Println("开始同步新增", r.TableName(), "表。。。", q)
  93. it := sess.DB("qfw").C("user").Find(q).Sort("_id").Select(r.selectField()).Iter()
  94. for temp := make(map[string]interface{}); it.Next(&temp); {
  95. TimeTask.User_mgo_mysql_id = BsonIdToSId(temp["_id"])
  96. pool <- true
  97. wait.Add(1)
  98. go func(m map[string]interface{}) {
  99. defer Catch()
  100. defer func() {
  101. <-pool
  102. wait.Done()
  103. }()
  104. ru := new_raw_user(m, true)
  105. lock.Lock()
  106. defer lock.Unlock()
  107. index++
  108. array = append(array, ru.User_id, ru.Reg_time, ru.Province, ru.City, ru.Reg_type, ru.Device, ru.Company, ru.Job, ru.Source_module, ru.Source_channel, ru.Follow_status, ru.Phone, ru.Open_id, ru.Channel_id, ru.Email, ru.Timestamp)
  109. if index%Config.InsertBathSize == 0 {
  110. log.Println("同步新增", r.TableName(), "表", index)
  111. Mysql_Main.InsertIgnoreBatch(r.TableName(), r.SaveFields(), array)
  112. array = []interface{}{}
  113. }
  114. }(temp)
  115. temp = make(map[string]interface{})
  116. }
  117. wait.Wait()
  118. if len(array) > 0 {
  119. Mysql_Main.InsertIgnoreBatch(r.TableName(), r.SaveFields(), array)
  120. array = []interface{}{}
  121. }
  122. log.Println("同步新增", r.TableName(), "表结束。。。", index)
  123. return
  124. }
  125. //更新
  126. func (r *raw_user) update(start_unix, end_unix int64) {
  127. log.Println("开始同步user表,Mgo to Mysql 更新。。。")
  128. defer Catch()
  129. index := 0
  130. fields := []string{"user_id", "province", "city", "reg_type", "device", "company", "job", "source_module", "source_channel", "follow_status", "phone", "openid", "email", "timestamp"}
  131. array := [][]interface{}{}
  132. lock := &sync.Mutex{}
  133. pool := make(chan bool, Config.SelectMgoUserPool)
  134. wait := &sync.WaitGroup{}
  135. sess := Mgo.GetMgoConn()
  136. defer Mgo.DestoryMongoConn(sess)
  137. q := map[string]interface{}{
  138. "i_appid": 2,
  139. "auto_updatetime": map[string]interface{}{
  140. "$gte": start_unix,
  141. "$lt": end_unix,
  142. },
  143. }
  144. log.Println("同步更新", r.TableName(), "表。。。", q)
  145. it := sess.DB("qfw").C("user").Find(q).Sort("_id").Select(r.selectField()).Iter()
  146. for temp := make(map[string]interface{}); it.Next(&temp); {
  147. pool <- true
  148. wait.Add(1)
  149. go func(m map[string]interface{}) {
  150. defer Catch()
  151. defer func() {
  152. <-pool
  153. wait.Done()
  154. }()
  155. ru := new_raw_user(m, false)
  156. lock.Lock()
  157. defer lock.Unlock()
  158. index++
  159. array = append(array, []interface{}{ru.User_id, ru.Province, ru.City, ru.Reg_type, ru.Device, ru.Company, ru.Job, ru.Source_module, ru.Source_channel, ru.Follow_status, ru.Phone, ru.Open_id, ru.Email, ru.Timestamp})
  160. if index%Config.UpdateBathSize == 0 {
  161. log.Println("同步更新", r.TableName(), "表", index)
  162. Mysql_Main.UpdateBath(r.TableName(), fields, array)
  163. array = [][]interface{}{}
  164. }
  165. }(temp)
  166. temp = make(map[string]interface{})
  167. }
  168. wait.Wait()
  169. if len(array) > 0 {
  170. Mysql_Main.UpdateBath(r.TableName(), fields, array)
  171. array = [][]interface{}{}
  172. }
  173. log.Println("同步更新", r.TableName(), "表结束。。。", index)
  174. }
  175. //
  176. func new_raw_user(m map[string]interface{}, flag bool) *raw_user {
  177. _id := BsonIdToSId(m["_id"])
  178. s_m_openid := ObjToString(m["s_m_openid"])
  179. phone := ObjToString(m["s_phone"])
  180. if phone == "" {
  181. phone = ObjToString(m["s_m_phone"])
  182. }
  183. registedate := Int64All(m["l_registedate"])
  184. if registedate == 0 {
  185. registedate = Int64All(m["l_w_registedate"])
  186. }
  187. if registedate == 0 {
  188. registedate = Int64All(m["l_a_registedate"])
  189. }
  190. reg_type := []string{}
  191. if s_m_openid != "" || ObjToString(m["a_m_openid"]) != "" {
  192. reg_type = append(reg_type, "微信")
  193. }
  194. if ObjToString(m["s_phone"]) != "" {
  195. reg_type = append(reg_type, "手机")
  196. }
  197. province := ObjToString(m["s_province"])
  198. city := ObjToString(m["s_city"])
  199. if phone != "" {
  200. phoneData, err := phonedata.Find(phone)
  201. if err == nil {
  202. province = phoneData.Province
  203. city = phoneData.City
  204. }
  205. }
  206. company := ObjToString(m["s_company"])
  207. job := ""
  208. user_msg, user_msg_ok := Mgo.FindOneByField("user_msg", map[string]interface{}{
  209. "s_userId": _id,
  210. }, map[string]interface{}{
  211. "s_company": 1,
  212. "s_job": 1,
  213. })
  214. if user_msg_ok && user_msg != nil {
  215. if company == "" {
  216. company = ObjToString((*user_msg)["s_company"])
  217. }
  218. job = ObjToString((*user_msg)["s_job"])
  219. }
  220. if company == "" {
  221. member, member_ok := Mgo.FindOneByField("member", map[string]interface{}{
  222. "userid": _id,
  223. }, map[string]interface{}{
  224. "entname": 1,
  225. })
  226. if member_ok && member != nil {
  227. company = ObjToString((*member)["company"])
  228. }
  229. }
  230. if company == "" {
  231. applysub_user, applysub_user_ok := Mgo.FindOneByField("applysub_user", map[string]interface{}{
  232. "s_openid": s_m_openid,
  233. }, map[string]interface{}{
  234. "s_company": 1,
  235. })
  236. if applysub_user_ok && applysub_user != nil {
  237. company = ObjToString((*applysub_user)["s_company"])
  238. }
  239. }
  240. follow_status := 1
  241. if s_m_openid != "" && IntAllDef(m["i_ispush"], 1) == 0 {
  242. follow_status = 2
  243. }
  244. channel_id := ""
  245. if flag {
  246. person_invitelink, person_invitelink_ok := Mgo.FindOneByField("person_invitelink", map[string]interface{}{
  247. "s_target_openid": s_m_openid,
  248. "$and": []map[string]interface{}{
  249. map[string]interface{}{"s_source_openid": map[string]interface{}{"$exists": 1}},
  250. map[string]interface{}{"s_source_openid": map[string]interface{}{"$ne": ""}},
  251. map[string]interface{}{"s_source_openid": map[string]interface{}{"$ne": s_m_openid}},
  252. },
  253. }, map[string]interface{}{
  254. "s_source_openid": 1,
  255. })
  256. if person_invitelink_ok && person_invitelink != nil {
  257. channel_id = ObjToString((*person_invitelink)["s_source_openid"])
  258. }
  259. if channel_id == "" {
  260. person_activelink, person_activelink_ok := Mgo.FindOneByField("person_activelink", map[string]interface{}{
  261. "s_target_openid": s_m_openid,
  262. "$and": []map[string]interface{}{
  263. map[string]interface{}{"s_source_openid": map[string]interface{}{"$exists": 1}},
  264. map[string]interface{}{"s_source_openid": map[string]interface{}{"$ne": ""}},
  265. map[string]interface{}{"s_source_openid": map[string]interface{}{"$ne": s_m_openid}},
  266. },
  267. }, map[string]interface{}{
  268. "s_source_openid": 1,
  269. })
  270. if person_activelink_ok && person_activelink != nil {
  271. channel_id = ObjToString((*person_activelink)["s_source_openid"])
  272. }
  273. }
  274. }
  275. email := ObjToString(m["s_myemail"])
  276. if email == "" {
  277. o_jy, _ := m["o_jy"].(map[string]interface{})
  278. email = ObjToString(o_jy["s_email"])
  279. }
  280. if email == "" {
  281. o_vipjy, _ := m["o_vipjy"].(map[string]interface{})
  282. email = ObjToString(o_vipjy["s_email"])
  283. }
  284. if email == "" {
  285. o_member_jy, _ := m["o_member_jy"].(map[string]interface{})
  286. email = ObjToString(o_member_jy["s_email"])
  287. }
  288. return &raw_user{
  289. User_id: _id,
  290. Open_id: s_m_openid,
  291. Reg_time: FormatDateByInt64(&registedate, Date_Full_Layout),
  292. Province: province,
  293. City: city,
  294. Reg_type: strings.Join(reg_type, "、"),
  295. Device: ObjToString(m["s_appponetype"]),
  296. Company: company,
  297. Job: job,
  298. Source_module: ObjToString(m["s_module"]),
  299. Source_channel: ObjToString(m["s_rsource"]),
  300. Phone: phone,
  301. Channel_id: channel_id,
  302. Email: email,
  303. Timestamp: NowFormat(Date_Full_Layout),
  304. Follow_status: follow_status,
  305. }
  306. }