Tao Zhang 6 tahun lalu
induk
melakukan
9dfeebcdec
12 mengubah file dengan 197 tambahan dan 173 penghapusan
  1. TEMPAT SAMPAH
      bin/calldemo
  2. TEMPAT SAMPAH
      bin/msgserver
  3. TEMPAT SAMPAH
      bin/service
  4. 9 6
      src/mfw/util/client.go
  5. 1 1
      src/mfw/util/cluster.go
  6. 29 0
      src/mfw/util/cluster_test.go
  7. 0 1
      src/mfw/util/event.go
  8. 2 3
      src/mfw/util/log_test.go
  9. 110 0
      src/服务端/clientmanage.go
  10. 4 162
      src/服务端/main.go
  11. 42 0
      src/服务端/servicemanage.go
  12. TEMPAT SAMPAH
      src/调用样例/调用样例

TEMPAT SAMPAH
bin/calldemo


TEMPAT SAMPAH
bin/msgserver


TEMPAT SAMPAH
bin/service


+ 9 - 6
src/mfw/util/client.go

@@ -30,6 +30,7 @@ type Client struct {
 	messageQueue         chan *Packet
 	writeQueue           chan RawData
 	resultlock           sync.Map //map[string]chan []byte
+	isclosed             bool
 }
 
 //客户端配置
