sendMsg.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. package service
  2. import (
  3. "app.yhyue.com/moapp/MessageCenter/util"
  4. "context"
  5. "database/sql"
  6. "encoding/json"
  7. "fmt"
  8. "go.etcd.io/etcd/client/v3/concurrency"
  9. "log"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "app.yhyue.com/moapp/MessageCenter/entity"
  14. "app.yhyue.com/moapp/MessageCenter/rpc/message"
  15. )
  16. // 类型的顺序
  17. const order = "1,4"
  18. const EtcdCount = "Count.%s.%s" //etcd 消息未读数量 Count.用户id.消息类型=数量
  19. func SendMsg(this message.SendMsgRequest) (int64, string) {
  20. //orm := entity.Engine.NewSession()
  21. //defer orm.Close()
  22. //err := orm.Begin()
  23. //fmt.Println(err)
  24. //count := entity.Mysql11.Query("conversation", map[string]interface{}{"receive_id": this.ReceiveUserId, "send_id": this.SendUserId})
  25. r, err := entity.Mysql11.Query("select count(*) as c from conversation where receive_id = ? and send_id = ? ", this.ReceiveUserId, this.SendUserId)
  26. c := 0
  27. log.Println("查询结果", r)
  28. for r.Next() {
  29. err := r.Scan(&c)
  30. if err != nil {
  31. panic(err.Error())
  32. }
  33. }
  34. log.Println("查询数量:", c)
  35. sql3 := `INSERT INTO message(appid,receive_userid,receive_name,send_userid,send_name,title,content,msg_type,link,cite_id,createtime,isRead,isdel)
  36. values ("%s",'%s','%s','%s','%s','%s','%s','%d','%s',0,'%s',0,1);`
  37. sql3 = fmt.Sprintf(sql3, this.Appid, this.ReceiveUserId, this.ReceiveName, this.SendUserId, this.SendName, this.Title, this.Content, this.MsgType, this.Link, time.Now().Format("2006-01-02 15:04:05"))
  38. if c < 1 {
  39. sql1 := `INSERT INTO conversation(appid,` + "`key`" + `,user_id,receive_id,receive_name,send_id,send_name,sort,createtime)
  40. values ('%s','','%s','%s','%s','%s','%s',0,'%s');`
  41. sql1 = fmt.Sprintf(sql1, this.Appid, this.SendUserId, this.ReceiveUserId, this.ReceiveName, this.SendUserId, this.SendName, time.Now().Format("2006-01-02 15:04:05"))
  42. ok := entity.Mysql.ExecTx("发送消息事务", func(tx *sql.Tx) bool {
  43. //插入会话表
  44. _, err := entity.Mysql11.Exec(sql1)
  45. sql2 := `INSERT INTO conversation(appid,` + "`key`" + `,user_id,receive_id,receive_name,send_id,send_name,sort,createtime)
  46. values ('%s','','%s','%s','%s','%s','%s',0,'%s');`
  47. sql2 = fmt.Sprintf(sql2, this.Appid, this.ReceiveUserId, this.SendUserId, this.SendName, this.ReceiveUserId, this.ReceiveName, time.Now().Format("2006-01-02 15:04:05"))
  48. _, err = entity.Mysql11.Exec(sql2)
  49. //插入消息表
  50. _, err = entity.Mysql11.Exec(sql3)
  51. if err != nil {
  52. return false
  53. }
  54. return true
  55. })
  56. if ok {
  57. return 1, "消息发送成功"
  58. }
  59. }
  60. _, err = entity.Mysql11.Exec(sql3)
  61. if err == nil {
  62. go EtcdCountAdd(this.ReceiveUserId, strconv.Itoa(int(this.MsgType)))
  63. return 1, "消息发送成功"
  64. }
  65. return 0, "消息发送失败"
  66. }
  67. func FindUserMsg(this message.FindUserMsgReq) message.FindUserMsgRes {
  68. //orm := entity.Engine
  69. //var messages []*entity.Message
  70. var err error
  71. var count int64
  72. cquery := map[string]interface{}{
  73. "receive_userid": this.UserId,
  74. "isdel": 1,
  75. "appid": this.Appid,
  76. }
  77. if this.MsgType != -1 {
  78. cquery["msg_type"] = this.MsgType
  79. }
  80. if this.Read != -1 {
  81. cquery["isRead"] = this.Read
  82. }
  83. count = entity.Mysql.Count("message", cquery)
  84. //count, err = orm.Table("message").Where("((receive_userid = ? and send_userid = ?) or (receive_userid = ? and send_userid = ?)) and isdel = ? and appid = ?"+q, this.UserId, this.ReceiveUserId, this.ReceiveUserId, this.UserId, 1, this.Appid).Count()
  85. data := message.FindUserMsgRes{}
  86. if count > 0 {
  87. /*err = orm.Table("message").Select("*").Where("((receive_userid = ? and send_userid = ?) or (receive_userid = ? and send_userid = ?)) and isdel = ? and appid = ?"+q, this.UserId, this.ReceiveUserId, this.ReceiveUserId, this.UserId, 1, this.Appid).
  88. OrderBy("createtime desc").
  89. Limit(int(this.PageSize), (int(this.OffSet)-1)*int(this.PageSize)).
  90. Find(&messages)*/
  91. res := entity.Mysql.Find("message", cquery, "", "createtime desc", (int(this.OffSet)-1)*int(this.PageSize), int(this.PageSize))
  92. log.Println("数据:", res)
  93. if res != nil && len(*res) > 0 {
  94. for _, v := range *res {
  95. _id := util.Int64All(v["id"])
  96. id := strconv.FormatInt(_id, 10)
  97. data.Data = append(data.Data, &message.Messages{
  98. Id: id,
  99. Appid: util.ObjToString(v["appId"]),
  100. ReceiveUserId: util.ObjToString(v["receive_userid"]),
  101. ReceiveName: util.ObjToString(v["receive_name"]),
  102. SendUserId: util.ObjToString(v["send_userid"]),
  103. SendName: util.ObjToString(v["send_name"]),
  104. Createtime: util.ObjToString(v["createtime"]),
  105. Title: util.ObjToString(v["title"]),
  106. MsgType: int64(util.IntAll(v["msg_type"])),
  107. Link: util.ObjToString(v["link"]),
  108. CiteId: util.Int64All(v["cite_id"]),
  109. Content: util.ObjToString(v["content"]),
  110. IsRead: util.Int64All(v["isRead"]),
  111. })
  112. }
  113. }
  114. }
  115. data.Count = count
  116. if err != nil {
  117. data.Code = 0
  118. data.Message = "查询失败"
  119. } else {
  120. data.Code = 1
  121. data.Message = "查询成功"
  122. }
  123. return data
  124. }
  125. // 指定分类未读消息合计
  126. func ClassCountUnread(msgType int, userId string, appId string) (int64, string, int64) {
  127. //orm := entity.Engine
  128. //count, err := orm.Table("message").Where("msg_type=? and receive_userid=? and isdel=1 and appid=? and isRead=0", msgType, userId, appId).Count()
  129. query := map[string]interface{}{
  130. "msg_type": msgType,
  131. "receive_userid": userId,
  132. "isdel": 1,
  133. "appid": appId,
  134. "isRead": 0,
  135. }
  136. count := entity.Mysql.Count("message", query)
  137. return 1, "查询指定分类未读消息成功", count
  138. }
  139. // etcd数量加1
  140. func EtcdCountAdd(userId, msgType string) {
  141. s1, err := concurrency.NewSession(entity.EtcdCli)
  142. if err != nil {
  143. log.Fatal(err)
  144. }
  145. defer s1.Close()
  146. keyString := fmt.Sprintf(EtcdCount, userId, msgType)
  147. m1 := concurrency.NewMutex(s1, keyString)
  148. // 会话s1获取锁
  149. if err := m1.Lock(context.TODO()); err != nil {
  150. log.Fatal("获取锁失败", err)
  151. }
  152. // 操作数量
  153. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  154. resp, err := s1.Client().Get(ctx, keyString)
  155. cancel()
  156. if err != nil {
  157. fmt.Printf("get from etcd failed, err:%v\n", err)
  158. // 释放锁
  159. if err := m1.Unlock(context.TODO()); err != nil {
  160. log.Fatal("释放锁失败", err)
  161. }
  162. return
  163. }
  164. var count int
  165. for _, ev := range resp.Kvs {
  166. if ev.Value != nil {
  167. err := json.Unmarshal([]byte(ev.Value), &count)
  168. if err != nil {
  169. log.Println("etcd get err:", err)
  170. } else {
  171. break
  172. }
  173. }
  174. }
  175. ctx, cancel = context.WithTimeout(context.Background(), time.Second)
  176. _, err = s1.Client().Put(ctx, keyString, strconv.Itoa(count+1))
  177. cancel()
  178. if err != nil {
  179. fmt.Printf("put to etcd failed, err:%v\n", err)
  180. // 释放锁
  181. if err := m1.Unlock(context.TODO()); err != nil {
  182. log.Fatal("释放锁失败", err)
  183. }
  184. return
  185. }
  186. // 释放锁
  187. if err := m1.Unlock(context.TODO()); err != nil {
  188. log.Fatal("释放锁失败", err)
  189. }
  190. }
  191. //单条消息,-1
  192. func EtcdCountMinusOne(userId, msgType string) {
  193. s1, err := concurrency.NewSession(entity.EtcdCli)
  194. if err != nil {
  195. log.Fatal(err)
  196. }
  197. defer s1.Close()
  198. keyString := fmt.Sprintf(EtcdCount, userId, msgType)
  199. m1 := concurrency.NewMutex(s1, keyString)
  200. // 会话s1获取锁
  201. if err := m1.Lock(context.TODO()); err != nil {
  202. log.Fatal("获取锁失败", err)
  203. }
  204. // 操作数量
  205. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  206. resp, err := s1.Client().Get(ctx, keyString)
  207. cancel()
  208. if err != nil {
  209. fmt.Printf("get from etcd failed, err:%v\n", err)
  210. // 释放锁
  211. if err := m1.Unlock(context.TODO()); err != nil {
  212. log.Fatal("释放锁失败", err)
  213. }
  214. return
  215. }
  216. var count int
  217. for _, ev := range resp.Kvs {
  218. if ev.Value != nil {
  219. err := json.Unmarshal([]byte(ev.Value), &count)
  220. if err != nil {
  221. log.Println("etcd get err:", err)
  222. } else {
  223. break
  224. }
  225. }
  226. }
  227. ctx, cancel = context.WithTimeout(context.Background(), time.Second)
  228. var fin string
  229. if count > 0 {
  230. fin = strconv.Itoa(count - 1)
  231. } else {
  232. fin = "0"
  233. }
  234. _, err = s1.Client().Put(ctx, keyString, fin)
  235. cancel()
  236. if err != nil {
  237. fmt.Printf("put to etcd failed, err:%v\n", err)
  238. // 释放锁
  239. if err := m1.Unlock(context.TODO()); err != nil {
  240. log.Fatal("释放锁失败", err)
  241. }
  242. return
  243. }
  244. // 释放锁
  245. if err := m1.Unlock(context.TODO()); err != nil {
  246. log.Fatal("释放锁失败", err)
  247. }
  248. }
  249. // 消息类别置0
  250. func EtcdSetCountZero(userId, msgType string) {
  251. log.Println(" 消息类别置0", userId, msgType)
  252. s1, err := concurrency.NewSession(entity.EtcdCli)
  253. if err != nil {
  254. log.Fatal(err)
  255. }
  256. defer s1.Close()
  257. keyString := fmt.Sprintf(EtcdCount, userId, msgType)
  258. m1 := concurrency.NewMutex(s1, keyString)
  259. // 会话s1获取锁
  260. if err := m1.Lock(context.TODO()); err != nil {
  261. log.Fatal("获取锁失败", err)
  262. }
  263. // 操作数量
  264. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
  265. _, err = s1.Client().Put(ctx, keyString, "0")
  266. cancel()
  267. if err != nil {
  268. fmt.Printf("put to etcd failed, err:%v\n", err)
  269. // 释放锁
  270. if err := m1.Unlock(context.TODO()); err != nil {
  271. log.Fatal("释放锁失败", err)
  272. }
  273. return
  274. }
  275. // 释放锁
  276. if err := m1.Unlock(context.TODO()); err != nil {
  277. log.Fatal("释放锁失败", err)
  278. }
  279. }
  280. func MultSave(this message.MultipleSaveMsgReq) (int64, string) {
  281. userIdArr := strings.Split(this.UserIds, ",")
  282. userNameArr := strings.Split(this.UserNames, ",")
  283. log.Println(len(userIdArr), len(userNameArr))
  284. if len(userIdArr) > 0 {
  285. var errCount int64
  286. for k, v := range userIdArr {
  287. log.Println("k--------", k, v)
  288. if v == "" {
  289. return 0, "调用结束"
  290. }
  291. userName := userNameArr[k]
  292. //消息数组
  293. c := entity.Mysql.Count("conversation", map[string]interface{}{"receive_id": v, "send_id": this.SendUserId})
  294. log.Println("查询数量:", c)
  295. sql3 := `INSERT INTO message(appid,receive_userid,receive_name,send_userid,send_name,title,content,msg_type,link,cite_id,createtime,isRead,isdel) values ("%s",'%s','%s','%s','%s','%s','%s','%d','%s',0,'%s',0,1);`
  296. sql3 = fmt.Sprintf(sql3, this.Appid, v, userName, this.SendUserId, this.SendName, this.Title, this.Content, this.MsgType, this.Link, time.Now().Format("2006-01-02 15:04:05"))
  297. if c < 1 {
  298. sql1 := `INSERT INTO conversation(appid,` + "`key`" + `,user_id,receive_id,receive_name,send_id,send_name,sort,createtime) values ('%s','','%s','%s','%s','%s','%s',0,'%s');`
  299. sql1 = fmt.Sprintf(sql1, this.Appid, this.SendUserId, v, userName, this.SendUserId, this.SendName, time.Now().Format("2006-01-02 15:04:05"))
  300. ok := entity.Mysql.ExecTx("发送消息事务", func(tx *sql.Tx) bool {
  301. //插入会话表
  302. bT := time.Now() //开始时间
  303. _, err := entity.Mysql.DB.Exec(sql1)
  304. log.Println("**********1", err)
  305. sql2 := `INSERT INTO conversation(appid,` + "`key`" + `,user_id,receive_id,receive_name,send_id,send_name,sort,createtime) values ('%s','','%s','%s','%s','%s','%s',0,'%s');`
  306. sql2 = fmt.Sprintf(sql2, this.Appid, v, this.SendUserId, this.SendName, v, userName, time.Now().Format("2006-01-02 15:04:05"))
  307. _, err = entity.Mysql.DB.Exec(sql2)
  308. log.Println("**********2", err)
  309. //插入消息表
  310. _, err = entity.Mysql.DB.Exec(sql3)
  311. eT := time.Since(bT) // 从开始到当前所消耗的时间
  312. log.Println("存储耗时:", eT)
  313. if err != nil {
  314. return false
  315. }
  316. return true
  317. })
  318. if !ok {
  319. errCount++
  320. continue
  321. }
  322. go EtcdCountAdd(v, strconv.Itoa(int(this.MsgType)))
  323. } else {
  324. _, err := entity.Mysql.DB.Exec(sql3)
  325. if err == nil {
  326. go EtcdCountAdd(v, strconv.Itoa(int(this.MsgType)))
  327. } else {
  328. errCount++
  329. }
  330. }
  331. }
  332. return errCount, "发送成功"
  333. }
  334. return 0, "没有要发送的用户"
  335. }