package service import ( "app.yhyue.com/moapp/jybase/redis" "database/sql" "fmt" "github.com/tal-tech/go-zero/core/logx" "log" "strconv" "strings" "time" "app.yhyue.com/moapp/MessageCenter/entity" "app.yhyue.com/moapp/MessageCenter/rpc/message" "app.yhyue.com/moapp/MessageCenter/util" ) // 类型的顺序 const order = "1,4" const MsgCountKey = "count_%s_%s" //redis 消息未读数量 Count.用户id.消息类型=数量 const redisModule = "msgCount" /*var ( UserLockMap = map[string]*sync.Mutex{} //MainLock = sync.Mutex{} )*/ func SendMsg(this message.SendMsgRequest) (int64, string) { r, err := entity.Mysql11.Query("select count(*) as c from conversation where receive_id = ? and send_id = ? ", this.ReceiveUserId, this.SendUserId) c := 0 for r.Next() { err := r.Scan(&c) if err != nil { panic(err.Error()) } } logx.Info("查询数量:", c) sql3 := `INSERT INTO message(appid,receive_userid,receive_name,send_userid,send_name,title,content,msg_type,link,cite_id,createtime,isRead,isdel) values ("%s",'%s','%s','%s','%s','%s','%s','%d','%s',0,'%s',0,1);` sql3 = fmt.Sprintf(sql3, this.Appid, this.ReceiveUserId, this.ReceiveName, this.SendUserId, this.SendName, this.Title, this.Content, this.MsgType, this.Link, time.Now().Format("2006-01-02 15:04:05")) if c < 1 { sql1 := `INSERT INTO conversation(appid,` + "`key`" + `,user_id,receive_id,receive_name,send_id,send_name,sort,createtime) values ('%s','','%s','%s','%s','%s','%s',0,'%s');` sql1 = fmt.Sprintf(sql1, this.Appid, this.SendUserId, this.ReceiveUserId, this.ReceiveName, this.SendUserId, this.SendName, time.Now().Format("2006-01-02 15:04:05")) ok := entity.Mysql.ExecTx("发送消息事务", func(tx *sql.Tx) bool { //插入会话表 _, err := entity.Mysql11.Exec(sql1) sql2 := `INSERT INTO conversation(appid,` + "`key`" + `,user_id,receive_id,receive_name,send_id,send_name,sort,createtime) values ('%s','','%s','%s','%s','%s','%s',0,'%s');` sql2 = fmt.Sprintf(sql2, this.Appid, this.ReceiveUserId, this.SendUserId, this.SendName, this.ReceiveUserId, this.ReceiveName, time.Now().Format("2006-01-02 15:04:05")) _, err = entity.Mysql11.Exec(sql2) //插入消息表 _, err = entity.Mysql11.Exec(sql3) if err != nil { return false } return true }) if ok { return 1, "消息发送成功" } } _, err = entity.Mysql11.Exec(sql3) if err == nil { MsgCountAdd(this.ReceiveUserId, strconv.Itoa(int(this.MsgType))) return 1, "消息发送成功" } return 0, "消息发送失败" } func FindUserMsg(this message.FindUserMsgReq) message.FindUserMsgRes { //orm := entity.Engine //var messages []*entity.Message var err error var count int64 cquery := map[string]interface{}{ "receive_userid": this.UserId, "isdel": 1, "appid": this.Appid, } if this.MsgType != -1 { cquery["msg_type"] = this.MsgType } if this.Read != -1 { cquery["isRead"] = this.Read } count = entity.Mysql.Count("message", cquery) //count, err = orm.Table("message").Where("((receive_userid = ? and send_userid = ?) or (receive_userid = ? and send_userid = ?)) and isdel = ? and appid = ?"+q, this.UserId, this.ReceiveUserId, this.ReceiveUserId, this.UserId, 1, this.Appid).Count() data := message.FindUserMsgRes{} if count > 0 { res := entity.Mysql.Find("message", cquery, "", "createtime desc", (int(this.OffSet)-1)*int(this.PageSize), int(this.PageSize)) //log.Println("数据:", res) if res != nil && len(*res) > 0 { for _, v := range *res { _id := util.Int64All(v["id"]) id := strconv.FormatInt(_id, 10) data.Data = append(data.Data, &message.Messages{ Id: id, Appid: util.ObjToString(v["appId"]), ReceiveUserId: util.ObjToString(v["receive_userid"]), ReceiveName: util.ObjToString(v["receive_name"]), SendUserId: util.ObjToString(v["send_userid"]), SendName: util.ObjToString(v["send_name"]), Createtime: util.ObjToString(v["createtime"]), Title: util.ObjToString(v["title"]), MsgType: int64(util.IntAll(v["msg_type"])), Link: util.ObjToString(v["link"]), CiteId: util.Int64All(v["cite_id"]), Content: util.ObjToString(v["content"]), IsRead: util.Int64All(v["isRead"]), MsgLogId: util.Int64All(v["msg_log_id"]), }) } } } data.Count = count if err != nil { data.Code = 0 data.Message = "查询失败" } else { data.Code = 1 data.Message = "查询成功" } return data } // 指定分类未读消息合计 func ClassCountUnread(msgType int, userId string, appId string) (int64, string, int64) { query := map[string]interface{}{ "msg_type": msgType, "receive_userid": userId, "isdel": 1, "appid": appId, "isRead": 0, } count := entity.Mysql.Count("message", query) return 1, "查询指定分类未读消息成功", count } // MsgCountAdd 消息未读数量加1 func MsgCountAdd(userId, msgType string) bool { keyString := fmt.Sprintf(MsgCountKey, userId, msgType) in := redis.Incr(redisModule, keyString) return in > 0 } // MsgCountMinusOne 根据消息类型未读消息数量减1 func MsgCountMinusOne(userId, msgType string) bool { keyString := fmt.Sprintf(MsgCountKey, userId, msgType) in := redis.Decrby(redisModule, keyString, 1) return in > 0 } // MsgCountZero 把该消息类型未读数量置0 func MsgCountZero(userId, msgType string) bool { keyString := fmt.Sprintf(MsgCountKey, userId, msgType) return redis.Put(redisModule, keyString, 0, -1) } func MultSave(this message.MultipleSaveMsgReq) (int64, string) { userIdArr := strings.Split(this.UserIds, ",") userNameArr := strings.Split(this.UserNames, ",") if len(userIdArr) > 0 { var errCount int64 for k, v := range userIdArr { if v == "" { return errCount, "调用结束" } userName := userNameArr[k] //消息数组 c := entity.Mysql.Count("conversation", map[string]interface{}{"receive_id": v, "send_id": this.SendUserId}) sql3 := `INSERT INTO message(appid,receive_userid,receive_name,send_userid,send_name,title,content,msg_type,link,cite_id,createtime,isRead,isdel,msg_log_id) values ("%s",'%s','%s','%s','%s','%s','%s',%d,'%s',0,'%s',0,1,%d);` sql3 = fmt.Sprintf(sql3, this.Appid, v, userName, this.SendUserId, this.SendName, this.Title, this.Content, this.MsgType, this.Link, time.Now().Format("2006-01-02 15:04:05"), this.MsgLogId) if c <= 0 { sql1 := `INSERT INTO conversation(appid,secret_key,user_id,receive_id,receive_name,send_id,send_name,sort,createtime) values ('%s','','%s','%s','%s','%s','%s',0,'%s');` sql1 = fmt.Sprintf(sql1, this.Appid, this.SendUserId, v, userName, this.SendUserId, this.SendName, time.Now().Format("2006-01-02 15:04:05")) ok := entity.Mysql.ExecTx("发送消息事务", func(tx *sql.Tx) bool { //插入会话表 in1 := entity.Mysql.InsertBySqlByTx(tx, sql1) sql2 := `INSERT INTO conversation(appid,secret_key,user_id,receive_id,receive_name,send_id,send_name,sort,createtime) values ('%s','','%s','%s','%s','%s','%s',0,'%s');` sql2 = fmt.Sprintf(sql2, this.Appid, v, this.SendUserId, this.SendName, v, userName, time.Now().Format("2006-01-02 15:04:05")) in2 := entity.Mysql.InsertBySqlByTx(tx, sql2) //插入消息表 in3 := entity.Mysql.InsertBySqlByTx(tx, sql3) logx.Info(in1, in2, in3) return in1 > -1 && in2 > -1 && in3 > -1 }) logx.Info("执行事务是否成功:", ok) if !ok { errCount++ continue } ok1 := MsgCountAdd(v, strconv.Itoa(int(this.MsgType))) log.Println("存redis:", ok1) } else { in := entity.Mysql.InsertBySql(sql3) logx.Info("插入消息返回 in1 id:", in) if in > -1 { ok := MsgCountAdd(v, strconv.Itoa(int(this.MsgType))) log.Println("存redis:", ok) } else { errCount++ } } } return errCount, "发送成功" } return 0, "没有要发送的用户" }