sendMsg.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. package service
  2. import (
  3. "app.yhyue.com/moapp/MessageCenter/entity"
  4. "app.yhyue.com/moapp/MessageCenter/rpc/type/message"
  5. "app.yhyue.com/moapp/MessageCenter/util"
  6. "app.yhyue.com/moapp/jybase/common"
  7. "app.yhyue.com/moapp/jybase/redis"
  8. "database/sql"
  9. "fmt"
  10. "github.com/zeromicro/go-zero/core/logx"
  11. "log"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. )
  17. // 类型的顺序
  18. const order = "1,4"
  19. const MsgCountKey = "count_%s_%s" //redis 消息未读数量 Count.用户id.消息类型=数量
  20. const redisModule = "msgCount"
  21. /*var (
  22. UserLockMap = map[string]*sync.Mutex{}
  23. //MainLock = sync.Mutex{}
  24. )*/
  25. func SendMsg(this message.SendMsgRequest) (int64, string) {
  26. r, err := entity.Mysql11.Query("select count(*) as c from conversation where receive_id = ? and send_id = ? ", this.ReceiveUserId, this.SendUserId)
  27. c := 0
  28. for r.Next() {
  29. err := r.Scan(&c)
  30. if err != nil {
  31. panic(err.Error())
  32. }
  33. }
  34. logx.Info("查询数量:", c)
  35. sql3 := `INSERT INTO message(appid,receive_userid,receive_name,send_userid,send_name,title,content,msg_type,link,cite_id,createtime,isRead,isdel)
  36. values ("%s",'%s','%s','%s','%s','%s','%s','%d','%s',0,'%s',0,1);`
  37. sql3 = fmt.Sprintf(sql3, this.Appid, this.ReceiveUserId, this.ReceiveName, this.SendUserId, this.SendName, this.Title, this.Content, this.MsgType, this.Link, time.Now().Format("2006-01-02 15:04:05"))
  38. if c < 1 {
  39. sql1 := `INSERT INTO conversation(appid,` + "`key`" + `,user_id,receive_id,receive_name,send_id,send_name,sort,createtime)
  40. values ('%s','','%s','%s','%s','%s','%s',0,'%s');`
  41. sql1 = fmt.Sprintf(sql1, this.Appid, this.SendUserId, this.ReceiveUserId, this.ReceiveName, this.SendUserId, this.SendName, time.Now().Format("2006-01-02 15:04:05"))
  42. ok := entity.Mysql.ExecTx("发送消息事务", func(tx *sql.Tx) bool {
  43. //插入会话表
  44. _, err := entity.Mysql11.Exec(sql1)
  45. sql2 := `INSERT INTO conversation(appid,` + "`key`" + `,user_id,receive_id,receive_name,send_id,send_name,sort,createtime)
  46. values ('%s','','%s','%s','%s','%s','%s',0,'%s');`
  47. sql2 = fmt.Sprintf(sql2, this.Appid, this.ReceiveUserId, this.SendUserId, this.SendName, this.ReceiveUserId, this.ReceiveName, time.Now().Format("2006-01-02 15:04:05"))
  48. _, err = entity.Mysql11.Exec(sql2)
  49. //插入消息表
  50. _, err = entity.Mysql11.Exec(sql3)
  51. if err != nil {
  52. return false
  53. }
  54. return true
  55. })
  56. if ok {
  57. return 1, "消息发送成功"
  58. }
  59. }
  60. _, err = entity.Mysql11.Exec(sql3)
  61. if err == nil {
  62. MsgCountAdd(this.ReceiveUserId, strconv.Itoa(int(this.MsgType)), this.Appid)
  63. return 1, "消息发送成功"
  64. }
  65. return 0, "消息发送失败"
  66. }
  67. func FindUserMsg(this message.FindUserMsgReq, isClean bool) message.FindUserMsgRes {
  68. var err error
  69. var count int64
  70. cquery := map[string]interface{}{
  71. "receive_userid": this.UserId,
  72. "isdel": 1,
  73. "appid": this.Appid,
  74. }
  75. if this.MsgType != -1 {
  76. cquery["msg_type"] = this.MsgType
  77. }
  78. if this.Read != -1 {
  79. cquery["isRead"] = this.Read
  80. }
  81. count = entity.Mysql.Count("message", cquery)
  82. data := message.FindUserMsgRes{}
  83. if this.PageSize == 5 {
  84. //从缓存里边取数据
  85. pc_a, err := entity.GetData(this.UserId)
  86. if err == nil && pc_a != nil {
  87. // 缓存有值
  88. if !isClean {
  89. data.Code = 1
  90. data.Message = "查询成功"
  91. data.Data = pc_a.Data
  92. data.Count = pc_a.Count
  93. return data
  94. }
  95. }
  96. }
  97. count = entity.Mysql.Count("message", cquery)
  98. if count > 0 {
  99. res := entity.Mysql.Find("message", cquery, "", "createtime desc", (int(this.OffSet)-1)*int(this.PageSize), int(this.PageSize))
  100. //log.Println("数据:", res)
  101. if res != nil && len(*res) > 0 {
  102. for _, v := range *res {
  103. _id := util.Int64All(v["id"])
  104. id := strconv.FormatInt(_id, 10)
  105. data.Data = append(data.Data, &message.Messages{
  106. Id: id,
  107. Appid: util.ObjToString(v["appId"]),
  108. ReceiveUserId: util.ObjToString(v["receive_userid"]),
  109. ReceiveName: util.ObjToString(v["receive_name"]),
  110. SendUserId: util.ObjToString(v["send_userid"]),
  111. SendName: util.ObjToString(v["send_name"]),
  112. Createtime: util.ObjToString(v["createtime"]),
  113. Title: util.ObjToString(v["title"]),
  114. MsgType: int64(util.IntAll(v["msg_type"])),
  115. Link: util.ObjToString(v["link"]),
  116. CiteId: util.Int64All(v["cite_id"]),
  117. Content: util.ObjToString(v["content"]),
  118. IsRead: util.Int64All(v["isRead"]),
  119. MsgLogId: util.Int64All(v["msg_log_id"]),
  120. })
  121. }
  122. }
  123. }
  124. data.Count = count
  125. if this.PageSize == 5 {
  126. redisData := map[string]interface{}{
  127. "count": count,
  128. "data": data.Data,
  129. }
  130. entity.SetData(this.UserId, redisData, entity.SurvivalTime)
  131. }
  132. if err != nil {
  133. data.Code = 0
  134. data.Message = "查询失败"
  135. } else {
  136. data.Code = 1
  137. data.Message = "查询成功"
  138. }
  139. return data
  140. }
  141. // 指定分类未读消息合计
  142. func ClassCountUnread(msgType int, userId string, appId string) (int64, string, int64) {
  143. query := map[string]interface{}{
  144. "msg_type": msgType,
  145. "receive_userid": userId,
  146. "isdel": 1,
  147. "appid": appId,
  148. "isRead": 0,
  149. }
  150. count := entity.Mysql.Count("message", query)
  151. return 1, "查询指定分类未读消息成功", count
  152. }
  153. // MsgCountAdd 消息未读数量加1
  154. func MsgCountAdd(userId, msgType, appId string) bool {
  155. keyString := fmt.Sprintf(MsgCountKey, userId, msgType)
  156. in := redis.Incr(redisModule, keyString)
  157. FindUserMsg(message.FindUserMsgReq{
  158. UserId: userId,
  159. Appid: appId,
  160. OffSet: 1,
  161. PageSize: 5,
  162. MsgType: -1,
  163. Read: 0,
  164. }, true)
  165. return in > 0
  166. }
  167. // MsgCountMinusOne 根据消息类型未读消息数量减1
  168. func MsgCountMinusOne(userId, msgType, appId string) bool {
  169. keyString := fmt.Sprintf(MsgCountKey, userId, msgType)
  170. FindUserMsg(message.FindUserMsgReq{
  171. UserId: userId,
  172. Appid: appId,
  173. OffSet: 1,
  174. PageSize: 5,
  175. MsgType: -1,
  176. Read: 0,
  177. }, true)
  178. if redis.GetInt(redisModule, keyString) <= 0 {
  179. return redis.Put(redisModule, keyString, 0, -1)
  180. }
  181. in := redis.Decrby(redisModule, keyString, 1)
  182. return in > 0
  183. }
  184. // MsgCountZero 把该消息类型未读数量置0
  185. func MsgCountZero(userId, msgType, appId string) bool {
  186. keyString := fmt.Sprintf(MsgCountKey, userId, msgType)
  187. fool := redis.Put(redisModule, keyString, 0, -1)
  188. FindUserMsg(message.FindUserMsgReq{
  189. UserId: userId,
  190. Appid: appId,
  191. OffSet: 1,
  192. PageSize: 5,
  193. MsgType: -1,
  194. Read: 0,
  195. }, true)
  196. return fool
  197. }
  198. func MultSave(this message.MultipleSaveMsgReq) (int64, string) {
  199. userIdArr := strings.Split(this.UserIds, ",")
  200. userNameArr := strings.Split(this.UserNames, ",")
  201. positionIdArr := strings.Split(this.PositionIds, ",")
  202. if len(userIdArr) == 0 {
  203. return 0, "无效的用户id"
  204. }
  205. wg := &sync.WaitGroup{}
  206. for i := 0; i < len(userIdArr); i++ {
  207. if userIdArr[i] == "" {
  208. continue
  209. }
  210. name := userNameArr[i]
  211. wg.Add(1)
  212. entity.SaveConcurrencyChan <- 1
  213. var positionId int64
  214. if len(positionIdArr) == len(userIdArr) {
  215. positionId = common.Int64All(positionIdArr[i])
  216. }
  217. go func(v, userName string, positionId int64) {
  218. defer func() {
  219. <-entity.SaveConcurrencyChan
  220. wg.Done()
  221. }()
  222. //消息数组
  223. c := entity.Mysql.Count("conversation", map[string]interface{}{"receive_id": v, "send_id": this.SendUserId})
  224. sql3 := `INSERT INTO message(appid,receive_userid,receive_name,send_userid,send_name,title,content,msg_type,link,cite_id,createtime,isRead,isdel,msg_log_id,show_buoy,show_content,position_id) values ("%s",'%s','%s','%s','%s','%s','%s',%d,'%s',0,'%s',0,1,%d,%d,'%s',?);`
  225. sql3 = fmt.Sprintf(sql3, this.Appid, v, userName, this.SendUserId, this.SendName, this.Title, this.Content, this.MsgType, this.Link, time.Now().Format("2006-01-02 15:04:05"), this.MsgLogId, this.ShowBuoy, this.ShowContent)
  226. if c <= 0 {
  227. sql1 := `INSERT INTO conversation(appid,secret_key,user_id,receive_id,receive_name,send_id,send_name,sort,createtime) values ('%s','','%s','%s','%s','%s','%s',0,'%s');`
  228. sql1 = fmt.Sprintf(sql1, this.Appid, this.SendUserId, v, userName, this.SendUserId, this.SendName, time.Now().Format("2006-01-02 15:04:05"))
  229. ok := entity.Mysql.ExecTx("发送消息事务", func(tx *sql.Tx) bool {
  230. //插入会话表
  231. in1 := entity.Mysql.InsertBySqlByTx(tx, sql1)
  232. sql2 := `INSERT INTO conversation(appid,secret_key,user_id,receive_id,receive_name,send_id,send_name,sort,createtime) values ('%s','','%s','%s','%s','%s','%s',0,'%s');`
  233. sql2 = fmt.Sprintf(sql2, this.Appid, v, this.SendUserId, this.SendName, v, userName, time.Now().Format("2006-01-02 15:04:05"))
  234. in2 := entity.Mysql.InsertBySqlByTx(tx, sql2)
  235. //插入消息表
  236. in3 := entity.Mysql.InsertBySqlByTx(tx, sql3, common.If(positionId != 0, positionId, nil))
  237. logx.Info(in1, in2, in3)
  238. return in1 > -1 && in2 > -1 && in3 > -1
  239. })
  240. logx.Info("执行事务是否成功:", ok)
  241. if ok {
  242. ok1 := MsgCountAdd(v, strconv.Itoa(int(this.MsgType)), this.Appid)
  243. if !ok1 {
  244. log.Println("存redis:", ok1, v)
  245. }
  246. }
  247. } else {
  248. in := entity.Mysql.InsertBySql(sql3, common.If(positionId != 0, positionId, nil))
  249. logx.Info("插入消息返回 in1 id:", in)
  250. if in > -1 {
  251. ok := MsgCountAdd(v, strconv.Itoa(int(this.MsgType)), this.Appid)
  252. if !ok {
  253. log.Println("存redis:", ok, v)
  254. }
  255. }
  256. }
  257. }(userIdArr[i], name, positionId)
  258. }
  259. wg.Wait()
  260. return 0, ""
  261. }