messageService.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. package common
  2. import (
  3. "app.yhyue.com/moapp/MessageCenter/entity"
  4. "app.yhyue.com/moapp/MessageCenter/rpc/type/message"
  5. "app.yhyue.com/moapp/MessageCenter/util"
  6. qutil "app.yhyue.com/moapp/jybase/common"
  7. "app.yhyue.com/moapp/jybase/redis"
  8. "context"
  9. "errors"
  10. "fmt"
  11. "log"
  12. "strings"
  13. "time"
  14. )
  15. type MessageService struct{}
  16. // 修改消息阅读状态
  17. func (service *MessageService) ChangeReadStatus(data *message.ChangeReadStatusReq) error {
  18. row := entity.ClickhouseConn.QueryRow(context.Background(), fmt.Sprintf("SELECT count(*) from message_user_summary WHERE userId = '%s' ANd bitmapContains(readMsg,%d)", data.UserId, data.Id))
  19. var count uint64
  20. row.Scan(&count)
  21. if count > 0 {
  22. return nil
  23. }
  24. msg := entity.Mysql.FindOne("message_send_log", map[string]interface{}{"id": data.Id}, "group_id", "")
  25. if msg != nil && len(*msg) > 0 {
  26. err := PutRedisRead(data.UserId, int(data.Id))
  27. if err != nil {
  28. return err
  29. }
  30. groupId := qutil.IntAll((*msg)["group_id"])
  31. //更新用户未读消息bitmap
  32. sql := fmt.Sprintf(`alter table message_user_summary UPDATE readMsg = bitmapOr(readMsg,bitmapBuild([toUInt64(%d)])) where userId = '%s'`, data.Id, data.UserId)
  33. fmt.Println(sql)
  34. err1 := entity.ClickhouseConn.Exec(context.Background(), sql)
  35. if err1 != nil {
  36. return err1
  37. }
  38. //清缓存
  39. keyString := fmt.Sprintf(MsgCountKey, data.UserId, groupId)
  40. if redis.GetInt(redisModule, keyString) > 0 {
  41. redis.Decrby(redisModule, keyString, 1)
  42. }
  43. if groupId == 5 || groupId == 11 {
  44. redis.Del(redisModule, fmt.Sprintf(UserWorkDeskKey, data.UserId))
  45. }
  46. redis.Del(redisModule, fmt.Sprintf(UserMsgSummery, data.UserId))
  47. redis.Del(redisModule, fmt.Sprintf(UserClassMapKey, data.UserId))
  48. } else {
  49. return errors.New(fmt.Sprintf("消息不存在:%d", data.Id))
  50. }
  51. return nil
  52. }
  53. // 未读消息合计 isRedis 是否需要初始化redis
  54. func (service *MessageService) CountUnread(userId string, isRedis bool) (map[string]int64, int64) {
  55. var (
  56. count int64
  57. msgTypes, groupIds []string
  58. )
  59. data := make(map[string]int64)
  60. for _, v := range entity.MessageColumn {
  61. if util.IntAll(v["group_id"]) > 0 && util.IntAll(v["group_id"]) < 999 {
  62. //去除全部与私信
  63. msgTypes = append(msgTypes, fmt.Sprintf(`"%s"`, qutil.InterfaceToStr(v["group_id"])))
  64. key := fmt.Sprintf(MsgCountKey, userId, util.IntAll(v["group_id"]))
  65. groupIds = append(groupIds, qutil.InterfaceToStr(v["group_id"]))
  66. if exists, _ := redis.Exists(redisModule, key); exists {
  67. ct := util.Int64All(redis.GetInt(redisModule, key))
  68. data[qutil.InterfaceToStr(v["group_id"])] = ct
  69. count += ct
  70. }
  71. }
  72. }
  73. if len(msgTypes) > 0 && len(msgTypes) != len(data) {
  74. count = 0
  75. query := entity.Mysql.SelectBySql(fmt.Sprintf("SELECT group_id,COUNT(CASE WHEN isRead=0 THEN 1 END) as count FROM message where receive_userid=? and isdel=1 and group_id IS NOT NULL GROUP BY group_id ORDER BY FIELD(`group_id`,%s)", strings.Join(msgTypes, ",")), userId)
  76. if query != nil && len(*query) > 0 {
  77. for _, v := range *query {
  78. unread := qutil.Int64All(v["count"])
  79. data[qutil.InterfaceToStr(v["group_id"])] = unread
  80. count += unread
  81. }
  82. }
  83. if isRedis { //初始化未读数
  84. for _, v1 := range groupIds {
  85. key := fmt.Sprintf(MsgCountKey, userId, qutil.IntAll(v1))
  86. redis.Put(redisModule, key, data[v1], -1)
  87. }
  88. }
  89. }
  90. return data, count
  91. }
  92. func (service *MessageService) CountClassUnread(userId string, groupId int64) (classCount map[string]int64, total int64) {
  93. var (
  94. count int64
  95. )
  96. data := make(map[string]int64)
  97. if _, ok := entity.ClassSearchMap[groupId]; !ok {
  98. return
  99. }
  100. for i := 0; i < len(entity.ClassSearchMap[groupId]); i++ {
  101. msgClass := entity.ClassSearchMap[groupId][i]
  102. key := fmt.Sprintf(MsgClassCountKey, userId, msgClass.MsgType)
  103. if exists, _ := redis.Exists(redisModule, key); exists {
  104. ct := util.Int64All(redis.GetInt(redisModule, key))
  105. data[fmt.Sprintf("%d", msgClass.MsgType)] = ct
  106. count += ct
  107. } else {
  108. q := "select count(*) from message where receive_userid=? and isdel=1 and msg_type=?"
  109. classCount := entity.Mysql.CountBySql(q, userId, msgClass.MsgType)
  110. if classCount != -1 {
  111. redis.Put(redisModule, key, classCount, -1)
  112. data[fmt.Sprintf("%d", msgClass.MsgType)] = classCount
  113. count += classCount
  114. } else {
  115. log.Println("查询classCount失败:", classCount, q)
  116. }
  117. }
  118. }
  119. return data, count
  120. }
  121. // 查询消息详情
  122. func FindMessageDetail(id, msgLogId int64, userId string) (msg *map[string]interface{}, err error) {
  123. //直接查询message_send_log
  124. msg = entity.Mysql.FindOne("message_send_log", map[string]interface{}{"id": msgLogId}, "", "")
  125. if msg != nil && len(*msg) > 0 {
  126. return msg, nil
  127. }
  128. return nil, errors.New("没有查询到消息")
  129. }
