subscribeAll.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package main
  2. import (
  3. "github.com/gogf/gf/v2/util/gconv"
  4. "strings"
  5. "time"
  6. "log"
  7. "app.yhyue.com/moapp/jybase/common"
  8. "app.yhyue.com/moapp/jybase/mongodb"
  9. )
  10. func subscribeAllSync() {
  11. log.Println("订阅全量数据任务开始")
  12. session := Mgo.GetMgoConn()
  13. defer func() {
  14. Mgo.DestoryMongoConn(session)
  15. }()
  16. query := map[string]interface{}{}
  17. iter := session.DB(db.Mgo.DbName).C("user").Find(&query).Sort("_id").Iter()
  18. thisData := map[string]interface{}{}
  19. for {
  20. if !iter.Next(&thisData) {
  21. break
  22. }
  23. FormatSubscribeAllData(thisData)
  24. }
  25. log.Println("订阅全量数据任务结束")
  26. }
  27. func FormatSubscribeAllData(data map[string]interface{}) {
  28. stype, types := "", ""
  29. userId := mongodb.BsonIdToSId(data["_id"])
  30. areaCodes := ""
  31. keywords := ""
  32. keyArrs := []string{}
  33. if data["o_member_jy"] != nil {
  34. if gconv.Int64(data["i_member_status"]) > 0 {
  35. stype = "大会员订阅"
  36. types = "o_member_jy"
  37. }
  38. }
  39. if stype == "" {
  40. if data["o_vipjy"] != nil {
  41. stype = "超级订阅"
  42. types = "o_vipjy"
  43. } else {
  44. if data["o_jy"] != nil {
  45. stype = "免费订阅"
  46. types = "o_jy"
  47. }
  48. }
  49. }
  50. if types != "" {
  51. if jy, ok := data[types].(map[string]interface{}); ok {
  52. if types == "o_jy" {
  53. //
  54. if area, oks := jy["o_area"].(map[string]interface{}); oks {
  55. areaArr := []string{}
  56. for k, _ := range area {
  57. areaArr = append(areaArr, AreaCode[k])
  58. }
  59. if len(area) == 0 {
  60. areaArr = append(areaArr, AreaCode["全国"])
  61. }
  62. areaCodes = strings.Join(areaArr, ",")
  63. }
  64. //
  65. akey, aok := jy["a_key"].([]map[string]interface{})
  66. if !aok {
  67. akeys, _ := jy["a_key"].([]interface{})
  68. akey = common.ObjArrToMapArr(akeys)
  69. }
  70. for _, v := range akey {
  71. keysArr, asok := v["key"].([]string)
  72. if !asok {
  73. keysArr_s, _ := v["key"].([]interface{})
  74. keysArr = common.ObjArrToStringArr(keysArr_s)
  75. }
  76. for _, key := range keysArr {
  77. keyArrs = append(keyArrs, key)
  78. }
  79. }
  80. } else if types == "o_vipjy" {
  81. if area, oks := jy["area"].(map[string]interface{}); oks {
  82. areaArr := []string{}
  83. for k, _ := range area {
  84. areaArr = append(areaArr, AreaCode[k])
  85. }
  86. if len(area) == 0 {
  87. areaArr = append(areaArr, AreaCode["全国"])
  88. }
  89. areaCodes = strings.Join(areaArr, ",")
  90. }
  91. items, aok := jy["items"].([]map[string]interface{})
  92. if !aok {
  93. itemss, _ := jy["items"].([]interface{})
  94. items = common.ObjArrToMapArr(itemss)
  95. }
  96. for _, v := range items {
  97. akey, iok := v["a_key"].([]map[string]interface{})
  98. if !iok {
  99. akeys, _ := v["a_key"].([]interface{})
  100. akey = common.ObjArrToMapArr(akeys)
  101. }
  102. for _, v := range akey {
  103. keysArr, asok := v["key"].([]string)
  104. if !asok {
  105. keysArr_s, _ := v["key"].([]interface{})
  106. keysArr = common.ObjArrToStringArr(keysArr_s)
  107. }
  108. for _, key := range keysArr {
  109. keyArrs = append(keyArrs, key)
  110. }
  111. }
  112. }
  113. } else {
  114. if area, oks := jy["o_area"].(map[string]interface{}); oks {
  115. areaArr := []string{}
  116. for k, _ := range area {
  117. areaArr = append(areaArr, AreaCode[k])
  118. }
  119. if len(area) == 0 {
  120. areaArr = append(areaArr, AreaCode["全国"])
  121. }
  122. areaCodes = strings.Join(areaArr, ",")
  123. }
  124. if jy["o_area"] == nil {
  125. areaCodes = AreaCode["全国"]
  126. }
  127. items, aok := jy["a_items"].([]map[string]interface{})
  128. if !aok {
  129. itemss, _ := jy["a_items"].([]interface{})
  130. items = common.ObjArrToMapArr(itemss)
  131. }
  132. for _, v := range items {
  133. akey, iok := v["a_key"].([]map[string]interface{})
  134. if !iok {
  135. akeys, _ := v["a_key"].([]interface{})
  136. akey = common.ObjArrToMapArr(akeys)
  137. }
  138. for _, v := range akey {
  139. keysArr, asok := v["key"].([]string)
  140. if !asok {
  141. keysArr_s, _ := v["key"].([]interface{})
  142. keysArr = common.ObjArrToStringArr(keysArr_s)
  143. }
  144. for _, key := range keysArr {
  145. keyArrs = append(keyArrs, key)
  146. }
  147. }
  148. }
  149. }
  150. if len(keyArrs) > 0 {
  151. keywords = strings.Join(keyArrs, ",")
  152. }
  153. }
  154. start := time.Now().Format("2006-01-02") + " 00:00:00"
  155. end := time.Now().Format("2006-01-02") + " 23:59:59"
  156. nowTime := time.Now().Format("2006-01-02 15:04:05")
  157. subscribeData := TiDb.SelectBySql(`select * from dwd_f_userbase_subscribe_info where userid = "` + userId + `" and updatetime >= "` + start + `" and updatetime <= "` + end + `"`)
  158. if subscribeData != nil && len(*subscribeData) > 0 {
  159. TiDb.Update("dwd_f_userbase_subscribe_info", map[string]interface{}{"id": common.IntAll((*subscribeData)[0]["id"])}, map[string]interface{}{
  160. "updatetime": nowTime,
  161. "subscribe_areas": areaCodes,
  162. "subscribe_keywords": keywords,
  163. "member_type": stype,
  164. "userid": userId,
  165. })
  166. } else {
  167. TiDb.Insert("dwd_f_userbase_subscribe_info", map[string]interface{}{
  168. "userid": userId,
  169. "updatetime": nowTime,
  170. "subscribe_areas": areaCodes,
  171. "subscribe_keywords": keywords,
  172. "member_type": stype,
  173. })
  174. }
  175. }
  176. }