@@ -122,6 +123,9 @@ func NewClient(conf *ClientConfig) (*Client, error) {
 
 //
 func (client *Client) WriteObj(to, msgid string, event, sendtotype int, obj interface{}) {
+	if client.isclosed {
+		return
+	}
 	client.writeQueue <- RawData(Enpacket(client.myid, to, msgid, event, sendtotype, obj))
 }
 
@@ -169,22 +173,18 @@ func (client *Client) read4socket() (*Packet, error) {
 	defer Catch()
 	buffer := make([]byte, 8)
 	if _, err := io.ReadFull(client.conn, buffer); err != nil {
-		Log.Debug.Println("读取包头失败", err)
 		return nil, errors.New("read packet error")
 	}
 	if bytes.Compare(buffer[:4], PACKET_HEAD) != 0 {
-		Log.Debug.Println("包头校验失败")
 		return nil, errors.New("read packet head error")
 	}
 	size := int32(binary.BigEndian.Uint32(buffer[4:]))
 	if size < PACKET_MIN_LEN || size > PACKET_MAX_LEN {
-		Log.Debug.Println("数据包长度越界[", size, "]")
 		return nil, errors.New("read packet error")
 	}
 	buf := make([]byte, size)
 	readlen, err := io.ReadFull(client.conn, buf)
 	if err != nil || int32(readlen) != size {
-		Log.Debug.Println("读取包主体数据失败", err)
 		return nil, errors.New("read packet error")
 	}
 	//CRC校验
@@ -248,11 +248,14 @@ func (client *Client) writeDump(p <-chan RawData) {
 func (client *Client) Close() {
 	//清理掉自己发布的服务
 	client.WriteObj("", "", EVENT_PUBLISH_MYSERVICES, SENDTO_TYPE_P2P, []int{})
-	//断线
+	//client.WriteObj("", "", EVENT_BYE, SENDTO_TYPE_P2P, []byte{})
+	//阻塞通道写入,但不影响读取
+	client.isclosed = true
 	close(client.messageQueue)
 	close(client.writeQueue)
-	//
+	//继续处理未完事务
 	time.Sleep(1 * time.Second)
+	//客户主动断开
 	client.conn.Close()
 
 }

+ 1 - 1
src/mfw/util/cluster.go

@@ -106,7 +106,7 @@ func (c *Cluster) getClient(nodename string) (*Client, error) {
 
 //关闭
 func (c *Cluster) Close() {
-	Log.Info.Println("正在关闭连接")
+	//Log.Info.Println("正在关闭连接")
 	for _, v := range c.nodes {
 		if v.Client != nil {
 			v.Client.Close()

+ 29 - 0
src/mfw/util/cluster_test.go

@@ -0,0 +1,29 @@
+package util
+
+import (
+	"log"
+	"testing"
+)
+
+//
+func BenchmarkCall(b *testing.B) {
+	c := NewCluster4Client(&Node{
+		Name:        "测试消息服务器",
+		ServerAddr:  "127.0.0.1:7070",
+		UseTls:      true,
+		TlsCertFile: "../../../bin/demo.xiaoa7.com.crt",
+		TlsKeyFile:  "../../../bin/demo.xiaoa7.com.key",
+		Weight:      0.8,
+	}, //可以写多个
+	)
+
+	for i := 0; i < b.N; i++ {
+		_, err := c.CallWithResult("", 2000, "", 1)
+		if err != nil {
+			b.Fail()
+		} else {
+			log.Println("ok")
+		}
+	}
+	c.Close()
+}

+ 0 - 1
src/mfw/util/event.go

@@ -18,5 +18,4 @@ const (
 	SENDTO_TYPE_ALL          = 1 //发送给所有客户端
 	SENDTO_TYPE_ALL_RECIVER  = 2 //发送给所有指定服务接收者
 	SENDTO_TYPE_P2P          = 3 //发送给指定客户端
-	SENDTO_TYPE_P2P_BYNAME   = 4 //发送给指定客户,通过指定客户的名称
 )

+ 2 - 3
src/mfw/util/log_test.go

@@ -6,15 +6,14 @@ import (
 
 //
 func BenchmarkLog(b *testing.B) {
-	//mylog := NewLogger("../../../logs", LOG_LEVEL_DEBUG, 1024*1024*5)
-	Log.SetParam(LOG_LEVEL_ERROR)
+	Log.SetParam(LOG_LEVEL_ERROR, true, "./logs", "test")
 	for i := 0; i < b.N; i++ {
 		Log.Debug.Println("hello world")
 	}
 }
 
 func TestLog(b *testing.T) {
-	mylog := NewLogger("../../../logs", LOG_LEVEL_DEBUG, 1024*50)
+	mylog := NewLogger("../../../logs", LOG_LEVEL_DEBUG, true, "test", 1024*50)
 	for i := 0; i < 5; i++ {
 		mylog.Debug.Println("hello world")
 	}

+ 110 - 0
src/服务端/clientmanage.go

@@ -0,0 +1,110 @@
+/**客户端连接
+ */
+package main
+
+import (
+	"encoding/json"
+	"io"
+	"mfw/util"
+	"sync"
+	"time"
+)
+
+//
+type ClientItem struct {
+	conn            io.ReadWriteCloser
+	timestamp       int64
+	name            string //
+	canhandlerevent []int  //
+	id              string
+}
+
+var (
+	//所有的请求
+	allclient  map[string]*ClientItem = make(map[string]*ClientItem)
+	clientlock sync.RWMutex
+)
+
+//删除服务节点
+func removeClient(myid string) {
+	defer util.Catch()
+	//
+	if v, ok := allclient[myid]; ok {
+		clientlock.Lock()
+		//删除某个客户端提供的服务
+		if len(v.canhandlerevent) > 0 {
+			publishservice(myid, []int{})
+		}
+		v.conn.Close()
+		delete(allclient, myid)
+		clientlock.Unlock()
+	}
+}
+
+//加入网络
+func joinNetwork(msg *util.Packet) {
+	data := map[string]interface{}{}
+	err := json.Unmarshal(msg.Raw, &data)
+	if err != nil { //解析join指令失败
+		msg.Conn.Close()
+		return
+	}
+	//
+	uuid := data["myid"].(string)
+	if allclient[uuid] != nil {
+		util.Log.Debug.Println(uuid, "已经存在")
+		msg.Conn.Close()
+		return
+	}
+	//
+	clientname := data["name"].(string)
+	handers_o_arr := data["handler"].([]interface{})
+	handers := []int{}
+	for _, v := range handers_o_arr {
+		handers = append(handers, int(v.(float64)))
+	}
+	//
+	allclient[uuid] = &ClientItem{conn: msg.Conn,
+		timestamp: time.Now().Unix(),
+		name:      clientname,
+		id:        uuid,
+	}
+	//
+	publishservice(uuid, handers)
+}
+
+//心跳检测规则重新定义::
+//每次读写都会刷新心跳时间,
+//长时间未发生读写时发送心跳查询,
+//3倍检测时间后仍未响应的,自动剔除队列
+func gc() {
+	now := time.Now().Unix()
+	now_bs := util.Int2Byte(int32(now))
+	for k, v := range allclient {
+		if now-v.timestamp > gcinterval*3 {
+			v.conn.Close()
+			removeClient(k)
+			continue
+		} else if now-v.timestamp > gcinterval {
+			util.Log.Debug.Println("发送心跳请求包到", v.name)
+			_, err := v.conn.Write(util.Enpacket("", k, "", util.EVENT_REQUEST_HEARTBEAT,
+				util.SENDTO_TYPE_P2P, now_bs))
+			if err != nil { //发心跳包出错
+				v.conn.Close()
+				removeClient(k)
+			}
+		}
+
+	}
+	time.AfterFunc(time.Duration(gcinterval)*time.Second, gc)
+}
+
+//更新心跳
+func updateheartbeat(from string) {
+	clientlock.Lock()
+	defer clientlock.Unlock()
+	defer util.Catch()
+	if v, ok := allclient[from]; ok {
+		v.timestamp = time.Now().Unix()
+	}
+}

+ 4 - 162
src/服务端/main.go

@@ -2,164 +2,14 @@ package main
 
 import (
 	"encoding/json"
-	"errors"
 	"flag"
-	"io"
 	"math/rand"
 	"mfw/util"
-	"sync"
-	"time"
 )
 
 //
-type ClientItem struct {
-	conn            io.ReadWriteCloser
-	timestamp       int64
-	name            string //
-	canhandlerevent []int  //
-	id              string
-}
-
-//
-var lock sync.Mutex
 var gcinterval int64
 
-//所有的请求
-var allclient map[string]*ClientItem = make(map[string]*ClientItem)
-
-//服务与提供者对应表
-var allservice map[int][]string = make(map[int][]string)
-
-//心跳检测规则重新定义::
-//每次读写都会刷新心跳时间,
-//长时间未发生读写时发送心跳查询,
-//3倍检测时间后仍未响应的,自动剔除队列
-func gc() {
-	now := time.Now().Unix()
-	now_bs := util.Int2Byte(int32(now))
-	for k, v := range allclient {
-		if now-v.timestamp > gcinterval*3 {
-			v.conn.Close()
-			removeClient(k)
-			continue
-		} else if now-v.timestamp > gcinterval {
-			util.Log.Debug.Println("发送心跳请求包到", v.name)
-			_, err := v.conn.Write(util.Enpacket("", k, "", util.EVENT_REQUEST_HEARTBEAT,
-				util.SENDTO_TYPE_P2P, now_bs))
-			if err != nil { //发心跳包出错
-				v.conn.Close()
-				removeClient(k)
-			}
-		}
-
-	}
-	time.AfterFunc(time.Duration(gcinterval)*time.Second, gc)
-}
-
-//删除服务节点
-func removeClient(myid string) {
-	lock.Lock()
-	defer lock.Unlock()
-	defer util.Catch()
-	if v, ok := allclient[myid]; ok {
-		v.conn.Close()
-	}
-	delete(allclient, myid)
-	for k, v := range allservice {
-		for j, smid := range v {
-			if smid == myid {
-				allservice[k] = append(v[:j], v[j+1:]...)
-				break
-			}
-		}
-	}
-	util.Log.Debug.Println("删除节点", myid, "全部服务", allservice)
-}
-
-//通过名称查找客户端
-func findClientByName(name string) (*ClientItem, error) {
-	for _, c := range allclient {
-		if c.name == name {
-			return c, nil
-		}
-	}
-	return nil, errors.New("cant't find the client")
-}
-
-//发布服务
-func publishservice(myid string, s []int) {
-	if myid == "00000000" { //无效请求验证
-		return
-	}
-	lock.Lock()
-	defer lock.Unlock()
-	defer util.Catch()
-	if allclient[myid] != nil {
-		allclient[myid].canhandlerevent = s
-		//删除该ID以前发布的服务
-		for k, v := range allservice {
-			for j, smid := range v {
-				if smid == myid {
-					allservice[k] = append(v[:j], v[j+1:]...)
-					break
-				}
-			}
-		}
-		//追加新发布的服务
-		for _, v := range s {
-			if v == util.EVENT_RETURN { //回调的不再作为服务注册
-				continue
-			}
-			allservice[v] = append(allservice[v], myid)
-		}
-	}
-	util.Log.Debug.Println("全部服务", allservice)
-}
-
-//更新心跳
-func updateheartbeat(from string) {
-	lock.Lock()
-	defer lock.Unlock()
-	defer util.Catch()
-	if v, ok := allclient[from]; ok {
-		v.timestamp = time.Now().Unix()
-	}
-
-}
-
-//加入网络
-func join(msg *util.Packet) {
-	data := map[string]interface{}{}
-	err := json.Unmarshal(msg.Raw, &data)
-	if err != nil { //解析join指令失败
-		msg.Conn.Close()
-		return
-	}
-	//
-	uuid := data["myid"].(string)
-	if allclient[uuid] != nil {
-		util.Log.Debug.Println(uuid, "已经存在")
-		msg.Conn.Close()
-		return
-	}
-	//
-	clientname := data["name"].(string)
-	handers_o_arr := data["handler"].([]interface{})
-	handers := []int{}
-	for _, v := range handers_o_arr {
-		handers = append(handers, int(v.(float64)))
-	}
-	//
-	allclient[uuid] = &ClientItem{conn: msg.Conn,
-		timestamp: time.Now().Unix(),
-		name:      clientname,
-		id:        uuid,
-	}
-	//
-	publishservice(uuid, handers)
-	util.Log.Debug.Println(allservice)
-}
-
 //处理客户端发过来的消息
 func processmsg(msg *util.Packet) {
 	defer util.Catch()
@@ -171,7 +21,7 @@ func processmsg(msg *util.Packet) {
 	//TODO 只写需要特殊处理的时间,其他都走default
 	case util.EVENT_REQUEST_JOIN:
 		//TODO 加入指令,直接返回
-		join(msg)
+		joinNetwork(msg)
 	case util.EVENT_RETURN_HEARTBEAT: //心跳回应包处理
 	case util.EVENT_PUBLISH_MYSERVICES: //客户端发布了自己的服务,支持二次发布服务,用于暂停、更新服务能力
 		services := []int{}
@@ -198,13 +48,6 @@ func processmsg(msg *util.Packet) {
 					updateheartbeat(msg.To)
 				}
 			}
-		} else if sttype == util.SENDTO_TYPE_P2P_BYNAME { //通过名称点对点发消息
-			if v2, err2 := findClientByName(msg.To); err2 == nil {
-				_, err := v2.conn.Write(msg.Bytes())
-				if err == nil {
-					updateheartbeat(v2.id)
-				}
-			}
 		}
 		if v, ok := allservice[event]; ok {
 			switch sttype {
@@ -214,9 +57,7 @@ func processmsg(msg *util.Packet) {
 						break
 					}
 					service_machine_id := v[rand.Intn(len(v))]
-					lock.Lock()
 					client, ok := allclient[service_machine_id]
-					lock.Unlock()
 					if ok {
 						_, err := client.conn.Write(msg.Bytes())
 						if err == nil {
@@ -226,7 +67,6 @@ func processmsg(msg *util.Packet) {
 							removeClient(service_machine_id)
 						}
 					}
-
 				}
 			case util.SENDTO_TYPE_ALL_RECIVER:
 				if v, ok := allservice[event]; ok {
@@ -255,12 +95,14 @@ func main() {
 	interval := flag.Int64("gc", 10, "GC间隔时间")
 	usetls := flag.Bool("usetls", false, "是否使用tls")
 	tlsca := flag.String("ca", "", "ca文件")
+	log2std := flag.Bool("log2std", false, "日志是否输出到控制台")
+	logsizelimit := flag.Int64("loglimit", 50, "日志文件大(mByte)小限制,超过限制会自动切分日志文件")
 	tlscert := flag.String("cert", "", "cert文件")
 	tlskey := flag.String("key", "", "key文件")
 	flag.Parse()
 	gcinterval = *interval
 	//初始化日志
-	util.Log = util.NewLogger("./logs", util.LOG_LEVEL_INFO, true, "msgserver", 1024*1024*50)
+	util.Log = util.NewLogger("./logs", util.LOG_LEVEL_INFO, *log2std, "msgserver", *logsizelimit*1024*1024)
 	//心跳检测
 	go gc()
 	//启动服务

+ 42 - 0
src/服务端/servicemanage.go

@@ -0,0 +1,42 @@
+package main
+
+import (
+	"mfw/util"
+	"sync"
+)
+
+var (
+	servicelock sync.RWMutex
+	//服务与提供者对应表
+	allservice map[int][]string = make(map[int][]string)
+)
+
+//发布服务
+func publishservice(myid string, s []int) {
+	if myid == "00000000" { //无效请求验证
+		return
+	}
+	servicelock.Lock()
+	defer servicelock.Unlock()
+	defer util.Catch()
+	if allclient[myid] != nil {
+		allclient[myid].canhandlerevent = s
+		//删除该ID以前发布的服务
+		for k, v := range allservice {
+			for j, smid := range v {
+				if smid == myid {
+					allservice[k] = append(v[:j], v[j+1:]...)
+					break
+				}
+			}
+		}
+		//追加新发布的服务
+		for _, v := range s {
+			if v == util.EVENT_RETURN { //回调的不再作为服务注册
+				continue
+			}
+			allservice[v] = append(allservice[v], myid)
+		}
+	}
+	util.Log.Debug.Println("全部服务", allservice)
+}

TEMPAT SAMPAH
src/调用样例/调用样例