subscribeAll.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package main
  2. import (
  3. "strings"
  4. "time"
  5. "log"
  6. "app.yhyue.com/moapp/jybase/common"
  7. "app.yhyue.com/moapp/jybase/mongodb"
  8. )
  9. func subscribeAllSync() {
  10. log.Println("订阅全量数据任务开始")
  11. session := Mgo.GetMgoConn()
  12. defer func() {
  13. Mgo.DestoryMongoConn(session)
  14. }()
  15. query := map[string]interface{}{}
  16. iter := session.DB(db.Mgo.DbName).C("user").Find(&query).Sort("_id").Iter()
  17. thisData := map[string]interface{}{}
  18. for {
  19. if !iter.Next(&thisData) {
  20. break
  21. }
  22. FormatSubscribeAllData(thisData)
  23. }
  24. log.Println("订阅全量数据任务结束")
  25. }
  26. func FormatSubscribeAllData(data map[string]interface{}) {
  27. stype, types := "", ""
  28. userId := mongodb.BsonIdToSId(data["_id"])
  29. areaCodes := ""
  30. keywords := ""
  31. keyArrs := []string{}
  32. if data["o_member_jy"] != nil {
  33. stype = "大会员订阅"
  34. types = "o_member_jy"
  35. } else {
  36. if data["o_vipjy"] != nil {
  37. stype = "超级订阅"
  38. types = "o_vipjy"
  39. } else {
  40. if data["o_jy"] != nil {
  41. stype = "免费订阅"
  42. types = "o_jy"
  43. }
  44. }
  45. }
  46. if types != "" {
  47. if jy, ok := data[types].(map[string]interface{}); ok {
  48. if types == "o_jy" {
  49. //
  50. if area, oks := jy["o_area"].(map[string]interface{}); oks {
  51. areaArr := []string{}
  52. for k, _ := range area {
  53. areaArr = append(areaArr, AreaCode[k])
  54. }
  55. if len(area) == 0 {
  56. areaArr = append(areaArr, AreaCode["全国"])
  57. }
  58. areaCodes = strings.Join(areaArr, ",")
  59. }
  60. //
  61. akey, aok := jy["a_key"].([]map[string]interface{})
  62. if !aok {
  63. akeys, _ := jy["a_key"].([]interface{})
  64. akey = common.ObjArrToMapArr(akeys)
  65. }
  66. for _, v := range akey {
  67. keysArr, asok := v["key"].([]string)
  68. if !asok {
  69. keysArr_s, _ := v["key"].([]interface{})
  70. keysArr = common.ObjArrToStringArr(keysArr_s)
  71. }
  72. for _, key := range keysArr {
  73. keyArrs = append(keyArrs, key)
  74. }
  75. }
  76. } else if types == "o_vipjy" {
  77. if area, oks := jy["area"].(map[string]interface{}); oks {
  78. areaArr := []string{}
  79. for k, _ := range area {
  80. areaArr = append(areaArr, AreaCode[k])
  81. }
  82. if len(area) == 0 {
  83. areaArr = append(areaArr, AreaCode["全国"])
  84. }
  85. areaCodes = strings.Join(areaArr, ",")
  86. }
  87. items, aok := jy["items"].([]map[string]interface{})
  88. if !aok {
  89. itemss, _ := jy["items"].([]interface{})
  90. items = common.ObjArrToMapArr(itemss)
  91. }
  92. for _, v := range items {
  93. akey, iok := v["a_key"].([]map[string]interface{})
  94. if !iok {
  95. akeys, _ := v["a_key"].([]interface{})
  96. akey = common.ObjArrToMapArr(akeys)
  97. }
  98. for _, v := range akey {
  99. keysArr, asok := v["key"].([]string)
  100. if !asok {
  101. keysArr_s, _ := v["key"].([]interface{})
  102. keysArr = common.ObjArrToStringArr(keysArr_s)
  103. }
  104. for _, key := range keysArr {
  105. keyArrs = append(keyArrs, key)
  106. }
  107. }
  108. }
  109. } else {
  110. if area, oks := jy["o_area"].(map[string]interface{}); oks {
  111. areaArr := []string{}
  112. for k, _ := range area {
  113. areaArr = append(areaArr, AreaCode[k])
  114. }
  115. if len(area) == 0 {
  116. areaArr = append(areaArr, AreaCode["全国"])
  117. }
  118. areaCodes = strings.Join(areaArr, ",")
  119. }
  120. if jy["o_area"] == nil {
  121. areaCodes = AreaCode["全国"]
  122. }
  123. items, aok := jy["a_items"].([]map[string]interface{})
  124. if !aok {
  125. itemss, _ := jy["a_items"].([]interface{})
  126. items = common.ObjArrToMapArr(itemss)
  127. }
  128. for _, v := range items {
  129. akey, iok := v["a_key"].([]map[string]interface{})
  130. if !iok {
  131. akeys, _ := v["a_key"].([]interface{})
  132. akey = common.ObjArrToMapArr(akeys)
  133. }
  134. for _, v := range akey {
  135. keysArr, asok := v["key"].([]string)
  136. if !asok {
  137. keysArr_s, _ := v["key"].([]interface{})
  138. keysArr = common.ObjArrToStringArr(keysArr_s)
  139. }
  140. for _, key := range keysArr {
  141. keyArrs = append(keyArrs, key)
  142. }
  143. }
  144. }
  145. }
  146. if len(keyArrs) > 0 {
  147. keywords = strings.Join(keyArrs, ",")
  148. }
  149. }
  150. start := time.Now().Format("2006-01-02") + " 00:00:00"
  151. end := time.Now().Format("2006-01-02") + " 23:59:59"
  152. nowTime := time.Now().Format("2006-01-02 15:04:05")
  153. subscribeData := TiDb.SelectBySql(`select * from dwd_f_userbase_subscribe_info where userid = "` + userId + `" and updatetime >= "` + start + `" and updatetime <= "` + end + `"`)
  154. if subscribeData != nil && len(*subscribeData) > 0 {
  155. TiDb.Update("dwd_f_userbase_subscribe_info", map[string]interface{}{"id": common.IntAll((*subscribeData)[0]["id"])}, map[string]interface{}{
  156. "updatetime": nowTime,
  157. "subscribe_areas": areaCodes,
  158. "subscribe_keywords": keywords,
  159. "member_type": stype,
  160. "userid": userId,
  161. })
  162. } else {
  163. TiDb.Insert("dwd_f_userbase_subscribe_info", map[string]interface{}{
  164. "userid": userId,
  165. "updatetime": nowTime,
  166. "subscribe_areas": areaCodes,
  167. "subscribe_keywords": keywords,
  168. "member_type": stype,
  169. })
  170. }
  171. }
  172. }