messageService.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. package common
  2. import (
  3. "app.yhyue.com/moapp/MessageCenter/entity"
  4. "app.yhyue.com/moapp/MessageCenter/rpc/type/message"
  5. "app.yhyue.com/moapp/MessageCenter/util"
  6. qutil "app.yhyue.com/moapp/jybase/common"
  7. "app.yhyue.com/moapp/jybase/redis"
  8. "context"
  9. "errors"
  10. "fmt"
  11. "log"
  12. "strings"
  13. "time"
  14. )
// MessageService is a field-less receiver type; the message operations in
// this file (read status, deletion, unread counts, message groups and open
// logs) are defined as methods on it.
type MessageService struct{}
  16. // 修改消息阅读状态
  17. func (service *MessageService) ChangeReadStatus(data *message.ChangeReadStatusReq) error {
  18. msg := entity.Mysql.FindOne("message_send_log", map[string]interface{}{"id": data.Id}, "group_id", "")
  19. if msg != nil && len(*msg) > 0 {
  20. groupId := qutil.IntAll((*msg)["group_id"])
  21. //更新用户未读消息bitmap
  22. sql := fmt.Sprintf(`alter table message_user_summary UPDATE readMsg = bitmapOr(readMsg,bitmapBuild([toUInt64(%d)])) where userId = '%s'`, groupId, data.UserId)
  23. err1 := entity.ClickhouseConn.Exec(context.Background(), sql)
  24. if err1 != nil {
  25. return err1
  26. }
  27. //清缓存
  28. keyString := fmt.Sprintf(MsgCountKey, data.UserId, groupId)
  29. if redis.GetInt(redisModule, keyString) > 0 {
  30. redis.Decrby(redisModule, keyString, 1)
  31. }
  32. } else {
  33. return errors.New(fmt.Sprintf("消息不存在:%d", data.Id))
  34. }
  35. return nil
  36. }
  37. // 删除消息
  38. func (service *MessageService) DeleteMessage(id []string, appId string) (int64, string) {
  39. orm := entity.Engine.NewSession()
  40. defer orm.Close()
  41. msg := entity.Mysql.FindOne("message", map[string]interface{}{"id": id, "appid": appId}, "", "")
  42. if msg == nil {
  43. return 0, "该消息不存在"
  44. }
  45. m := entity.Message{}
  46. m.Isdel = -1
  47. count, err := orm.Where("appid=?", appId).In("id", id).Cols("isdel").Update(&m)
  48. if err != nil || count == 0 {
  49. log.Println(err)
  50. orm.Rollback()
  51. return 0, "删除消息失败"
  52. }
  53. err2 := orm.Commit()
  54. if err2 != nil {
  55. return 0, "删除消息失败"
  56. }
  57. FindUserMsg(message.FindUserMsgReq{
  58. UserId: qutil.ObjToString((*msg)["receive_userid"]),
  59. Appid: appId,
  60. OffSet: 1,
  61. PageSize: 5,
  62. MsgType: -1,
  63. Read: 0,
  64. }, true)
  65. return 1, "删除消息成功"
  66. }
  67. // 未读消息合计 isRedis 是否需要初始化redis
  68. func (service *MessageService) CountUnread(userId string, isRedis bool) (map[string]int64, int64) {
  69. var (
  70. count int64
  71. msgTypes, groupIds []string
  72. )
  73. data := make(map[string]int64)
  74. for _, v := range entity.MessageColumn {
  75. if util.IntAll(v["group_id"]) > 0 && util.IntAll(v["group_id"]) < 999 {
  76. //去除全部与私信
  77. msgTypes = append(msgTypes, fmt.Sprintf(`"%s"`, qutil.InterfaceToStr(v["group_id"])))
  78. key := fmt.Sprintf(MsgCountKey, userId, util.IntAll(v["group_id"]))
  79. groupIds = append(groupIds, qutil.InterfaceToStr(v["group_id"]))
  80. if exists, _ := redis.Exists(redisModule, key); exists {
  81. ct := util.Int64All(redis.GetInt(redisModule, key))
  82. data[qutil.InterfaceToStr(v["group_id"])] = ct
  83. count += ct
  84. }
  85. }
  86. }
  87. if len(msgTypes) > 0 && len(msgTypes) != len(data) {
  88. count = 0
  89. query := entity.Mysql.SelectBySql(fmt.Sprintf("SELECT group_id,COUNT(CASE WHEN isRead=0 THEN 1 END) as count FROM message where receive_userid=? and isdel=1 and group_id IS NOT NULL GROUP BY group_id ORDER BY FIELD(`group_id`,%s)", strings.Join(msgTypes, ",")), userId)
  90. if query != nil && len(*query) > 0 {
  91. for _, v := range *query {
  92. unread := qutil.Int64All(v["count"])
  93. data[qutil.InterfaceToStr(v["group_id"])] = unread
  94. count += unread
  95. }
  96. }
  97. if isRedis { //初始化未读数
  98. for _, v1 := range groupIds {
  99. key := fmt.Sprintf(MsgCountKey, userId, qutil.IntAll(v1))
  100. redis.Put(redisModule, key, data[v1], -1)
  101. }
  102. }
  103. }
  104. return data, count
  105. }
  106. func (service *MessageService) CountClassUnread(userId string, groupId int64) (classCount map[string]int64, total int64) {
  107. var (
  108. count int64
  109. )
  110. data := make(map[string]int64)
  111. if _, ok := entity.ClassSearchMap[groupId]; !ok {
  112. return
  113. }
  114. for i := 0; i < len(entity.ClassSearchMap[groupId]); i++ {
  115. msgClass := entity.ClassSearchMap[groupId][i]
  116. key := fmt.Sprintf(MsgClassCountKey, userId, msgClass.MsgType)
  117. if exists, _ := redis.Exists(redisModule, key); exists {
  118. ct := util.Int64All(redis.GetInt(redisModule, key))
  119. data[fmt.Sprintf("%d", msgClass.MsgType)] = ct
  120. count += ct
  121. } else {
  122. q := "select count(*) from message where receive_userid=? and isdel=1 and msg_type=?"
  123. classCount := entity.Mysql.CountBySql(q, userId, msgClass.MsgType)
  124. if classCount != -1 {
  125. redis.Put(redisModule, key, classCount, -1)
  126. data[fmt.Sprintf("%d", msgClass.MsgType)] = classCount
  127. count += classCount
  128. } else {
  129. log.Println("查询classCount失败:", classCount, q)
  130. }
  131. }
  132. }
  133. return data, count
  134. }
  135. // 查询消息详情
  136. func FindMessageDetail(id, msgLogId int64, userId string) (msg *map[string]interface{}, err error) {
  137. //直接查询message_send_log
  138. msg = entity.Mysql.FindOne("message_send_log", map[string]interface{}{"id": msgLogId}, "", "")
  139. if msg != nil && len(*msg) > 0 {
  140. return msg, nil
  141. }
  142. return nil, errors.New("没有查询到消息")
  143. }
  144. // GetMsgType 消息的分类
  145. func (service *MessageService) GetMsgType() (data []*message.MsgTypes, err error) {
  146. types := entity.Mysql.SelectBySql("SELECT * FROM `message_group` WHERE group_id > 0 ORDER BY sequence ASC")
  147. if types != nil && len(*types) > 0 {
  148. for _, val := range *types {
  149. data = append(data, &message.MsgTypes{
  150. MsgType: qutil.Int64All(val["group_id"]),
  151. Name: qutil.ObjToString(val["name"]),
  152. Img: qutil.ObjToString(val["img"]),
  153. Code: qutil.ObjToString(val["switch"]),
  154. DisplayPlatform: qutil.ObjToString(val["display_platform"]),
  155. })
  156. }
  157. return data, nil
  158. }
  159. return nil, nil
  160. }
  161. func (service *MessageService) MsgOpenLog(platFrom, msgLogId int64, userId string) int64 {
  162. //判断用户是否已经在pc端打开过
  163. count := entity.Mysql.CountBySql("SELECT COUNT(*) FROM message_open_log WHERE msg_log_id = ? and platform = ? and userid = ?", msgLogId, platFrom, userId)
  164. if count <= 0 {
  165. tmp := map[string]interface{}{
  166. "msg_log_id": msgLogId,
  167. "platform": platFrom,
  168. "userid": userId,
  169. "createtime": time.Now().Format("2006-01-02 15:04:05"),
  170. }
  171. SaveCache <- tmp
  172. }
  173. return 0
  174. }