task.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. package common
  2. import (
  3. "app.yhyue.com/moapp/MessageCenter/entity"
  4. "app.yhyue.com/moapp/MessageCenter/rpc/internal/config"
  5. "app.yhyue.com/moapp/jybase/common"
  6. "app.yhyue.com/moapp/jybase/encrypt"
  7. "app.yhyue.com/moapp/jybase/redis"
  8. "context"
  9. "fmt"
  10. "github.com/robfig/cron/v3"
  11. "log"
  12. "strings"
  13. "time"
  14. )
  // GlobMsgMap holds message_send_log rows keyed by their integer id.
  // LoadMsgOnTime replaces the whole map whenever its query returns rows.
  // NOTE(review): the map is swapped from a cron job with no synchronization
  // visible in this file — confirm how readers elsewhere access it.
  15. var GlobMsgMap map[int]map[string]interface{}
  16. func LoadTask() {
  17. // 每隔10分钟执行一次
  18. StatisticalUser(true)
  19. c := cron.New(cron.WithSeconds())
  20. c.AddFunc(config.ConfigJson.GlobMsgLoadTime, LoadMsgOnTime)
  21. c.AddFunc(config.ConfigJson.FreeIntelTime, FreeIntelUserPush) //免费用户推送
  22. c.AddFunc(config.ConfigJson.PayIntelTime, PayIntelUserPush) //付费用户推送
  23. go c.Start()
  24. defer c.Stop()
  25. }
  26. func LoadMsgOnTime() {
  27. fmt.Println("开始执行")
  28. msgMap := make(map[int]map[string]interface{})
  29. m := entity.Mysql.SelectBySql("SELECT id,msg_type,title,content,send_time,menu_name,link,group_id,sign FROM message_send_log WHERE send_status = 4 AND isdel = 1 ORDER BY send_time DESC limit ?", config.ConfigJson.MsgLogLimit)
  30. if m != nil && len(*m) > 0 {
  31. for _, val := range *m {
  32. msgMap[common.IntAll(val["id"])] = val
  33. }
  34. GlobMsgMap = msgMap
  35. }
  36. fmt.Println("GlobMsgMap len", len(GlobMsgMap))
  37. }
  38. func FreeIntelUserPush() {
  39. ids := FreeMessageData(config.ConfigJson.FreePushNumber)
  40. if len(ids) == 0 {
  41. return
  42. }
  43. users := StatisticalUser(true)
  44. PushData(users, ids, nil)
  45. }
  46. func PayIntelUserPush() {
  47. data := messageData(config.ConfigJson.PayPushNumber)
  48. if data == nil || len(*data) == 0 {
  49. return
  50. }
  51. users := StatisticalUser(false)
  52. PushData(users, nil, data)
  53. }
  54. func PushData(users []string, ids []int64, data *[]map[string]interface{}) {
  55. log.Printf("需推送用户:%d\n", len(users))
  56. switch len(ids) {
  57. case 0: //付费用户
  58. if data != nil {
  59. for _, m := range *data {
  60. _id := common.InterfaceToStr(m["_id"])
  61. var link []string
  62. link = append(link, fmt.Sprintf("/swordfish/page_big_pc/business_detail/%s", encrypt.EncodeArticleId2ByCheck(_id)))
  63. mobLink := fmt.Sprintf("/jy_mobile/business/detail/%s", encrypt.EncodeArticleId2ByCheck(_id))
  64. link = append(link, mobLink)
  65. link = append(link, mobLink)
  66. link = append(link, mobLink)
  67. var title = "您有一条专属商机情报"
  68. if strings.Contains(config.ConfigJson.MsgTitlePS, "%s") {
  69. title = fmt.Sprintf(config.ConfigJson.MsgTitlePS, common.InterfaceToStr(m["buyer"]))
  70. }
  71. iData := map[string]interface{}{
  72. "msg_type": 9,
  73. "title": title,
  74. "content": fmt.Sprintf("【商机情报】%s", common.ObjToString(m["title"])),
  75. "send_mode": 1,
  76. "send_time": time.Now().Format("2006-01-02 15:04:05"),
  77. "send_status": 4,
  78. "update_time": time.Now().Format("2006-01-02 15:04:05"),
  79. "createtime": time.Now().Format("2006-01-02 15:04:05"),
  80. "link": strings.Join(link, ","),
  81. "isdel": 1,
  82. "send_userid": "商机情报定时推送",
  83. "sign": 0,
  84. "group_id": 5,
  85. }
  86. logId := entity.Mysql.Insert("message_send_log", iData)
  87. ids = append(ids, logId)
  88. }
  89. }
  90. if len(ids) > 0 {
  91. var bits []string
  92. for _, i2 := range ids {
  93. bits = append(bits, fmt.Sprintf("toUInt64(%d)", i2))
  94. }
  95. biyStr := strings.Join(bits, ",")
  96. err := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_summary UPDATE msg_bitmap = bitmapOr(msg_bitmap,bitmapBuild([%s])) where group_id = %d`, biyStr, 5))
  97. if err != nil {
  98. log.Println("message_summary err=== ", err.Error())
  99. return
  100. }
  101. var userIds []string
  102. for k, user := range users {
  103. userIds = append(userIds, user)
  104. if len(userIds) == 1000 || k == len(users)-1 {
  105. UpdateBatch(userIds, biyStr)
  106. userIds = []string{}
  107. }
  108. }
  109. }
  110. default: //免费用户
  111. var bits []string
  112. for _, i2 := range ids {
  113. bits = append(bits, fmt.Sprintf("toUInt64(%d)", i2))
  114. }
  115. log.Println(fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([%s])) where 1=1`, strings.Join(bits, ",")))
  116. err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([%s])) where 1=1`, strings.Join(bits, ",")))
  117. if err1 != nil {
  118. log.Printf("批量更新message_user_summary出错:%s", err1)
  119. return
  120. }
  121. for _, userid := range users {
  122. keyString := fmt.Sprintf(MsgCountKey, userid, 5)
  123. redis.Del(redisModule, keyString)
  124. }
  125. }
  126. log.Println("PushData完成")
  127. }
  128. // 用户
  129. func StatisticalUser(isFree bool) []string {
  130. var ids []string
  131. mUser := make(map[string]bool) //商机管理用户
  132. data := entity.Mysql.SelectBySql(`SELECT a.phone as phone FROM entniche_user a INNER JOIN entniche_info b on b.status = 1 and a.power = 1 and a.ent_id = b.id and a.phone != ''`)
  133. if data != nil && len(*data) > 0 { //统计商机管理用户
  134. for _, m := range *data {
  135. if common.InterfaceToStr(m["phone"]) != "" {
  136. mUser[common.InterfaceToStr(m["phone"])] = true
  137. }
  138. }
  139. }
  140. log.Printf("获取商机管理用户:%d\n", len(mUser))
  141. //企业大会员或超级订阅
  142. entPayUser, ok := entity.MQFW.Find("ent_user", map[string]interface{}{
  143. "$or": []map[string]interface{}{
  144. { //个人订阅
  145. "i_vip_status": map[string]interface{}{
  146. "$gt": 0,
  147. },
  148. },
  149. { //个人大会员
  150. "i_member_status": map[string]interface{}{
  151. "$gt": 0,
  152. },
  153. },
  154. },
  155. }, nil, `{"i_userid":1}`, false, -1, -1)
  156. if ok && entPayUser != nil && len(*entPayUser) > 0 {
  157. var userIds []string
  158. for k, m := range *entPayUser {
  159. userIds = append(userIds, common.InterfaceToStr(m["i_userid"]))
  160. if len(userIds) == 100 || k == len(*entPayUser)-1 {
  161. idSql := fmt.Sprintf(`SELECT phone FROM entniche_user WHERE id in (%s)`, strings.Join(userIds, `,`))
  162. phoneArr := entity.Mysql.SelectBySql(idSql)
  163. if phoneArr != nil && len(*phoneArr) > 0 {
  164. for _, m2 := range *phoneArr {
  165. if common.InterfaceToStr(m2["phone"]) != "" {
  166. mUser[common.InterfaceToStr(m2["phone"])] = true
  167. }
  168. }
  169. }
  170. userIds = []string{}
  171. }
  172. }
  173. }
  174. log.Printf("获取商机管理+企业付费用户:%d\n", len(mUser))
  175. switch isFree {
  176. case false: //付费用户
  177. payMap := make(map[string]bool)
  178. payUsers, _ := entity.MQFW.Find("user", map[string]interface{}{
  179. "$or": []map[string]interface{}{
  180. { //个人订阅
  181. "i_vip_status": map[string]interface{}{
  182. "$gt": 0,
  183. },
  184. },
  185. { //个人大会员
  186. "i_member_status": map[string]interface{}{
  187. "$gt": 0,
  188. },
  189. },
  190. },
  191. }, nil, `{"_id":1,"s_phone":1,"s_m_phone":1}`, false, -1, -1)
  192. if payUsers != nil && len(*payUsers) > 0 {
  193. for _, m := range *payUsers {
  194. phone := common.If(common.InterfaceToStr(m["s_phone"]) == "", common.InterfaceToStr(m["s_m_phone"]), common.InterfaceToStr(m["s_phone"])).(string)
  195. payMap[common.InterfaceToStr(m["_id"])] = true
  196. if mUser[phone] { //避免重复获取
  197. delete(mUser, phone)
  198. }
  199. }
  200. }
  201. var (
  202. phones []string
  203. count int
  204. )
  205. maxCount := len(mUser)
  206. for phone := range mUser { //获取剩余商机管理与企业付费用户
  207. count++
  208. phones = append(phones, phone)
  209. if len(phones) == 100 || count == maxCount {
  210. entPayUsers, _ := entity.MQFW.Find("user", map[string]interface{}{
  211. "$or": []map[string]interface{}{
  212. {"s_phone": map[string]interface{}{ //企业订阅||大会员||商机管理
  213. "$in": phones,
  214. }},
  215. {"s_m_phone": map[string]interface{}{ //企业订阅||大会员||商机管理
  216. "$in": phones,
  217. }},
  218. },
  219. }, nil, `{"_id":1}`, false, -1, -1)
  220. if entPayUsers != nil && len(*entPayUsers) > 0 {
  221. for _, m := range *entPayUsers {
  222. payMap[common.InterfaceToStr(m["_id"])] = true
  223. }
  224. }
  225. phones = []string{}
  226. }
  227. }
  228. for id := range payMap {
  229. ids = append(ids, id)
  230. }
  231. case true:
  232. sess := entity.MQFW.GetMgoConn()
  233. defer entity.MQFW.DestoryMongoConn(sess)
  234. iter := sess.DB("qfw").C("user").Find(map[string]interface{}{
  235. "i_appid": 2,
  236. }).Select(map[string]interface{}{"_id": 1, "i_vip_status": 1, "i_member_status": 1, "s_phone": 1, "s_m_phone": 1}).Iter()
  237. for m := make(map[string]interface{}); iter.Next(&m); {
  238. if common.IntAll(m["i_vip_status"]) <= 0 && common.IntAll(m["i_member_status"]) <= 0 {
  239. phone := common.If(common.InterfaceToStr(m["s_phone"]) == "", common.InterfaceToStr(m["s_m_phone"]), common.InterfaceToStr(m["s_phone"])).(string)
  240. if !mUser[phone] {
  241. ids = append(ids, common.InterfaceToStr(m["_id"]))
  242. }
  243. }
  244. m = map[string]interface{}{}
  245. }
  246. }
  247. log.Printf("用户类型:%v,用户数:%d", isFree, len(ids))
  248. return ids
  249. }
  250. // 获取推送消息
  251. func messageData(number int) *[]map[string]interface{} {
  252. now := time.Now().AddDate(0, 0, -1)
  253. starttime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
  254. enttime := time.Date(now.Year(), now.Month(), now.Day(), 24, 0, 0, 0, time.Local).Unix()
  255. query := map[string]interface{}{
  256. "yucetime": map[string]interface{}{"$gt": starttime, "$lt": enttime},
  257. }
  258. data, _ := entity.Bidding.Find("project_forecast", query, `{"yucetime": -1}`, `{"title":1,"_id":1,"buyer":1}`, false, 0, number)
  259. return data
  260. }
  261. func FreeMessageData(number int) []int64 {
  262. var ids []int64
  263. now := time.Now()
  264. startTime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).String()
  265. m := entity.Mysql.SelectBySql(fmt.Sprintf(`SELECT id FROM message_send_log WHERE createtime > '%s' and send_userid = '商机情报定时推送' ORDER BY createtime DESC LIMIT %d `, startTime, number))
  266. if m != nil && len(*m) > 0 {
  267. for _, i2 := range *m {
  268. ids = append(ids, common.Int64All(i2["id"]))
  269. }
  270. }
  271. return ids
  272. }
  273. func UpdateBatch(ids []string, bitStr string) {
  274. str := fmt.Sprintf(`'%s'`, strings.Join(ids, `','`))
  275. log.Println(fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([%s])) where userId in (%s)`, bitStr, str))
  276. err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([%s])) where userId in (%s)`, bitStr, str))
  277. if err1 != nil {
  278. log.Printf("批量更新message_user_summary出错:%s", err1)
  279. return
  280. }
  281. for _, id := range ids {
  282. keyString := fmt.Sprintf(MsgCountKey, id, 5)
  283. redis.Del(redisModule, keyString)
  284. }
  285. }