subscribe.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. package main
  2. import (
  3. "strings"
  4. "time"
  5. "log"
  6. "app.yhyue.com/moapp/jybase/common"
  7. "app.yhyue.com/moapp/jybase/mongodb"
  8. )
  9. func subscribeAddSync() {
  10. log.Println("订阅数据定时任务开始")
  11. session := MgoLog.GetMgoConn()
  12. lastId := cfg.LastSubscribeId
  13. defer func() {
  14. MgoLog.DestoryMongoConn(session)
  15. }()
  16. query := map[string]interface{}{}
  17. if lastId != "" {
  18. query["_id"] = map[string]interface{}{"$gt": mongodb.StringTOBsonId(lastId)}
  19. }
  20. // query["_id"] = mongodb.StringTOBsonId("64473e36c572141d78ec7a03")
  21. log.Println("query :", query)
  22. iter := session.DB(db.MgoLog.DbName).C("ovipjy_log").Find(&query).Sort("_id").Iter()
  23. thisData := map[string]interface{}{}
  24. for {
  25. if !iter.Next(&thisData) {
  26. break
  27. }
  28. cfg.LastSubscribeId = mongodb.BsonIdToSId(thisData["_id"])
  29. FormatSubscribeData(thisData)
  30. }
  31. common.WriteSysConfig(&cfg)
  32. log.Println("订阅数据定时任务结束")
  33. }
  34. func FormatSubscribeData(data map[string]interface{}) {
  35. types, stype := common.ObjToString(data["type"]), ""
  36. // createtime := common.Int64All(data["createtime"])
  37. userId := common.ObjToString(data["userid"])
  38. if !mongodb.IsObjectIdHex(userId) {
  39. userMapping := TiDb.FindOne("data_service.user_system", map[string]interface{}{"position_id": userId}, "", "")
  40. if userMapping != nil && len(*userMapping) > 0 {
  41. userId = common.ObjToString((*userMapping)["userid"])
  42. }
  43. }
  44. areaCodes := ""
  45. keywords := ""
  46. keyArrs := []string{}
  47. if types == "o_member_jy" {
  48. stype = "大会员订阅"
  49. } else if types == "o_vipjy" {
  50. stype = "超级订阅"
  51. } else if types == "o_jy" {
  52. stype = "免费订阅"
  53. }
  54. if types != "" {
  55. if jy, ok := data[types].(map[string]interface{}); ok {
  56. if types == "o_jy" {
  57. //
  58. if area, oks := jy["o_area"].(map[string]interface{}); oks {
  59. areaArr := []string{}
  60. for k, _ := range area {
  61. areaArr = append(areaArr, AreaCode[k])
  62. }
  63. if len(area) == 0 {
  64. areaArr = append(areaArr, AreaCode["全国"])
  65. }
  66. areaCodes = strings.Join(areaArr, ",")
  67. }
  68. //
  69. akey, aok := jy["a_key"].([]map[string]interface{})
  70. if !aok {
  71. akeys, _ := jy["a_key"].([]interface{})
  72. akey = common.ObjArrToMapArr(akeys)
  73. }
  74. for _, v := range akey {
  75. keysArr, asok := v["key"].([]string)
  76. if !asok {
  77. keysArr_s, _ := v["key"].([]interface{})
  78. keysArr = common.ObjArrToStringArr(keysArr_s)
  79. }
  80. for _, key := range keysArr {
  81. keyArrs = append(keyArrs, key)
  82. }
  83. }
  84. } else if types == "o_vipjy" {
  85. if area, oks := jy["area"].(map[string]interface{}); oks {
  86. areaArr := []string{}
  87. for k, _ := range area {
  88. areaArr = append(areaArr, AreaCode[k])
  89. }
  90. if len(area) == 0 {
  91. areaArr = append(areaArr, AreaCode["全国"])
  92. }
  93. areaCodes = strings.Join(areaArr, ",")
  94. }
  95. items, aok := jy["items"].([]map[string]interface{})
  96. if !aok {
  97. itemss, _ := jy["items"].([]interface{})
  98. items = common.ObjArrToMapArr(itemss)
  99. }
  100. for _, v := range items {
  101. akey, iok := v["a_key"].([]map[string]interface{})
  102. if !iok {
  103. akeys, _ := v["a_key"].([]interface{})
  104. akey = common.ObjArrToMapArr(akeys)
  105. }
  106. for _, v := range akey {
  107. keysArr, asok := v["key"].([]string)
  108. if !asok {
  109. keysArr_s, _ := v["key"].([]interface{})
  110. keysArr = common.ObjArrToStringArr(keysArr_s)
  111. }
  112. for _, key := range keysArr {
  113. keyArrs = append(keyArrs, key)
  114. }
  115. }
  116. }
  117. } else {
  118. if area, oks := jy["o_area"].(map[string]interface{}); oks {
  119. areaArr := []string{}
  120. for k, _ := range area {
  121. areaArr = append(areaArr, AreaCode[k])
  122. }
  123. if len(area) == 0 {
  124. areaArr = append(areaArr, AreaCode["全国"])
  125. }
  126. areaCodes = strings.Join(areaArr, ",")
  127. }
  128. if jy["o_area"] == nil {
  129. areaCodes = AreaCode["全国"]
  130. }
  131. items, aok := jy["a_items"].([]map[string]interface{})
  132. if !aok {
  133. itemss, _ := jy["a_items"].([]interface{})
  134. items = common.ObjArrToMapArr(itemss)
  135. }
  136. for _, v := range items {
  137. akey, iok := v["a_key"].([]map[string]interface{})
  138. if !iok {
  139. akeys, _ := v["a_key"].([]interface{})
  140. akey = common.ObjArrToMapArr(akeys)
  141. }
  142. for _, v := range akey {
  143. keysArr, asok := v["key"].([]string)
  144. if !asok {
  145. keysArr_s, _ := v["key"].([]interface{})
  146. keysArr = common.ObjArrToStringArr(keysArr_s)
  147. }
  148. for _, key := range keysArr {
  149. keyArrs = append(keyArrs, key)
  150. }
  151. }
  152. }
  153. }
  154. if len(keyArrs) > 0 {
  155. keywords = strings.Join(keyArrs, ",")
  156. }
  157. }
  158. start := time.Now().Format("2006-01-02") + " 00:00:00"
  159. end := time.Now().Format("2006-01-02") + " 23:59:59"
  160. nowTime := time.Now().Format("2006-01-02 15:04:05")
  161. subscribeData := TiDb.SelectBySql(`select * from dwd_f_userbase_subscribe_info where userid = "` + userId + `" and updatetime >= "` + start + `" and updatetime <= "` + end + `"`)
  162. if subscribeData != nil && len(*subscribeData) > 0 {
  163. TiDb.Update("dwd_f_userbase_subscribe_info", map[string]interface{}{"id": common.IntAll((*subscribeData)[0]["id"])}, map[string]interface{}{
  164. "updatetime": nowTime,
  165. "subscribe_areas": areaCodes,
  166. "subscribe_keywords": keywords,
  167. "member_type": stype,
  168. "userid": userId,
  169. })
  170. } else {
  171. TiDb.Insert("dwd_f_userbase_subscribe_info", map[string]interface{}{
  172. "userid": userId,
  173. "updatetime": nowTime,
  174. "subscribe_areas": areaCodes,
  175. "subscribe_keywords": keywords,
  176. "member_type": stype,
  177. })
  178. }
  179. }
  180. }