// GetMsgType returns the message categories (currently disabled: the whole
// method is commented out).
/*func (service *MessageService) GetMsgType() (data []*message.MsgTypes, err error) {
	types := entity.Mysql.SelectBySql("SELECT * FROM `message_group` WHERE group_id > 0 ORDER BY sequence ASC")
	if types != nil && len(*types) > 0 {
		for _, val := range *types {
			data = append(data, &message.MsgTypes{
				MsgType:         qutil.Int64All(val["group_id"]),
				Name:            qutil.ObjToString(val["name"]),
				Img:             qutil.ObjToString(val["img"]),
				Code:            qutil.ObjToString(val["switch"]),
				DisplayPlatform: qutil.ObjToString(val["display_platform"]),
			})
		}
		return data, nil
	}
	return nil, nil
}*/
  147. func (service *MessageService) MsgOpenLog(platFrom, msgLogId int64, userId string) error {
  148. //判断用户是否已经在pc端打开过
  149. sql := fmt.Sprintf("SELECT COUNT(*) FROM message_open_log WHERE msg_log_id = %d and platform = %d and userid = '%s'", msgLogId, platFrom, userId)
  150. row := entity.ClickhouseConn.QueryRow(context.Background(), sql)
  151. var count uint64
  152. row.Scan(&count)
  153. if count <= 0 {
  154. tmp := map[string]interface{}{
  155. "msg_log_id": msgLogId,
  156. "platform": platFrom,
  157. "userid": userId,
  158. "createtime": time.Now().Format("2006-01-02 15:04:05"),
  159. }
  160. SaveCache <- tmp
  161. }
  162. return nil
  163. }