jiaojiao7 3 лет назад
Родитель
Сommit
fa54dabbaa

+ 2 - 4
README.md

@@ -1,7 +1,5 @@
 消息中台
 goctl rpc proto -src message.proto -dir .
 goctl api go -api message.api -dir .
-
-
-v1.1
-消息中心1.0
+v1.1.1
+消息中心1.0优化版本

+ 8 - 12
api/internal/logic/multiplesavemsglogic.go

@@ -46,10 +46,14 @@ func (l *MultipleSaveMsgLogic) MultipleSaveMsg(req types.MultipleSaveMsgReq) (*t
 	for _, val := range req.SaveData {
 		msgType_, err := strconv.Atoi(util.ObjToString(val["msgType"]))
 		msgType := int64(msgType_)
-		CiteId_, err := strconv.Atoi(util.ObjToString(val["citeId"]))
-		CiteId := int64(CiteId_)
-		log.Println(err)
-		resp, err := lsi.SendUserMsg(l.ctx, &messageclient.SendMsgRequest{
+		var CiteId int64
+		if val["citeId"] != nil {
+			CiteId_, err := strconv.Atoi(util.ObjToString(val["citeId"]))
+			CiteId = int64(CiteId_)
+			log.Println(err)
+		}
+
+		_, err = lsi.SendUserMsg(l.ctx, &messageclient.SendMsgRequest{
 			Appid:         util.ObjToString(val["appid"]),
 			ReceiveUserId: util.ObjToString(val["receiveUserId"]),
 			ReceiveName:   util.ObjToString(val["receiveName"]),
@@ -61,14 +65,6 @@ func (l *MultipleSaveMsgLogic) MultipleSaveMsg(req types.MultipleSaveMsgReq) (*t
 			Link:          util.ObjToString(val["link"]),
 			CiteId:        CiteId,
 		})
-		// todo 记录用户id
-		if resp.Code == int64(1) {
-			userIdList = append(userIdList, map[string]interface{}{
-				"userId":util.ObjToString(val["receiveUserId"]),
-				"msgType":msgType,
-			} )
-		}
-		log.Println(resp)
 		if err != nil {
 			errCount++
 		}

+ 4 - 0
entity/message.go

@@ -1,6 +1,8 @@
 package entity
 
 import (
+	"app.yhyue.com/moapp/jybase/mysql"
+	clientv3 "go.etcd.io/etcd/client/v3"
 	"time"
 
 	"github.com/go-xorm/xorm"
@@ -8,6 +10,8 @@ import (
 
 //定义orm引擎
 var Engine *xorm.Engine
+var EtcdCli *clientv3.Client
+var Mysql *mysql.Mysql
 
 type Message struct {
 	Id            string       `xorm:"id" form:"id" json:"id"`

+ 11 - 3
rpc/etc/message.yaml

@@ -2,12 +2,20 @@ Name: message.rpc
 ListenOn: 127.0.0.1:8081
 Etcd:
   Hosts:
-  - 127.0.0.1:2379
+  - 192.168.3.240:2379
+  - 192.168.3.240:2379
   Key: message.rpc
   Timeout: 6000
-DataSource: jianyu:topnet@123@tcp(am-2ze6crwd6bb0283jn167320o.ads.aliyuncs.com:3306)/messageCentertest?charset=utf8mb4&parseTime=true&loc=Local
+Mysql: jianyu:topnet@123@tcp(am-2ze6crwd6bb0283jn167320o.ads.aliyuncs.com:3366)/messageCentertest?timeout=10s&interpolateParams=true
+DataSource:
+    DbName: messageCentertest
+    Address: am-2ze6crwd6bb0283jn167320o.ads.aliyuncs.com
+    UserName: jianyu
+    PassWord: topnet@123
+    MaxOpenConns: 5
+    MaxIdleConns: 5
 FileSystemConf:
   Etcd:
     Hosts:
-      - 127.0.0.1:2379
+      - 192.168.3.240:2379
     Key: message.rpc

BIN
rpc/go_build_message_RPC_go_linux


+ 15 - 3
rpc/internal/config/config.go

@@ -1,10 +1,22 @@
 package config
 
-import "github.com/tal-tech/go-zero/zrpc"
+import (
+	"github.com/tal-tech/go-zero/zrpc"
+)
 
 type Config struct {
 	zrpc.RpcServerConf
-	DataSource     string // 手动代码
+	DataSource *mysqlConfig // 手动代码
+	Mysql      string
+}
 
+type mysqlConfig struct {
+	DbName       string
+	Address      string
+	UserName     string
+	PassWord     string
+	MaxOpenConns int
+	MaxIdleConns int
 }
-var  ConfigJson Config
+
+var ConfigJson Config

+ 47 - 10
rpc/message.go

@@ -9,18 +9,20 @@ import (
 	"app.yhyue.com/moapp/MessageCenter/rpc/internal/server"
 	"app.yhyue.com/moapp/MessageCenter/rpc/internal/svc"
 	"app.yhyue.com/moapp/MessageCenter/rpc/message"
+	"app.yhyue.com/moapp/jybase/mysql"
 	"flag"
 	"fmt"
-	_ "github.com/go-sql-driver/mysql"
-	"github.com/go-xorm/xorm"
-	"log"
-
 	"github.com/tal-tech/go-zero/core/conf"
 	"github.com/tal-tech/go-zero/zrpc"
+	clientv3 "go.etcd.io/etcd/client/v3"
 	"google.golang.org/grpc"
+	"log"
+	"time"
 )
 
 var configFile = flag.String("f", "etc/message.yaml", "the config file")
+var EtcdCli *clientv3.Client
+var Mysql *mysql.Mysql
 
 func main() {
 	flag.Parse()
@@ -43,13 +45,48 @@ func main() {
 func init() {
 	conf.MustLoad(*configFile, &config.ConfigJson)
 	var err error
-	log.Println(config.ConfigJson.DataSource)
-	entity.Engine, err = xorm.NewEngine("mysql", config.ConfigJson.DataSource)
-	log.Println(err)
-	entity.Engine.ShowSQL(true)
+	//连接etcd
+	entity.EtcdCli, err = clientv3.New(clientv3.Config{
+		Endpoints:   config.ConfigJson.Etcd.Hosts,
+		DialTimeout: 5 * time.Second,
+	})
 	if err != nil {
-		log.Fatal("数据库连接失败:", err)
+		log.Printf("connect to etcd failed, err:%v\n", err)
+		return
 	}
-	fmt.Println(config.ConfigJson.DataSource + "链接成功")
+	log.Println("connect to etcd success")
+
+	log.Println("开始初始化数据库。。。。。")
+	//初始化mysql
+	entity.Mysql = &mysql.Mysql{
+		Address:      config.ConfigJson.DataSource.Address,
+		UserName:     config.ConfigJson.DataSource.UserName,
+		PassWord:     config.ConfigJson.DataSource.PassWord,
+		DBName:       config.ConfigJson.DataSource.DbName,
+		MaxOpenConns: config.ConfigJson.DataSource.MaxOpenConns,
+		MaxIdleConns: config.ConfigJson.DataSource.MaxIdleConns,
+	}
+	entity.Mysql.Init()
+
+	//连接mysql
+	//log.Println(config.ConfigJson.DataSource)
+	//entity.Engine, err = xorm.NewEngine("mysql", config.ConfigJson.DataSource)
+	//log.Println(err)
+	//entity.Engine.ShowSQL(true)
+	//if err != nil {
+	//	log.Fatal("数据库连接失败:", err)
+	//}
+	//fmt.Println(config.ConfigJson.DataSource + "链接成功")
+
 
+	//entity.Mysql, err = sql.Open("mysql", config.ConfigJson.Mysql)
+	//if err != nil {
+	//	panic(err.Error())
+	//}
+	//// 设置最大打开的连接数,默认值为0,表示不限制。
+	//entity.Mysql.SetMaxOpenConns(5)
+	//// 设置最大闲置的连接数
+	//entity.Mysql.SetMaxIdleConns(5)
+	//// 设置连接的最大生命周期,默认连接总是可重用。
+	//entity.Mysql.SetConnMaxLifetime(time.Hour)
 }

+ 15 - 15
rpc/messageclient/message.go

@@ -14,28 +14,28 @@ import (
 )
 
 type (
-	FindUserMsgRes               = message.FindUserMsgRes
-	GetMsgTypeRes                = message.GetMsgTypeRes
-	SendMsgRequest               = message.SendMsgRequest
 	DeleteSingleMessageRequest   = message.DeleteSingleMessageRequest
-	Response                     = message.Response
 	FindUserMsgReq               = message.FindUserMsgReq
-	MessageDetailReq             = message.MessageDetailReq
-	GetUnreadClassCountRes       = message.GetUnreadClassCountRes
-	GetUnreadClassCountReq       = message.GetUnreadClassCountReq
+	FindUserMsgRes               = message.FindUserMsgRes
+	GetMsgTypeReq                = message.GetMsgTypeReq
+	GetMsgTypeRes                = message.GetMsgTypeRes
 	UpdateMessageReadResp        = message.UpdateMessageReadResp
-	ChangeReadStatusRequest      = message.ChangeReadStatusRequest
-	DeleteMultipleMessageRequest = message.DeleteMultipleMessageRequest
+	Response                     = message.Response
 	GetClassUnreadCountReq       = message.GetClassUnreadCountReq
-	MessageDetailResp            = message.MessageDetailResp
-	GetLastMessageReq            = message.GetLastMessageReq
-	GetLastMessageRes            = message.GetLastMessageRes
+	SendMsgRequest               = message.SendMsgRequest
+	ChangeReadStatusRequest      = message.ChangeReadStatusRequest
 	ResCount                     = message.ResCount
-	GetUnreadCountRequest        = message.GetUnreadCountRequest
-	GetUnreadCountResponse       = message.GetUnreadCountResponse
 	Messages                     = message.Messages
-	GetMsgTypeReq                = message.GetMsgTypeReq
+	GetLastMessageRes            = message.GetLastMessageRes
+	GetUnreadClassCountRes       = message.GetUnreadClassCountRes
 	UpdateMessageReadReq         = message.UpdateMessageReadReq
+	DeleteMultipleMessageRequest = message.DeleteMultipleMessageRequest
+	GetUnreadCountRequest        = message.GetUnreadCountRequest
+	GetUnreadCountResponse       = message.GetUnreadCountResponse
+	MessageDetailReq             = message.MessageDetailReq
+	MessageDetailResp            = message.MessageDetailResp
+	GetLastMessageReq            = message.GetLastMessageReq
+	GetUnreadClassCountReq       = message.GetUnreadClassCountReq
 
 	Message interface {
 		//  修改消息阅读状态

+ 2 - 2
rpc/test/message_test.go

@@ -14,7 +14,7 @@ func  Test_ChangeReadStatus(t *testing.T)  {
 	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
 	std := messageclient.NewMessage(zrpc.MustNewClient(zrpc.RpcClientConf{Etcd: discov.EtcdConf{Hosts: []string{"127.0.0.1:2379"}, Key: "message.rpc"}}))
 	//std := stdlibclient.NewStdlib(zrpc.MustNewClient(zrpc.RpcClientConf{Etcd: discov.EtcdConf{Hosts: []string{"127.0.0.1:2379"}, Key: "jydocs.stdlib.rpc"}}))
-	req := &messageclient.ChangeReadStatusRequest{Id: 1,ReadStatus: 1,Appid: "10000"}
+	req := &messageclient.ChangeReadStatusRequest{Id: "1",ReadStatus: 1,Appid: "10000"}
 	res, err := std.ChangeReadStatus(ctx, req)
 	log.Println("err ", err,res)
 	//if res.State == true {
@@ -27,7 +27,7 @@ func  Test_DeleteSingleMessage(t *testing.T)  {
 	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
 	std := messageclient.NewMessage(zrpc.MustNewClient(zrpc.RpcClientConf{Etcd: discov.EtcdConf{Hosts: []string{"127.0.0.1:2379"}, Key: "message.rpc"}}))
 	//std := stdlibclient.NewStdlib(zrpc.MustNewClient(zrpc.RpcClientConf{Etcd: discov.EtcdConf{Hosts: []string{"127.0.0.1:2379"}, Key: "jydocs.stdlib.rpc"}}))
-	req := &messageclient.DeleteSingleMessageRequest{Id: 3,Appid: "10000"}
+	req := &messageclient.DeleteSingleMessageRequest{Id: "3",Appid: "10000"}
 	res, err := std.DeleteSingleMessage(ctx, req)
 	log.Println("err ", err,res)
 	//if res.State == true {

+ 1 - 1
rpc/test/send_test.go

@@ -13,7 +13,7 @@ import (
 
 func Test_SendMsg(t *testing.T) {
 	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
-	std := messageclient.NewMessage(zrpc.MustNewClient(zrpc.RpcClientConf{Etcd: discov.EtcdConf{Hosts: []string{"127.0.0.1:2379"}, Key: "message.rpc"}}))
+	std := messageclient.NewMessage(zrpc.MustNewClient(zrpc.RpcClientConf{Etcd: discov.EtcdConf{Hosts: []string{"192.168.3.240:2379"}, Key: "message.rpc"}}))
 	req := &messageclient.SendMsgRequest{
 		Appid:         "10000",
 		ReceiveUserId: "6042120adca8410f1ef2ec84",

+ 48 - 62
service/messageService.go

@@ -3,7 +3,10 @@ package service
 import (
 	"app.yhyue.com/moapp/MessageCenter/entity"
 	"app.yhyue.com/moapp/MessageCenter/rpc/message"
+	qutil "app.yhyue.com/moapp/jybase/common"
+	"errors"
 	"log"
+	"strconv"
 )
 
 type MessageService struct {
@@ -11,24 +14,15 @@ type MessageService struct {
 
 // 修改消息阅读状态
 func (service *MessageService) ChangeReadStatus(data *message.ChangeReadStatusRequest) (int64, string) {
-	orm := entity.Engine.NewSession()
-	defer orm.Close()
-	m := entity.Message{}
-	count, err1 := orm.Where("id=? and isdel=1 and appid=?", data.Id, data.Appid).Count(m)
-	if count == 0 || err1 != nil {
+	msg := entity.Mysql.FindOne("message", map[string]interface{}{"id": data.Id, "isdel": 1, "appid": data.Appid}, "", "")
+	if msg == nil {
 		return 0, "该消息不存在"
-
 	}
-	m.IsRead = int(data.ReadStatus)
-	_, err := orm.Where("id=? and isdel=1", data.Id).Cols("isRead").Update(&m)
-	if err != nil {
-		orm.Rollback()
-		return 0, "修改消息阅读状态失败"
-	}
-	err2 := orm.Commit()
-	if err2 != nil {
+	b := entity.Mysql.Update("message", map[string]interface{}{"id": data.Id, "isdel": 1}, map[string]interface{}{"isRead": int(data.ReadStatus)})
+	if !b {
 		return 0, "修改消息阅读状态失败"
 	}
+	EtcdCountMinusOne(qutil.ObjToString((*msg)["receive_userid"]), strconv.Itoa(int((*msg)["msgType"].(float64))))
 	return 1, "修改消息阅读状态成功"
 }
 
@@ -53,52 +47,48 @@ func (service *MessageService) DeleteMessage(id []string, appId string) (int64,
 
 // 未读消息合计
 func (service *MessageService) CountUnread(userId string, appId string) (int64, string, int64) {
-	orm := entity.Engine.NewSession()
-	defer orm.Close()
-	m := entity.Message{}
-	count, err := orm.Where("receive_userid=? and isRead=0 and isdel=1 and appid=?", userId, appId).Count(m)
-	if err != nil {
-		log.Println(err)
-		return 0, "查询未读消息失败", 0
+	query := map[string]interface{}{
+		"receive_userid": userId,
+		"isRead":         0,
+		"isdel":          1,
+		"appid":          appId,
 	}
+	count := entity.Mysql.Count("message", query)
 	log.Println(count)
 	return 1, "查询未读消息成功", count
 }
 
 // 获取指定用户指定分类最新一条消息
 func (service *MessageService) LastMessage(userId string, appId string, msgType int64, isRead int64) (*message.Messages, error) {
-	orm := entity.Engine.NewSession()
-	defer orm.Close()
-	m1 := []*entity.Message{}
-	//m := []*message.Messages{}
-	sql := "receive_userid=? and isdel=1 and appid=?  and msg_type=?"
+	sql := "SELECT * FROM message receive_userid=? and isdel=1 and appid=?  and msg_type=?"
 	valueList := []interface{}{userId, appId, msgType}
 	if isRead != -1 {
 		sql = sql + " and isRead=?"
 		valueList = append(valueList, isRead)
 	}
-	err := orm.Select("*").Where(sql, valueList...).OrderBy("createtime desc").Limit(1, 0).Find(&m1)
-
-	if err != nil {
-		return nil, err
+	query := map[string]interface{}{
+		"receive_userid": userId,
+		"isdel":          1,
+		"appid":          appId,
+		"msg_type":       msgType,
 	}
+	lastMsg := entity.Mysql.FindOne("message", query, "", "createtime desc")
 
-	if len(m1) > 0 {
-		m := m1[0]
+	if lastMsg != nil && len(*lastMsg) > 0 {
 		msg := message.Messages{
-			Id:            m.Id,
-			Appid:         m.AppId,
-			ReceiveUserId: m.ReceiveUserid,
-			ReceiveName:   m.ReceiveName,
-			SendUserId:    m.SendUserid,
-			SendName:      m.SendName,
-			Createtime:    m.CreateTime.Format("2006-01-02 15:04:05"),
-			Title:         m.Title,
-			MsgType:       int64(m.MsgType),
-			Link:          m.Link,
-			CiteId:        int64(m.CiteId),
-			Content:       m.Content,
-			IsRead:        int64(m.IsRead),
+			Id:            qutil.ObjToString((*lastMsg)["id"]),
+			Appid:         qutil.ObjToString((*lastMsg)["appid"]),
+			ReceiveUserId: qutil.ObjToString((*lastMsg)["receive_userid"]),
+			ReceiveName:   qutil.ObjToString((*lastMsg)["receive_name"]),
+			SendUserId:    qutil.ObjToString((*lastMsg)["send_userid"]),
+			SendName:      qutil.ObjToString((*lastMsg)["send_name"]),
+			Createtime:    qutil.ObjToString((*lastMsg)["createtime"]),
+			Title:         qutil.ObjToString((*lastMsg)["title"]),
+			MsgType:       qutil.Int64All((*lastMsg)["msg_type"]),
+			Link:          qutil.ObjToString((*lastMsg)["link"]),
+			CiteId:        qutil.Int64All((*lastMsg)["cite_id"]),
+			Content:       qutil.ObjToString((*lastMsg)["content"]),
+			IsRead:        qutil.Int64All((*lastMsg)["isRead"]),
 		}
 		return &msg, nil
 	} else {
@@ -122,17 +112,12 @@ func FindMessageDetail(id string) (entity.Message, error) {
 
 // 获取用户未读消息分类及数量 及分类下的最新一条消息
 func (service *MessageService) ClassCountAndMessage(userId string, appId string) ([]*message.ResCount, error) {
-	orm := entity.Engine.NewSession()
-	defer orm.Close()
-	//m := []*message.Messages{}
-	query, err := orm.QueryInterface("SELECT msg_type,COUNT(CASE WHEN isRead=0 THEN 1 END) as count  FROM message where receive_userid=? and isdel=1 and appid=? GROUP BY msg_type  ORDER BY FIELD(`msg_type`,\"1\",\"2\",\"3\",\"4\",\"5\",\"6\",\"7\")", userId, appId)
-	if err != nil {
-		return nil, err
-	}
+	query := entity.Mysql.SelectBySql("SELECT msg_type,COUNT(CASE WHEN isRead=0 THEN 1 END) as count  FROM message where receive_userid=? and isdel=1 and appid=? GROUP BY msg_type  ORDER BY FIELD(`msg_type`,\"1\",\"2\",\"3\",\"4\",\"5\",\"6\",\"7\")", userId, appId)
+
 	typeCount := []*message.ResCount{}
 	// 未读消息分类及数量
-	if query != nil && len(query) > 0 {
-		for _, v := range query {
+	if query != nil && len(*query) > 0 {
+		for _, v := range *query {
 			typeCount = append(typeCount, &message.ResCount{MsgType: v["msg_type"].(int64), Count: v["count"].(int64)})
 		}
 	}
@@ -157,14 +142,15 @@ func (service *MessageService) ReceiveMsgType(userId string, appId string) ([]in
 }
 
 func (service *MessageService) UpdateMessageReadStatus(msgType int, receiveUserid, appId string) (int, error) {
-	orm := entity.Engine.NewSession()
-	defer orm.Close()
-	msg := entity.Message{}
-	msg.IsRead = 1
-	c, err := orm.Where("receive_userid=? and msg_type = ? and appid = ?", receiveUserid, msgType, appId).Cols("isRead").Update(&msg)
-	//log.Println("更新后返回值:", c, err)
-	if c < 0 && err != nil {
-		return 0, err
+	query := map[string]interface{}{
+		"receive_userid": receiveUserid,
+		"msg_type":       msgType,
+		"appid":          appId,
+	}
+	b := entity.Mysql.Update("message", query, map[string]interface{}{"isRead": 1})
+	if !b {
+		return 0, errors.New("修改消息已读出错")
 	}
+	EtcdSetCountZero(receiveUserid, strconv.Itoa(msgType))
 	return 1, nil
 }

+ 196 - 38
service/sendMsg.go

@@ -1,8 +1,13 @@
 package service
 
 import (
+	"context"
+	"database/sql"
+	"encoding/json"
 	"fmt"
+	"go.etcd.io/etcd/client/v3/concurrency"
 	"log"
+	"strconv"
 	"time"
 
 	"app.yhyue.com/moapp/MessageCenter/entity"
@@ -11,48 +16,47 @@ import (
 
 // 类型的顺序
 const order = "1,4"
+const EtcdCount = "Count.%s.%s" //etcd 消息未读数量 Count.用户id.消息类型=数量
 
 func SendMsg(this message.SendMsgRequest) (int64, string) {
-	orm := entity.Engine.NewSession()
-	defer orm.Close()
-	err := orm.Begin()
-	fmt.Println(err)
-	count, _ := orm.Table("conversation").Where("receive_id = ? and send_id = ?", this.ReceiveUserId, this.SendUserId).Count()
+	//orm := entity.Engine.NewSession()
+	//defer orm.Close()
+	//err := orm.Begin()
+	//fmt.Println(err)
+	count := entity.Mysql.Count("conversation", map[string]interface{}{"receive_id": this.ReceiveUserId, "send_id": this.SendUserId})
 
+	sql3 := `INSERT INTO message(appid,receive_userid,receive_name,send_userid,send_name,title,content,msg_type,link,cite_id,createtime,isRead,isdel)
+		values ("%s",'%s','%s','%s','%s','%s','%s','%d','%s',0,'%s',0,1);`
+	sql3 = fmt.Sprintf(sql3, this.Appid, this.ReceiveUserId, this.ReceiveName, this.SendUserId, this.SendName, this.Title, this.Content, this.MsgType, this.Link, time.Now().Format("2006-01-02 15:04:05"))
 	if count < 1 {
 		sql1 := `INSERT INTO conversation(appid,` + "`key`" + `,user_id,receive_id,receive_name,send_id,send_name,sort,createtime) 
 		values ('%s','','%s','%s','%s','%s','%s',0,'%s');`
 		sql1 = fmt.Sprintf(sql1, this.Appid, this.SendUserId, this.ReceiveUserId, this.ReceiveName, this.SendUserId, this.SendName, time.Now().Format("2006-01-02 15:04:05"))
-		//_, err = orm.Table("conversation").Insert(&conversation)
-		_, err = orm.Exec(sql1)
-		if err != nil {
-			log.Panicln("会话创建失败:", err)
-			orm.Rollback()
-			return 0, "会话创建失败"
-		}
-		sql2 := `INSERT INTO conversation(appid,` + "`key`" + `,user_id,receive_id,receive_name,send_id,send_name,sort,createtime) 
+		ok := entity.Mysql.ExecTx("发送消息事务", func(tx *sql.Tx) bool {
+			//插入会话表
+			_, err := entity.Mysql.DB.Exec(sql1)
+
+			sql2 := `INSERT INTO conversation(appid,` + "`key`" + `,user_id,receive_id,receive_name,send_id,send_name,sort,createtime) 
 		values ('%s','','%s','%s','%s','%s','%s',0,'%s');`
-		sql2 = fmt.Sprintf(sql2, this.Appid, this.ReceiveUserId, this.SendUserId, this.SendName, this.ReceiveUserId, this.ReceiveName, time.Now().Format("2006-01-02 15:04:05"))
-		//_, err = orm.Table("conversation").Insert(&conversation)
-		_, err = orm.Exec(sql2)
-		if err != nil {
-			log.Panicln("会话创建失败:", err)
-			orm.Rollback()
-			return 0, "会话创建失败"
+			sql2 = fmt.Sprintf(sql2, this.Appid, this.ReceiveUserId, this.SendUserId, this.SendName, this.ReceiveUserId, this.ReceiveName, time.Now().Format("2006-01-02 15:04:05"))
+			_, err = entity.Mysql.DB.Exec(sql2)
+			//插入消息表
+			_, err = entity.Mysql.DB.Exec(sql3)
+			if err != nil {
+				return false
+			}
+			return true
+		})
+		if ok {
+			return 1, "消息发送成功"
 		}
 	}
-	sql := `INSERT INTO message(appid,receive_userid,receive_name,send_userid,send_name,title,content,msg_type,link,cite_id,createtime,isRead,isdel) 
-		values ("%s",'%s','%s','%s','%s','%s','%s','%d','%s',0,'%s',0,1);`
-	sql = fmt.Sprintf(sql, this.Appid, this.ReceiveUserId, this.ReceiveName, this.SendUserId, this.SendName, this.Title, this.Content, this.MsgType, this.Link, time.Now().Format("2006-01-02 15:04:05"))
-	//_, err = orm.Table("conversation").Insert(&conversation)
-	_, err = orm.Table("message").Exec(sql)
-	if err != nil {
-		log.Panicln("消息发送失败:", err)
-		orm.Rollback()
-		return 0, "消息发送失败"
+	_, err := entity.Mysql.DB.Exec(sql3)
+	if err == nil {
+		go EtcdCountAdd(this.ReceiveUserId, strconv.Itoa(int(this.MsgType)))
+		return 1, "消息发送成功"
 	}
-	orm.Commit()
-	return 1, "消息发送成功"
+	return 0, "消息发送失败"
 }
 
 func FindUserMsg(this message.FindUserMsgReq) message.FindUserMsgRes {
@@ -107,13 +111,167 @@ func FindUserMsg(this message.FindUserMsgReq) message.FindUserMsgRes {
 
 // 指定分类未读消息合计
 func ClassCountUnread(msgType int, userId string, appId string) (int64, string, int64) {
-	orm := entity.Engine
-	count, err := orm.Table("message").Where("msg_type=? and receive_userid=? and isdel=1 and appid=? and isRead=0", msgType, userId, appId).Count()
-	// data, err := orm.Sql("explain select count(*) from message where msg_type = ? and receive_userid=? and isdel=1 and appid=?", msgType, userId, appId).QueryInterface()
-	if err != nil {
-		log.Println(err)
-		return 0, "查询未读消息失败", 0
+	//orm := entity.Engine
+	//count, err := orm.Table("message").Where("msg_type=? and receive_userid=? and isdel=1 and appid=? and isRead=0", msgType, userId, appId).Count()
+	query := map[string]interface{}{
+		"msg_type":       msgType,
+		"receive_userid": userId,
+		"isdel":          1,
+		"appid":          appId,
+		"isRead":         0,
 	}
-	//log.Println(count)
+	count := entity.Mysql.Count("message", query)
 	return 1, "查询指定分类未读消息成功", count
 }
+
+// etcd数量加1
+func EtcdCountAdd(userId, msgType string) {
+	s1, err := concurrency.NewSession(entity.EtcdCli)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer s1.Close()
+	keyString := fmt.Sprintf(EtcdCount, userId, msgType)
+	m1 := concurrency.NewMutex(s1, keyString)
+	// 会话s1获取锁
+	if err := m1.Lock(context.TODO()); err != nil {
+		log.Fatal("获取锁失败", err)
+	}
+	// 操作数量
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	resp, err := s1.Client().Get(ctx, keyString)
+	cancel()
+	if err != nil {
+		fmt.Printf("get from etcd failed, err:%v\n", err)
+
+		// 释放锁
+		if err := m1.Unlock(context.TODO()); err != nil {
+			log.Fatal("释放锁失败", err)
+		}
+		return
+	}
+	var count int
+	for _, ev := range resp.Kvs {
+		if ev.Value != nil {
+			err := json.Unmarshal([]byte(ev.Value), &count)
+			if err != nil {
+				log.Println("etcd get err:", err)
+			} else {
+				break
+			}
+		}
+
+	}
+	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
+	_, err = s1.Client().Put(ctx, keyString, strconv.Itoa(count+1))
+	cancel()
+	if err != nil {
+		fmt.Printf("put to etcd failed, err:%v\n", err)
+		// 释放锁
+		if err := m1.Unlock(context.TODO()); err != nil {
+			log.Fatal("释放锁失败", err)
+		}
+		return
+	}
+
+	// 释放锁
+	if err := m1.Unlock(context.TODO()); err != nil {
+		log.Fatal("释放锁失败", err)
+	}
+}
+
+//单条消息,-1
+func EtcdCountMinusOne(userId, msgType string) {
+	s1, err := concurrency.NewSession(entity.EtcdCli)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer s1.Close()
+	keyString := fmt.Sprintf(EtcdCount, userId, msgType)
+	m1 := concurrency.NewMutex(s1, keyString)
+	// 会话s1获取锁
+	if err := m1.Lock(context.TODO()); err != nil {
+		log.Fatal("获取锁失败", err)
+	}
+	// 操作数量
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	resp, err := s1.Client().Get(ctx, keyString)
+	cancel()
+	if err != nil {
+		fmt.Printf("get from etcd failed, err:%v\n", err)
+
+		// 释放锁
+		if err := m1.Unlock(context.TODO()); err != nil {
+			log.Fatal("释放锁失败", err)
+		}
+		return
+	}
+	var count int
+	for _, ev := range resp.Kvs {
+		if ev.Value != nil {
+			err := json.Unmarshal([]byte(ev.Value), &count)
+			if err != nil {
+				log.Println("etcd get err:", err)
+			} else {
+				break
+			}
+		}
+
+	}
+	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
+	var fin string
+	if count > 0 {
+		fin = strconv.Itoa(count - 1)
+	} else {
+		fin = "0"
+	}
+	_, err = s1.Client().Put(ctx, keyString, fin)
+	cancel()
+	if err != nil {
+		fmt.Printf("put to etcd failed, err:%v\n", err)
+		// 释放锁
+		if err := m1.Unlock(context.TODO()); err != nil {
+			log.Fatal("释放锁失败", err)
+		}
+		return
+	}
+
+	// 释放锁
+	if err := m1.Unlock(context.TODO()); err != nil {
+		log.Fatal("释放锁失败", err)
+	}
+}
+
+// 消息类别置0
+func EtcdSetCountZero(userId, msgType string) {
+	log.Println(" 消息类别置0", userId, msgType)
+	s1, err := concurrency.NewSession(entity.EtcdCli)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer s1.Close()
+	keyString := fmt.Sprintf(EtcdCount, userId, msgType)
+	m1 := concurrency.NewMutex(s1, keyString)
+	// 会话s1获取锁
+	if err := m1.Lock(context.TODO()); err != nil {
+		log.Fatal("获取锁失败", err)
+	}
+	// 操作数量
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+	_, err = s1.Client().Put(ctx, keyString, "0")
+	cancel()
+	if err != nil {
+		fmt.Printf("put to etcd failed, err:%v\n", err)
+		// 释放锁
+		if err := m1.Unlock(context.TODO()); err != nil {
+			log.Fatal("释放锁失败", err)
+		}
+		return
+	}
+
+	// 释放锁
+	if err := m1.Unlock(context.TODO()); err != nil {
+		log.Fatal("释放锁失败", err)
+	}
+}