task.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. package common
  2. import (
  3. "app.yhyue.com/moapp/MessageCenter/entity"
  4. "app.yhyue.com/moapp/MessageCenter/rpc/internal/config"
  5. "app.yhyue.com/moapp/jybase/common"
  6. "app.yhyue.com/moapp/jybase/encrypt"
  7. "context"
  8. "fmt"
  9. "github.com/robfig/cron/v3"
  10. "log"
  11. "strings"
  12. "time"
  13. )
// GlobMsgMap is the in-memory cache of message_send_log rows, keyed by the
// row's id. LoadMsgOnTime replaces the whole map on each successful reload.
var GlobMsgMap map[int]map[string]interface{}
  15. func LoadTask() {
  16. // 每隔10分钟执行一次
  17. LoadMsgOnTime()
  18. c := cron.New(cron.WithSeconds())
  19. c.AddFunc(config.ConfigJson.GlobMsgLoadTime, LoadMsgOnTime)
  20. c.AddFunc(config.ConfigJson.FreeIntelTime, FreeIntelUserPush) //免费用户推送
  21. c.AddFunc(config.ConfigJson.PayIntelTime, PayIntelUserPush) //付费用户推送
  22. go c.Start()
  23. defer c.Stop()
  24. }
  25. func LoadMsgOnTime() {
  26. fmt.Println("开始执行")
  27. msgMap := make(map[int]map[string]interface{})
  28. m := entity.Mysql.SelectBySql("SELECT id,msg_type,title,content,send_time,menu_name,link,group_id,sign FROM message_send_log WHERE send_status = 4 AND isdel = 1 ORDER BY send_time DESC limit 2000")
  29. if m != nil && len(*m) > 0 {
  30. for _, val := range *m {
  31. msgMap[common.IntAll(val["id"])] = val
  32. }
  33. GlobMsgMap = msgMap
  34. }
  35. fmt.Println("GlobMsgMap len", len(GlobMsgMap))
  36. }
  37. func FreeIntelUserPush() {
  38. data := messageData(config.ConfigJson.FreePushNumber)
  39. if data == nil || len(*data) == 0 {
  40. return
  41. }
  42. users := IntelUser(true)
  43. PushData(users, data)
  44. }
  45. func PayIntelUserPush() {
  46. data := messageData(config.ConfigJson.PayPushNumber)
  47. if data == nil || len(*data) == 0 {
  48. return
  49. }
  50. users := IntelUser(false)
  51. PushData(users, data)
  52. }
  53. func PushData(users []string, data *[]map[string]interface{}) {
  54. log.Printf("需推送用户:%d\n", len(users))
  55. var ids []int64
  56. if data != nil {
  57. for _, m := range *data {
  58. _id := common.InterfaceToStr(m["_id"])
  59. var link []string
  60. link = append(link, fmt.Sprintf("/swordfish/page_big_pc/business_detail/%s", encrypt.EncodeArticleId2ByCheck(_id)))
  61. mobLink := fmt.Sprintf("/jy_mobile/business/detail/%s", encrypt.EncodeArticleId2ByCheck(_id))
  62. link = append(link, mobLink)
  63. link = append(link, mobLink)
  64. link = append(link, mobLink)
  65. iData := map[string]interface{}{
  66. "msg_type": 9,
  67. "title": "您有一条专属商机情报",
  68. "content": fmt.Sprintf("【商机情报】%s", common.ObjToString(m["title"])),
  69. "send_mode": 1,
  70. "send_time": time.Now().Format("2006-01-02 15:04:05"),
  71. "send_status": 4,
  72. "update_time": time.Now().Format("2006-01-02 15:04:05"),
  73. "createtime": time.Now().Format("2006-01-02 15:04:05"),
  74. "link": strings.Join(link, ","),
  75. "isdel": 1,
  76. "send_userid": "商机情报定时推送",
  77. "sign": 0,
  78. "group_id": 5,
  79. }
  80. id := entity.Mysql.Insert("message_send_log", iData)
  81. ids = append(ids, id)
  82. }
  83. }
  84. if len(ids) > 0 {
  85. var bits []string
  86. for _, i2 := range ids {
  87. bits = append(bits, fmt.Sprintf("toUInt64(%d)", i2))
  88. }
  89. err := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_summary UPDATE msg_bitmap = bitmapOr(msg_bitmap,bitmapBuild([%s])) where group_id = %d`, strings.Join(bits, ","), 5))
  90. if err != nil {
  91. log.Println("message_summary err=== ", err.Error())
  92. return
  93. }
  94. var userIds []string
  95. for k, user := range users {
  96. userIds = append(userIds, user)
  97. if len(userIds) == 1000 || k == len(users)-1 {
  98. UpdateBatch(userIds, ids)
  99. userIds = []string{}
  100. }
  101. }
  102. }
  103. }
  104. // 用户
  105. func IntelUser(isFree bool) []string {
  106. mUser := make(map[string]bool)
  107. data := entity.Mysql.SelectBySql(`SELECT a.phone as phone FROM entniche_user a INNER JOIN entniche_info b on b.status = 1 and a.power = 1 and a.ent_id = b.id and a.phone != ''`)
  108. if data != nil { //统计商机管理用户
  109. for _, m := range *data {
  110. mUser[common.InterfaceToStr(m["phone"])] = true
  111. }
  112. }
  113. var uData []string
  114. switch isFree {
  115. case false:
  116. user, ok := entity.MQFW.Find("user", map[string]interface{}{
  117. "i_appid": 2,
  118. "$or": []map[string]interface{}{
  119. {
  120. "i_vip_status": map[string]interface{}{"$gt": 0},
  121. },
  122. {
  123. "i_member_status": map[string]interface{}{"$gt": 0},
  124. },
  125. },
  126. }, "", `{"_id":1,"s_phone":1,"s_m_phone":1}`, false, -1, -1)
  127. if ok && user != nil && len(*user) > 0 {
  128. for _, m := range *user {
  129. phone := common.If(common.InterfaceToStr(m["s_phone"]) == "", common.InterfaceToStr(m["s_m_phone"]), common.InterfaceToStr(m["s_phone"])).(string)
  130. uData = append(uData, common.InterfaceToStr(m["_id"]))
  131. if mUser[phone] { //获取剩余商机管理用户
  132. delete(mUser, phone)
  133. }
  134. }
  135. if len(mUser) > 0 { //统计剩余商机管理用户
  136. var (
  137. phones []string
  138. count int
  139. )
  140. for phone := range mUser {
  141. count++
  142. phones = append(phones, phone)
  143. if len(phones) == 100 || count == len(mUser) {
  144. user1, ok1 := entity.MQFW.Find("user", map[string]interface{}{
  145. "i_appid": 2,
  146. "$or": []map[string]interface{}{
  147. {
  148. "s_phone": map[string]interface{}{
  149. "$in": phones,
  150. },
  151. },
  152. {
  153. "s_m_phone": map[string]interface{}{
  154. "$in": phones,
  155. },
  156. },
  157. },
  158. }, "", `{"_id":1}`, false, -1, -1)
  159. if ok1 && user1 != nil && len(*user1) > 0 {
  160. for _, m := range *user1 {
  161. uData = append(uData, common.InterfaceToStr(m["_id"]))
  162. }
  163. }
  164. phones = []string{}
  165. }
  166. }
  167. }
  168. }
  169. case true:
  170. sess := entity.MQFW.GetMgoConn()
  171. defer entity.MQFW.DestoryMongoConn(sess)
  172. iter := sess.DB("qfw").C("user").Find(map[string]interface{}{
  173. "i_appid": 2,
  174. }).Select(map[string]interface{}{"i_vip_status": 1, "i_member_status": 1, "_id": 1, "s_phone": 1, "s_m_phone": 1}).Iter()
  175. for m := make(map[string]interface{}); iter.Next(&m); {
  176. if common.IntAll(m["i_vip_status"]) <= 0 && common.IntAll(m["i_member_status"]) <= 0 {
  177. phone := common.If(common.InterfaceToStr(m["s_phone"]) == "", common.InterfaceToStr(m["s_m_phone"]), common.InterfaceToStr(m["s_phone"])).(string)
  178. if !mUser[phone] {
  179. uData = append(uData, common.InterfaceToStr(m["_id"]))
  180. }
  181. }
  182. m = map[string]interface{}{}
  183. }
  184. }
  185. return uData
  186. }
  187. // 获取推送消息
  188. func messageData(number int) *[]map[string]interface{} {
  189. now := time.Now().AddDate(0, 0, -1)
  190. starttime := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.Local).Unix()
  191. enttime := time.Date(now.Year(), now.Month(), now.Day(), 24, 0, 0, 0, time.Local).Unix()
  192. query := map[string]interface{}{
  193. "yucetime": map[string]interface{}{"$gt": starttime, "$lt": enttime},
  194. }
  195. data, _ := entity.Bidding.Find("project_forecast", query, `{"yucetime": -1}`, `{"title":1,"_id":1}`, false, 0, number)
  196. return data
  197. }
  198. func UpdateBatch(ids []string, msgLogId []int64) {
  199. str := fmt.Sprintf(`'%s'`, strings.Join(ids, `','`))
  200. var bits []string
  201. for _, i2 := range msgLogId {
  202. bits = append(bits, fmt.Sprintf("toUInt64(%d)", i2))
  203. }
  204. log.Println(fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([%s])) where userId in (%s)`, strings.Join(bits, ","), str))
  205. err1 := entity.ClickhouseConn.Exec(context.Background(), fmt.Sprintf(`alter table message_user_summary UPDATE allMsg = bitmapOr(allMsg,bitmapBuild([%s])) where userId in (%s)`, strings.Join(bits, ","), str))
  206. if err1 != nil {
  207. log.Printf("批量更新message_user_summary出错:%s", err1)
  208. return
  209. }
  210. }