sendMsg.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. package service
  2. import (
  3. "context"
  4. "database/sql"
  5. "encoding/json"
  6. "fmt"
  7. "go.etcd.io/etcd/client/v3/concurrency"
  8. "log"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "app.yhyue.com/moapp/MessageCenter/entity"
  13. "app.yhyue.com/moapp/MessageCenter/rpc/message"
  14. )
  15. // 类型的顺序
  16. const order = "1,4"
  17. const EtcdCount = "Count.%s.%s" //etcd 消息未读数量 Count.用户id.消息类型=数量
  18. func SendMsg(this message.SendMsgRequest) (int64, string) {
  19. //orm := entity.Engine.NewSession()
  20. //defer orm.Close()
  21. //err := orm.Begin()
  22. //fmt.Println(err)
  23. //count := entity.Mysql11.Query("conversation", map[string]interface{}{"receive_id": this.ReceiveUserId, "send_id": this.SendUserId})
  24. r, err := entity.Mysql11.Query("select count(*) as c from conversation where receive_id = ? and send_id = ? ", this.ReceiveUserId, this.SendUserId)
  25. c := 0
  26. log.Println("查询结果", r)
  27. for r.Next() {
  28. err := r.Scan(&c)
  29. if err != nil {
  30. panic(err.Error())
  31. }
  32. }
  33. log.Println("查询数量:", c)
  34. sql3 := `INSERT INTO message(appid,receive_userid,receive_name,send_userid,send_name,title,content,msg_type,link,cite_id,createtime,isRead,isdel)
  35. values ("%s",'%s','%s','%s','%s','%s','%s','%d','%s',0,'%s',0,1);`
  36. sql3 = fmt.Sprintf(sql3, this.Appid, this.ReceiveUserId, this.ReceiveName, this.SendUserId, this.SendName, this.Title, this.Content, this.MsgType, this.Link, time.Now().Format("2006-01-02 15:04:05"))
  37. if c < 1 {
  38. sql1 := `INSERT INTO conversation(appid,` + "`key`" + `,user_id,receive_id,receive_name,send_id,send_name,sort,createtime)
  39. values ('%s','','%s','%s','%s','%s','%s',0,'%s');`
  40. sql1 = fmt.Sprintf(sql1, this.Appid, this.SendUserId, this.ReceiveUserId, this.ReceiveName, this.SendUserId, this.SendName, time.Now().Format("2006-01-02 15:04:05"))
  41. ok := entity.Mysql.ExecTx("发送消息事务", func(tx *sql.Tx) bool {
  42. //插入会话表
  43. _, err := entity.Mysql11.Exec(sql1)
  44. sql2 := `INSERT INTO conversation(appid,` + "`key`" + `,user_id,receive_id,receive_name,send_id,send_name,sort,createtime)
  45. values ('%s','','%s','%s','%s','%s','%s',0,'%s');`
  46. sql2 = fmt.Sprintf(sql2, this.Appid, this.ReceiveUserId, this.SendUserId, this.SendName, this.ReceiveUserId, this.ReceiveName, time.Now().Format("2006-01-02 15:04:05"))
  47. _, err = entity.Mysql11.Exec(sql2)
  48. //插入消息表
  49. _, err = entity.Mysql11.Exec(sql3)
  50. if err != nil {
  51. return false
  52. }
  53. return true
  54. })
  55. if ok {
  56. return 1, "消息发送成功"
  57. }
  58. }
  59. _, err = entity.Mysql11.Exec(sql3)
  60. if err == nil {
  61. go EtcdCountAdd(this.ReceiveUserId, strconv.Itoa(int(this.MsgType)))
  62. return 1, "消息发送成功"
  63. }
  64. return 0, "消息发送失败"
  65. }
  66. func FindUserMsg(this message.FindUserMsgReq) message.FindUserMsgRes {
  67. orm := entity.Engine
  68. var messages []*entity.Message
  69. var err error
  70. var count int64
  71. q := ""
  72. if this.MsgType != -1 {
  73. q += fmt.Sprintf(" and msg_type = %d", this.MsgType)
  74. }
  75. if this.Read != -1 {
  76. q += fmt.Sprintf(" and isRead = %d", this.Read)
  77. }
  78. count, err = orm.Table("message").Where("((receive_userid = ? and send_userid = ?) or (receive_userid = ? and send_userid = ?)) and isdel = ? and appid = ?"+q, this.UserId, this.ReceiveUserId, this.ReceiveUserId, this.UserId, 1, this.Appid).Count()
  79. data := message.FindUserMsgRes{}
  80. if count > 0 {
  81. err = orm.Table("message").Select("*").Where("((receive_userid = ? and send_userid = ?) or (receive_userid = ? and send_userid = ?)) and isdel = ? and appid = ?"+q, this.UserId, this.ReceiveUserId, this.ReceiveUserId, this.UserId, 1, this.Appid).
  82. OrderBy("createtime desc").
  83. Limit(int(this.PageSize), (int(this.OffSet)-1)*int(this.PageSize)).
  84. Find(&messages)
  85. //log.Println("数据:", messages)
  86. for _, v := range messages {
  87. data.Data = append(data.Data, &message.Messages{
  88. Id: v.Id,
  89. Appid: v.AppId,
  90. ReceiveUserId: v.ReceiveUserid,
  91. ReceiveName: v.ReceiveName,
  92. SendUserId: v.SendUserid,
  93. SendName: v.SendName,
  94. Createtime: v.CreateTime.Format("2006-01-02 15:04:05"),
  95. Title: v.Title,
  96. MsgType: int64(v.MsgType),
  97. Link: v.Link,
  98. CiteId: int64(v.CiteId),
  99. Content: v.Content,
  100. IsRead: int64(v.IsRead),
  101. })
  102. }
  103. }
  104. data.Count = count
  105. if err != nil {
  106. data.Code = 0
  107. data.Message = "查询失败"
  108. } else {
  109. data.Code = 1
  110. data.Message = "查询成功"
  111. }
  112. return data
  113. }
  114. // 指定分类未读消息合计
  115. func ClassCountUnread(msgType int, userId string, appId string) (int64, string, int64) {
  116. //orm := entity.Engine
  117. //count, err := orm.Table("message").Where("msg_type=? and receive_userid=? and isdel=1 and appid=? and isRead=0", msgType, userId, appId).Count()
  118. query := map[string]interface{}{
  119. "msg_type": msgType,
  120. "receive_userid": userId,
  121. "isdel": 1,
  122. "appid": appId,
  123. "isRead": 0,
  124. }
  125. count := entity.Mysql.Count("message", query)
  126. return 1, "查询指定分类未读消息成功", count
  127. }
  128. // etcd数量加1
  129. func EtcdCountAdd(userId, msgType string) {
  130. s1, err := concurrency.NewSession(entity.EtcdCli)
  131. if err != nil {
  132. log.Fatal(err)
  133. }
  134. defer s1.Close()
  135. keyString := fmt.Sprintf(EtcdCount, userId, msgType)
  136. m1 := concurrency.NewMutex(s1, keyString)
  137. // 会话s1获取锁
  138. if err := m1.Lock(context.TODO()); err != nil {
  139. log.Fatal("获取锁失败", err)
  140. }
  141. // 操作数量
  142. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  143. resp, err := s1.Client().Get(ctx, keyString)
  144. cancel()
  145. if err != nil {
  146. fmt.Printf("get from etcd failed, err:%v\n", err)
  147. // 释放锁
  148. if err := m1.Unlock(context.TODO()); err != nil {
  149. log.Fatal("释放锁失败", err)
  150. }
  151. return
  152. }
  153. var count int
  154. for _, ev := range resp.Kvs {
  155. if ev.Value != nil {
  156. err := json.Unmarshal([]byte(ev.Value), &count)
  157. if err != nil {
  158. log.Println("etcd get err:", err)
  159. } else {
  160. break
  161. }
  162. }
  163. }
  164. ctx, cancel = context.WithTimeout(context.Background(), time.Second)
  165. _, err = s1.Client().Put(ctx, keyString, strconv.Itoa(count+1))
  166. cancel()
  167. if err != nil {
  168. fmt.Printf("put to etcd failed, err:%v\n", err)
  169. // 释放锁
  170. if err := m1.Unlock(context.TODO()); err != nil {
  171. log.Fatal("释放锁失败", err)
  172. }
  173. return
  174. }
  175. // 释放锁
  176. if err := m1.Unlock(context.TODO()); err != nil {
  177. log.Fatal("释放锁失败", err)
  178. }
  179. }
  180. //单条消息,-1
  181. func EtcdCountMinusOne(userId, msgType string) {
  182. s1, err := concurrency.NewSession(entity.EtcdCli)
  183. if err != nil {
  184. log.Fatal(err)
  185. }
  186. defer s1.Close()
  187. keyString := fmt.Sprintf(EtcdCount, userId, msgType)
  188. m1 := concurrency.NewMutex(s1, keyString)
  189. // 会话s1获取锁
  190. if err := m1.Lock(context.TODO()); err != nil {
  191. log.Fatal("获取锁失败", err)
  192. }
  193. // 操作数量
  194. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  195. resp, err := s1.Client().Get(ctx, keyString)
  196. cancel()
  197. if err != nil {
  198. fmt.Printf("get from etcd failed, err:%v\n", err)
  199. // 释放锁
  200. if err := m1.Unlock(context.TODO()); err != nil {
  201. log.Fatal("释放锁失败", err)
  202. }
  203. return
  204. }
  205. var count int
  206. for _, ev := range resp.Kvs {
  207. if ev.Value != nil {
  208. err := json.Unmarshal([]byte(ev.Value), &count)
  209. if err != nil {
  210. log.Println("etcd get err:", err)
  211. } else {
  212. break
  213. }
  214. }
  215. }
  216. ctx, cancel = context.WithTimeout(context.Background(), time.Second)
  217. var fin string
  218. if count > 0 {
  219. fin = strconv.Itoa(count - 1)
  220. } else {
  221. fin = "0"
  222. }
  223. _, err = s1.Client().Put(ctx, keyString, fin)
  224. cancel()
  225. if err != nil {
  226. fmt.Printf("put to etcd failed, err:%v\n", err)
  227. // 释放锁
  228. if err := m1.Unlock(context.TODO()); err != nil {
  229. log.Fatal("释放锁失败", err)
  230. }
  231. return
  232. }
  233. // 释放锁
  234. if err := m1.Unlock(context.TODO()); err != nil {
  235. log.Fatal("释放锁失败", err)
  236. }
  237. }
  238. // 消息类别置0
  239. func EtcdSetCountZero(userId, msgType string) {
  240. log.Println(" 消息类别置0", userId, msgType)
  241. s1, err := concurrency.NewSession(entity.EtcdCli)
  242. if err != nil {
  243. log.Fatal(err)
  244. }
  245. defer s1.Close()
  246. keyString := fmt.Sprintf(EtcdCount, userId, msgType)
  247. m1 := concurrency.NewMutex(s1, keyString)
  248. // 会话s1获取锁
  249. if err := m1.Lock(context.TODO()); err != nil {
  250. log.Fatal("获取锁失败", err)
  251. }
  252. // 操作数量
  253. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  254. _, err = s1.Client().Put(ctx, keyString, "0")
  255. cancel()
  256. if err != nil {
  257. fmt.Printf("put to etcd failed, err:%v\n", err)
  258. // 释放锁
  259. if err := m1.Unlock(context.TODO()); err != nil {
  260. log.Fatal("释放锁失败", err)
  261. }
  262. return
  263. }
  264. // 释放锁
  265. if err := m1.Unlock(context.TODO()); err != nil {
  266. log.Fatal("释放锁失败", err)
  267. }
  268. }
  269. func MultSave(this message.MultipleSaveMsgReq) (int64, string) {
  270. userIdArr := strings.Split(this.UserIds, ",")
  271. userNameArr := strings.Split(this.UserNames, ",")
  272. log.Println(len(userIdArr), len(userNameArr))
  273. if len(userIdArr) > 0 {
  274. var errCount int64
  275. for k, v := range userIdArr {
  276. log.Println("k--------", k, v)
  277. if v == "" {
  278. return 0, "调用结束"
  279. }
  280. userName := userNameArr[k]
  281. //消息数组
  282. c := entity.Mysql.Count("conversation", map[string]interface{}{"receive_id": v, "send_id": this.SendUserId})
  283. log.Println("查询数量:", c)
  284. sql3 := `INSERT INTO message(appid,receive_userid,receive_name,send_userid,send_name,title,content,msg_type,link,cite_id,createtime,isRead,isdel) values ("%s",'%s','%s','%s','%s','%s','%s','%d','%s',0,'%s',0,1);`
  285. sql3 = fmt.Sprintf(sql3, this.Appid, v, userName, this.SendUserId, this.SendName, this.Title, this.Content, this.MsgType, this.Link, time.Now().Format("2006-01-02 15:04:05"))
  286. if c < 1 {
  287. sql1 := `INSERT INTO conversation(appid,` + "`key`" + `,user_id,receive_id,receive_name,send_id,send_name,sort,createtime) values ('%s','','%s','%s','%s','%s','%s',0,'%s');`
  288. sql1 = fmt.Sprintf(sql1, this.Appid, this.SendUserId, v, userName, this.SendUserId, this.SendName, time.Now().Format("2006-01-02 15:04:05"))
  289. ok := entity.Mysql.ExecTx("发送消息事务", func(tx *sql.Tx) bool {
  290. //插入会话表
  291. bT := time.Now() //开始时间
  292. _, err := entity.Mysql.DB.Exec(sql1)
  293. log.Println("**********1",err)
  294. sql2 := `INSERT INTO conversation(appid,` + "`key`" + `,user_id,receive_id,receive_name,send_id,send_name,sort,createtime) values ('%s','','%s','%s','%s','%s','%s',0,'%s');`
  295. sql2 = fmt.Sprintf(sql2, this.Appid, v, this.SendUserId, this.SendName, v, userName, time.Now().Format("2006-01-02 15:04:05"))
  296. _, err = entity.Mysql.DB.Exec(sql2)
  297. log.Println("**********2",err)
  298. //插入消息表
  299. _, err = entity.Mysql.DB.Exec(sql3)
  300. eT := time.Since(bT) // 从开始到当前所消耗的时间
  301. log.Println("存储耗时:", eT)
  302. if err != nil {
  303. return false
  304. }
  305. return true
  306. })
  307. if !ok {
  308. errCount++
  309. continue
  310. }
  311. go EtcdCountAdd(v, strconv.Itoa(int(this.MsgType)))
  312. } else {
  313. _, err := entity.Mysql.DB.Exec(sql3)
  314. if err == nil {
  315. go EtcdCountAdd(v, strconv.Itoa(int(this.MsgType)))
  316. } else {
  317. errCount++
  318. }
  319. }
  320. }
  321. return errCount, "发送成功"
  322. }
  323. return 0, "没有要发送的用户"
  324. }