raw_user.go 14 KB


  1. package entity
  2. import (
  3. "log"
  4. . "online_datasync/config"
  5. . "online_datasync/db"
  6. "online_datasync/phonedata"
  7. "strings"
  8. "sync"
  9. . "app.yhyue.com/moapp/jybase/common"
  10. . "app.yhyue.com/moapp/jybase/date"
  11. . "app.yhyue.com/moapp/jybase/mongodb"
  12. )
  13. var (
  14. Raw_user *raw_user
  15. )
  // raw_user is one row of the raw_user table. Field order here is independent
  // of column order; the column lists live in SaveFields and in update.
  type raw_user struct {
  	// User id.
  	User_id string
  	// Registration date.
  	Reg_time string
  	// Registration method [phone, WeChat].
  	Reg_type string
  	// Province.
  	Province string
  	// City.
  	City string
  	// Commonly used device.
  	Device string
  	// Company name.
  	Company string
  	// Job title.
  	Job string
  	// Source module.
  	Source_module string
  	// Source channel.
  	Source_channel string
  	// Follow status: 1 = not unfollowed, 2 = unfollowed.
  	Follow_status int
  	// Phone number(s).
  	Phone string
  	// WeChat id.
  	Open_id string
  	// For example: when the user was referred, the id of the referring user.
  	Channel_id string
  	// Email address.
  	Email string
  	// Name.
  	Name string
  	// Update time.
  	Timestamp string
  }
  35. func (r *raw_user) TableName() string {
  36. return "raw_user"
  37. }
  38. //
  39. func (r *raw_user) SaveFields() []string {
  40. return []string{"user_id", "reg_time", "province", "city", "reg_type", "device", "company", "job", "source_module", "source_channel", "follow_status", "phone", "openid", "channel_id", "name", "email", "timestamp"}
  41. }
  42. //
  43. func (r *raw_user) selectField() map[string]interface{} {
  44. return map[string]interface{}{
  45. "_id": 1,
  46. "s_phone": 1,
  47. "s_m_phone": 1,
  48. "s_m_openid": 1,
  49. "l_registedate": 1,
  50. "l_w_registedate": 1,
  51. "l_a_registedate": 1,
  52. "a_m_openid": 1,
  53. "s_province": 1,
  54. "s_city": 1,
  55. "s_company": 1,
  56. "s_appponetype": 1,
  57. "i_ispush": 1,
  58. "s_myemail": 1,
  59. "o_jy.s_email": 1,
  60. "o_vipjy.s_email": 1,
  61. "o_member_jy.s_email": 1,
  62. "s_rsource": 1,
  63. "s_module": 1,
  64. "a_collect_phone": 1,
  65. }
  66. }
  67. //
  68. func (r *raw_user) Run(start_unix, end_unix int64, start_layout, end_layout string) {
  69. log.Println("开始同步", r.TableName(), "表。。。", start_unix, end_unix)
  70. r.add()
  71. if start_unix > 0 {
  72. r.update(start_unix, end_unix)
  73. }
  74. log.Println("同步", r.TableName(), "表结束。。。")
  75. }
  76. //新增
  77. func (r *raw_user) add() (id string) {
  78. defer Catch()
  79. index := 0
  80. array := []interface{}{}
  81. lock := &sync.Mutex{}
  82. pool := make(chan bool, Config.SelectMgoUserPool)
  83. wait := &sync.WaitGroup{}
  84. sess := Mgo.GetMgoConn()
  85. defer Mgo.DestoryMongoConn(sess)
  86. q := map[string]interface{}{
  87. "i_appid": 2,
  88. }
  89. if TimeTask.User_mgo_mysql_id != "" {
  90. q["_id"] = map[string]interface{}{
  91. "$gt": StringTOBsonId(TimeTask.User_mgo_mysql_id),
  92. }
  93. }
  94. log.Println("开始同步新增", r.TableName(), "表。。。", q)
  95. it := sess.DB("qfw").C("user").Find(q).Sort("_id").Select(r.selectField()).Iter()
  96. for temp := make(map[string]interface{}); it.Next(&temp); {
  97. TimeTask.User_mgo_mysql_id = BsonIdToSId(temp["_id"])
  98. pool <- true
  99. wait.Add(1)
  100. go func(m map[string]interface{}) {
  101. defer Catch()
  102. defer func() {
  103. <-pool
  104. wait.Done()
  105. }()
  106. ru := new_raw_user(m, true)
  107. lock.Lock()
  108. defer lock.Unlock()
  109. index++
  110. array = append(array, ru.User_id, ru.Reg_time, ru.Province, ru.City, ru.Reg_type, ru.Device, ru.Company, ru.Job, ru.Source_module, ru.Source_channel, ru.Follow_status, ru.Phone, ru.Open_id, ru.Channel_id, ru.Name, ru.Email, ru.Timestamp)
  111. if index%Config.InsertBathSize == 0 {
  112. log.Println("同步新增", r.TableName(), "表", index)
  113. Mysql_Main.InsertIgnoreBatch(r.TableName(), r.SaveFields(), array)
  114. array = []interface{}{}
  115. }
  116. }(temp)
  117. temp = make(map[string]interface{})
  118. }
  119. wait.Wait()
  120. if len(array) > 0 {
  121. Mysql_Main.InsertIgnoreBatch(r.TableName(), r.SaveFields(), array)
  122. array = []interface{}{}
  123. }
  124. log.Println("同步新增", r.TableName(), "表结束。。。", index)
  125. return
  126. }
  127. //更新
  128. func (r *raw_user) update(start_unix, end_unix int64) {
  129. log.Println("开始同步user表,Mgo to Mysql 更新。。。")
  130. defer Catch()
  131. index := 0
  132. fields := []string{"user_id", "province", "city", "reg_type", "device", "company", "job", "source_module", "source_channel", "follow_status", "phone", "openid", "name", "email", "timestamp"}
  133. array := [][]interface{}{}
  134. lock := &sync.Mutex{}
  135. pool := make(chan bool, Config.SelectMgoUserPool)
  136. wait := &sync.WaitGroup{}
  137. sess := Mgo.GetMgoConn()
  138. defer Mgo.DestoryMongoConn(sess)
  139. q := map[string]interface{}{
  140. "i_appid": 2,
  141. "auto_updatetime": map[string]interface{}{
  142. "$gte": start_unix,
  143. "$lt": end_unix,
  144. },
  145. }
  146. log.Println("同步更新", r.TableName(), "表。。。", q)
  147. it := sess.DB("qfw").C("user").Find(q).Sort("_id").Select(r.selectField()).Iter()
  148. for temp := make(map[string]interface{}); it.Next(&temp); {
  149. pool <- true
  150. wait.Add(1)
  151. go func(m map[string]interface{}) {
  152. defer Catch()
  153. defer func() {
  154. <-pool
  155. wait.Done()
  156. }()
  157. ru := new_raw_user(m, false)
  158. lock.Lock()
  159. defer lock.Unlock()
  160. index++
  161. array = append(array, []interface{}{ru.User_id, ru.Province, ru.City, ru.Reg_type, ru.Device, ru.Company, ru.Job, ru.Source_module, ru.Source_channel, ru.Follow_status, ru.Phone, ru.Open_id, ru.Name, ru.Email, ru.Timestamp})
  162. if index%Config.UpdateBathSize == 0 {
  163. log.Println("同步更新", r.TableName(), "表", index)
  164. Mysql_Main.UpdateBath(r.TableName(), fields, array)
  165. array = [][]interface{}{}
  166. }
  167. }(temp)
  168. temp = make(map[string]interface{})
  169. }
  170. wait.Wait()
  171. if len(array) > 0 {
  172. Mysql_Main.UpdateBath(r.TableName(), fields, array)
  173. array = [][]interface{}{}
  174. }
  175. log.Println("同步更新", r.TableName(), "表结束。。。", index)
  176. }
  177. //
  178. func new_raw_user(m map[string]interface{}, flag bool) *raw_user {
  179. _id := BsonIdToSId(m["_id"])
  180. s_m_openid := ObjToString(m["s_m_openid"])
  181. registedate := Int64All(m["l_registedate"])
  182. if registedate == 0 {
  183. registedate = Int64All(m["l_w_registedate"])
  184. }
  185. if registedate == 0 {
  186. registedate = Int64All(m["l_a_registedate"])
  187. }
  188. reg_type := []string{}
  189. if s_m_openid != "" || ObjToString(m["a_m_openid"]) != "" {
  190. reg_type = append(reg_type, "微信")
  191. }
  192. if ObjToString(m["s_phone"]) != "" {
  193. reg_type = append(reg_type, "手机")
  194. }
  195. province := ObjToString(m["s_province"])
  196. city := ObjToString(m["s_city"])
  197. phone := strings.TrimSpace(ObjToString(m["s_phone"]))
  198. if phone == "" {
  199. phone = strings.TrimSpace(ObjToString(m["s_m_phone"]))
  200. }
  201. phone_map := map[string]bool{}
  202. phone_array := []string{}
  203. if phone != "" {
  204. phone_map[phone] = true
  205. phone_array = append(phone_array, phone)
  206. }
  207. //邮箱
  208. email := strings.TrimSpace(ObjToString(m["s_myemail"]))
  209. if email == "" {
  210. o_jy, _ := m["o_jy"].(map[string]interface{})
  211. email = strings.TrimSpace(ObjToString(o_jy["s_email"]))
  212. }
  213. if email == "" {
  214. o_vipjy, _ := m["o_vipjy"].(map[string]interface{})
  215. email = strings.TrimSpace(ObjToString(o_vipjy["s_email"]))
  216. }
  217. if email == "" {
  218. o_member_jy, _ := m["o_member_jy"].(map[string]interface{})
  219. email = strings.TrimSpace(ObjToString(o_member_jy["s_email"]))
  220. }
  221. name := ""
  222. company := ""
  223. //手机号 公司名称 邮箱 姓名 从订单、发票中取
  224. orders := Mysql_From_Jianyu.SelectBySql(`SELECT a.user_phone,b.apply_phone,b.apply_company,b.apply_realyname,b.email,c.phone,c.mail,c.company_name from dataexport_order a
  225. LEFT JOIN apply_invoice b on (a.user_id=? and a.id=b.order_id)
  226. LEFT JOIN invoice c on (a.user_id=? and a.order_code=c.order_code)
  227. where a.user_id=? and (a.user_phone is not null or b.apply_company is not null or c.company_name is not null) order by a.id desc limit 100`, _id, _id, _id)
  228. if orders != nil {
  229. for _, v := range *orders {
  230. if order_phone := strings.TrimSpace(ObjToString(v["user_phone"])); order_phone != "" && !phone_map[order_phone] {
  231. phone_map[order_phone] = true
  232. phone_array = append(phone_array, order_phone)
  233. }
  234. if apply_invoice_phone := strings.TrimSpace(ObjToString(v["apply_phone"])); apply_invoice_phone != "" && !phone_map[apply_invoice_phone] {
  235. phone_map[apply_invoice_phone] = true
  236. phone_array = append(phone_array, apply_invoice_phone)
  237. }
  238. if invoice_phone := strings.TrimSpace(ObjToString(v["phone"])); invoice_phone != "" && !phone_map[invoice_phone] {
  239. phone_map[invoice_phone] = true
  240. phone_array = append(phone_array, invoice_phone)
  241. }
  242. company = strings.TrimSpace(ObjToString(v["company_name"]))
  243. if company == "" {
  244. company = strings.TrimSpace(ObjToString(v["apply_company"]))
  245. }
  246. if email == "" {
  247. email = strings.TrimSpace(ObjToString(v["mail"]))
  248. }
  249. if email == "" {
  250. email = strings.TrimSpace(ObjToString(v["email"]))
  251. }
  252. name = strings.TrimSpace(ObjToString(v["apply_realyname"]))
  253. }
  254. }
  255. if phone != "" && (company == "" || name == "") {
  256. entniche_info := Mysql_From_Jianyu.SelectBySql(`select name,admin from entniche_info where phone=? order by id desc limit 100`, phone)
  257. if entniche_info != nil {
  258. for _, v := range *entniche_info {
  259. if entniche_info_name := strings.TrimSpace(ObjToString(v["name"])); entniche_info_name != "" && company == "" {
  260. company = entniche_info_name
  261. }
  262. if entniche_info_admin := strings.TrimSpace(ObjToString(v["admin"])); entniche_info_admin != "" && name == "" {
  263. name = entniche_info_admin
  264. }
  265. if company != "" && name != "" {
  266. break
  267. }
  268. }
  269. }
  270. }
  271. //
  272. a_collect_phone, _ := m["a_collect_phone"].([]interface{})
  273. for _, v := range a_collect_phone {
  274. if vs := strings.TrimSpace(ObjToString(v)); vs != "" && !phone_map[vs] {
  275. phone_map[vs] = true
  276. phone_array = append(phone_array, vs)
  277. }
  278. }
  279. //注册时候的公司名称
  280. if company == "" {
  281. company = strings.TrimSpace(ObjToString(m["s_company"]))
  282. }
  283. //超级订阅试用
  284. job := ""
  285. user_msg, user_msg_ok := Mgo.FindOneByField("user_msg", map[string]interface{}{
  286. "s_userId": _id,
  287. }, map[string]interface{}{
  288. "s_company": 1,
  289. "s_job": 1,
  290. "s_name": 1,
  291. "s_phone": 1,
  292. })
  293. if user_msg_ok && user_msg != nil {
  294. if company == "" {
  295. company = strings.TrimSpace(ObjToString((*user_msg)["s_company"]))
  296. }
  297. if name == "" {
  298. name = ObjToString((*user_msg)["s_name"])
  299. }
  300. job = ObjToString((*user_msg)["s_job"])
  301. if user_msg_phone := strings.TrimSpace(ObjToString((*user_msg)["s_phone"])); user_msg_phone != "" && !phone_map[user_msg_phone] {
  302. phone_map[user_msg_phone] = true
  303. phone_array = append(phone_array, user_msg_phone)
  304. }
  305. }
  306. //开通大会员
  307. if company == "" {
  308. member, member_ok := Mgo.FindOneByField("member", map[string]interface{}{
  309. "userid": _id,
  310. }, map[string]interface{}{
  311. "entname": 1,
  312. })
  313. if member_ok && member != nil {
  314. company = strings.TrimSpace(ObjToString((*member)["entname"]))
  315. }
  316. }
  317. //大会员试用
  318. saleLeads, saleLeads_ok := Mgo.FindOneByField("saleLeads", map[string]interface{}{
  319. "userid": _id,
  320. }, map[string]interface{}{
  321. "company": 1,
  322. "position": 1,
  323. "name": 1,
  324. "phone": 1,
  325. })
  326. if saleLeads_ok && saleLeads != nil {
  327. if company == "" {
  328. company = strings.TrimSpace(ObjToString((*saleLeads)["company"]))
  329. }
  330. if name == "" {
  331. name = ObjToString((*saleLeads)["name"])
  332. }
  333. if job == "" {
  334. job = ObjToString((*saleLeads)["position"])
  335. }
  336. if saleLeads_phone := strings.TrimSpace(ObjToString((*saleLeads)["phone"])); saleLeads_phone != "" && !phone_map[saleLeads_phone] {
  337. phone_map[saleLeads_phone] = true
  338. phone_array = append(phone_array, saleLeads_phone)
  339. }
  340. }
  341. //以前申请打开微信推送模板消息,目前入口已关闭
  342. if flag {
  343. applysub_user, applysub_user_ok := Mgo.FindOneByField("applysub_user", map[string]interface{}{
  344. "s_openid": s_m_openid,
  345. }, map[string]interface{}{
  346. "s_company": 1,
  347. "s_phone": 1,
  348. })
  349. if applysub_user_ok && applysub_user != nil {
  350. if company == "" {
  351. company = strings.TrimSpace(ObjToString((*applysub_user)["s_company"]))
  352. }
  353. if applysub_user_phone := strings.TrimSpace(ObjToString((*applysub_user)["s_phone"])); applysub_user_phone != "" && !phone_map[applysub_user_phone] {
  354. phone_map[applysub_user_phone] = true
  355. phone_array = append(phone_array, applysub_user_phone)
  356. }
  357. }
  358. }
  359. //绑定的手机号要放数组第一个,省份、城市从手机号归属地中取
  360. if len(phone_array) > 0 {
  361. phoneData, err := phonedata.Find(phone_array[0])
  362. if err == nil {
  363. province = phoneData.Province
  364. city = phoneData.City
  365. }
  366. }
  367. //关注状态
  368. follow_status := 1
  369. if s_m_openid != "" && IntAllDef(m["i_ispush"], 1) == 0 {
  370. follow_status = 2
  371. }
  372. //推荐人
  373. channel_id := ""
  374. if flag {
  375. person_invitelink, person_invitelink_ok := Mgo.FindOneByField("person_invitelink", map[string]interface{}{
  376. "s_target_openid": s_m_openid,
  377. "$and": []map[string]interface{}{
  378. map[string]interface{}{"s_source_openid": map[string]interface{}{"$exists": 1}},
  379. map[string]interface{}{"s_source_openid": map[string]interface{}{"$ne": ""}},
  380. map[string]interface{}{"s_source_openid": map[string]interface{}{"$ne": s_m_openid}},
  381. },
  382. }, map[string]interface{}{
  383. "s_source_openid": 1,
  384. })
  385. if person_invitelink_ok && person_invitelink != nil {
  386. channel_id = ObjToString((*person_invitelink)["s_source_openid"])
  387. }
  388. if channel_id == "" {
  389. person_activelink, person_activelink_ok := Mgo.FindOneByField("person_activelink", map[string]interface{}{
  390. "s_target_openid": s_m_openid,
  391. "$and": []map[string]interface{}{
  392. map[string]interface{}{"s_source_openid": map[string]interface{}{"$exists": 1}},
  393. map[string]interface{}{"s_source_openid": map[string]interface{}{"$ne": ""}},
  394. map[string]interface{}{"s_source_openid": map[string]interface{}{"$ne": s_m_openid}},
  395. },
  396. }, map[string]interface{}{
  397. "s_source_openid": 1,
  398. })
  399. if person_activelink_ok && person_activelink != nil {
  400. channel_id = ObjToString((*person_activelink)["s_source_openid"])
  401. }
  402. }
  403. }
  404. return &raw_user{
  405. User_id: _id,
  406. Open_id: s_m_openid,
  407. Reg_time: FormatDateByInt64(&registedate, Date_Full_Layout),
  408. Province: province,
  409. City: city,
  410. Reg_type: strings.Join(reg_type, "、"),
  411. Device: ObjToString(m["s_appponetype"]),
  412. Company: company,
  413. Job: job,
  414. Source_module: ObjToString(m["s_module"]),
  415. Source_channel: ObjToString(m["s_rsource"]),
  416. Phone: strings.Join(phone_array, ","),
  417. Channel_id: channel_id,
  418. Email: email,
  419. Timestamp: NowFormat(Date_Full_Layout),
  420. Name: name,
  421. Follow_status: follow_status,
  422. }
  423. }