sendMsg.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. package service
  2. import (
  3. "app.yhyue.com/moapp/MessageCenter/entity"
  4. "app.yhyue.com/moapp/MessageCenter/rpc/message"
  5. "app.yhyue.com/moapp/MessageCenter/util"
  6. "app.yhyue.com/moapp/jybase/redis"
  7. "database/sql"
  8. "fmt"
  9. "github.com/tal-tech/go-zero/core/logx"
  10. "log"
  11. "strconv"
  12. "strings"
  13. "time"
  14. )
  15. // 类型的顺序
  16. const order = "1,4"
  17. const MsgCountKey = "count_%s_%s" //redis 消息未读数量 Count.用户id.消息类型=数量
  18. const redisModule = "msgCount"
  19. func SendMsg(this message.SendMsgRequest) (int64, string) {
  20. r, err := entity.Mysql11.Query("select count(*) as c from conversation where receive_id = ? and send_id = ? ", this.ReceiveUserId, this.SendUserId)
  21. c := 0
  22. for r.Next() {
  23. err := r.Scan(&c)
  24. if err != nil {
  25. panic(err.Error())
  26. }
  27. }
  28. logx.Info("查询数量:", c)
  29. sql3 := `INSERT INTO message(appid,receive_userid,receive_name,send_userid,send_name,title,content,msg_type,link,cite_id,createtime,isRead,isdel)
  30. values ("%s",'%s','%s','%s','%s','%s','%s','%d','%s',0,'%s',0,1);`
  31. sql3 = fmt.Sprintf(sql3, this.Appid, this.ReceiveUserId, this.ReceiveName, this.SendUserId, this.SendName, this.Title, this.Content, this.MsgType, this.Link, time.Now().Format("2006-01-02 15:04:05"))
  32. if c < 1 {
  33. sql1 := `INSERT INTO conversation(appid,` + "`key`" + `,user_id,receive_id,receive_name,send_id,send_name,sort,createtime)
  34. values ('%s','','%s','%s','%s','%s','%s',0,'%s');`
  35. sql1 = fmt.Sprintf(sql1, this.Appid, this.SendUserId, this.ReceiveUserId, this.ReceiveName, this.SendUserId, this.SendName, time.Now().Format("2006-01-02 15:04:05"))
  36. ok := entity.Mysql.ExecTx("发送消息事务", func(tx *sql.Tx) bool {
  37. //插入会话表
  38. _, err := entity.Mysql11.Exec(sql1)
  39. sql2 := `INSERT INTO conversation(appid,` + "`key`" + `,user_id,receive_id,receive_name,send_id,send_name,sort,createtime)
  40. values ('%s','','%s','%s','%s','%s','%s',0,'%s');`
  41. sql2 = fmt.Sprintf(sql2, this.Appid, this.ReceiveUserId, this.SendUserId, this.SendName, this.ReceiveUserId, this.ReceiveName, time.Now().Format("2006-01-02 15:04:05"))
  42. _, err = entity.Mysql11.Exec(sql2)
  43. //插入消息表
  44. _, err = entity.Mysql11.Exec(sql3)
  45. if err != nil {
  46. return false
  47. }
  48. return true
  49. })
  50. if ok {
  51. return 1, "消息发送成功"
  52. }
  53. }
  54. _, err = entity.Mysql11.Exec(sql3)
  55. if err == nil {
  56. MsgCountAdd(this.ReceiveUserId, strconv.Itoa(int(this.MsgType)), this.Appid)
  57. return 1, "消息发送成功"
  58. }
  59. return 0, "消息发送失败"
  60. }
  61. func FindUserMsg(this message.FindUserMsgReq, isClean bool) message.FindUserMsgRes {
  62. var err error
  63. var count int64
  64. cquery := map[string]interface{}{
  65. "receive_userid": this.UserId,
  66. "isdel": 1,
  67. "appid": this.Appid,
  68. }
  69. if this.MsgType != -1 {
  70. cquery["msg_type"] = this.MsgType
  71. }
  72. if this.Read != -1 {
  73. cquery["isRead"] = this.Read
  74. }
  75. data := message.FindUserMsgRes{}
  76. if this.PageSize == 5 {
  77. //从缓存里边取数据
  78. pc_a, err := entity.GetData(this.UserId)
  79. if err == nil && pc_a != nil {
  80. // 缓存有值
  81. if !isClean {
  82. data.Code = 1
  83. data.Message = "查询成功"
  84. data.Data = pc_a.Data
  85. data.Count = pc_a.Count
  86. return data
  87. }
  88. }
  89. }
  90. count = entity.Mysql.Count("message", cquery)
  91. if count > 0 {
  92. res := entity.Mysql.Find("message", cquery, "", "createtime desc", (int(this.OffSet)-1)*int(this.PageSize), int(this.PageSize))
  93. //log.Println("数据:", res)
  94. if res != nil && len(*res) > 0 {
  95. for _, v := range *res {
  96. _id := util.Int64All(v["id"])
  97. id := strconv.FormatInt(_id, 10)
  98. data.Data = append(data.Data, &message.Messages{
  99. Id: id,
  100. Appid: util.ObjToString(v["appId"]),
  101. ReceiveUserId: util.ObjToString(v["receive_userid"]),
  102. ReceiveName: util.ObjToString(v["receive_name"]),
  103. SendUserId: util.ObjToString(v["send_userid"]),
  104. SendName: util.ObjToString(v["send_name"]),
  105. Createtime: util.ObjToString(v["createtime"]),
  106. Title: util.ObjToString(v["title"]),
  107. MsgType: int64(util.IntAll(v["msg_type"])),
  108. Link: util.ObjToString(v["link"]),
  109. CiteId: util.Int64All(v["cite_id"]),
  110. Content: util.ObjToString(v["content"]),
  111. IsRead: util.Int64All(v["isRead"]),
  112. MsgLogId: util.Int64All(v["msg_log_id"]),
  113. })
  114. }
  115. }
  116. }
  117. data.Count = count
  118. if this.PageSize == 5 {
  119. redisData := map[string]interface{}{
  120. "count": count,
  121. "data": data.Data,
  122. }
  123. entity.SetData(this.UserId, redisData, entity.SurvivalTime)
  124. }
  125. if err != nil {
  126. data.Code = 0
  127. data.Message = "查询失败"
  128. } else {
  129. data.Code = 1
  130. data.Message = "查询成功"
  131. }
  132. return data
  133. }
  134. // 指定分类未读消息合计
  135. func ClassCountUnread(msgType int, userId string, appId string) (int64, string, int64) {
  136. query := map[string]interface{}{
  137. "msg_type": msgType,
  138. "receive_userid": userId,
  139. "isdel": 1,
  140. "appid": appId,
  141. "isRead": 0,
  142. }
  143. count := entity.Mysql.Count("message", query)
  144. return 1, "查询指定分类未读消息成功", count
  145. }
  146. // MsgCountAdd 消息未读数量加1
  147. func MsgCountAdd(userId, msgType, appId string) bool {
  148. keyString := fmt.Sprintf(MsgCountKey, userId, msgType)
  149. in := redis.Incr(redisModule, keyString)
  150. FindUserMsg(message.FindUserMsgReq{
  151. UserId: userId,
  152. Appid: appId,
  153. OffSet: 1,
  154. PageSize: 5,
  155. MsgType: -1,
  156. Read: 0,
  157. }, true)
  158. return in > 0
  159. }
  160. // MsgCountMinusOne 根据消息类型未读消息数量减1
  161. func MsgCountMinusOne(userId, msgType, appId string) bool {
  162. keyString := fmt.Sprintf(MsgCountKey, userId, msgType)
  163. FindUserMsg(message.FindUserMsgReq{
  164. UserId: userId,
  165. Appid: appId,
  166. OffSet: 1,
  167. PageSize: 5,
  168. MsgType: -1,
  169. Read: 0,
  170. }, true)
  171. if redis.GetInt(redisModule, keyString) <= 0 {
  172. return redis.Put(redisModule, keyString, 0, -1)
  173. }
  174. in := redis.Decrby(redisModule, keyString, 1)
  175. return in > 0
  176. }
  177. // MsgCountZero 把该消息类型未读数量置0
  178. func MsgCountZero(userId, msgType, appId string) bool {
  179. keyString := fmt.Sprintf(MsgCountKey, userId, msgType)
  180. fool := redis.Put(redisModule, keyString, 0, -1)
  181. FindUserMsg(message.FindUserMsgReq{
  182. UserId: userId,
  183. Appid: appId,
  184. OffSet: 1,
  185. PageSize: 5,
  186. MsgType: -1,
  187. Read: 0,
  188. }, true)
  189. return fool
  190. }
  191. func MultSave(this message.MultipleSaveMsgReq) (int64, string) {
  192. userIdArr := strings.Split(this.UserIds, ",")
  193. userNameArr := strings.Split(this.UserNames, ",")
  194. if len(userIdArr) > 0 {
  195. var errCount int64
  196. for k, v := range userIdArr {
  197. if v == "" {
  198. return errCount, "调用结束"
  199. }
  200. userName := userNameArr[k]
  201. //消息数组
  202. c := entity.Mysql.Count("conversation", map[string]interface{}{"receive_id": v, "send_id": this.SendUserId})
  203. sql3 := `INSERT INTO message(appid,receive_userid,receive_name,send_userid,send_name,title,content,msg_type,link,cite_id,createtime,isRead,isdel,msg_log_id) values ("%s",'%s','%s','%s','%s','%s','%s',%d,'%s',0,'%s',0,1,%d);`
  204. sql3 = fmt.Sprintf(sql3, this.Appid, v, userName, this.SendUserId, this.SendName, this.Title, this.Content, this.MsgType, this.Link, time.Now().Format("2006-01-02 15:04:05"), this.MsgLogId)
  205. if c <= 0 {
  206. sql1 := `INSERT INTO conversation(appid,secret_key,user_id,receive_id,receive_name,send_id,send_name,sort,createtime) values ('%s','','%s','%s','%s','%s','%s',0,'%s');`
  207. sql1 = fmt.Sprintf(sql1, this.Appid, this.SendUserId, v, userName, this.SendUserId, this.SendName, time.Now().Format("2006-01-02 15:04:05"))
  208. ok := entity.Mysql.ExecTx("发送消息事务", func(tx *sql.Tx) bool {
  209. //插入会话表
  210. in1 := entity.Mysql.InsertBySqlByTx(tx, sql1)
  211. sql2 := `INSERT INTO conversation(appid,secret_key,user_id,receive_id,receive_name,send_id,send_name,sort,createtime) values ('%s','','%s','%s','%s','%s','%s',0,'%s');`
  212. sql2 = fmt.Sprintf(sql2, this.Appid, v, this.SendUserId, this.SendName, v, userName, time.Now().Format("2006-01-02 15:04:05"))
  213. in2 := entity.Mysql.InsertBySqlByTx(tx, sql2)
  214. //插入消息表
  215. in3 := entity.Mysql.InsertBySqlByTx(tx, sql3)
  216. logx.Info(in1, in2, in3)
  217. return in1 > -1 && in2 > -1 && in3 > -1
  218. })
  219. logx.Info("执行事务是否成功:", ok)
  220. if !ok {
  221. errCount++
  222. continue
  223. }
  224. ok1 := MsgCountAdd(v, strconv.Itoa(int(this.MsgType)), this.Appid)
  225. log.Println("存redis:", ok1)
  226. } else {
  227. in := entity.Mysql.InsertBySql(sql3)
  228. logx.Info("插入消息返回 in1 id:", in)
  229. if in > -1 {
  230. ok := MsgCountAdd(v, strconv.Itoa(int(this.MsgType)), this.Appid)
  231. log.Println("存redis:", ok)
  232. } else {
  233. errCount++
  234. }
  235. }
  236. }
  237. return errCount, "发送成功"
  238. }
  239. return 0, "没有要发送的用户"
  240. }