Tao Zhang 6 жил өмнө
parent
commit
fd128956d8

+ 2 - 0
.gitignore

@@ -18,3 +18,5 @@
 *.data
 */timetask.json
 core/src/timetask.json
+bin/logs/*
+

BIN
bin/calldemo


BIN
bin/msgserver


BIN
bin/service


+ 27 - 5
src/mfw/util/client.go

@@ -143,10 +143,12 @@ func (client *Client) Call(to, msgid string, event, sendtotype int, obj interfac
 
 //底层事件处理
 func (client *Client) baseEventHandle(msg *Packet) {
+	if msg == nil {
+		return
+	}
 	event := int(msg.Event)
 	switch event {
 	case EVENT_REQUEST_HEARTBEAT: //请求的心跳,回应心跳请求
-		Log.Info.Println("服务端请求心跳")
 		client.WriteObj("", "", EVENT_RETURN_HEARTBEAT, SENDTO_TYPE_P2P, msg.Raw)
 		client.lastCheckHeart = time.Now().Unix()
 	case EVENT_RETURN: //回调处理,一般用于同步操作
@@ -179,14 +181,21 @@ func (client *Client) read4socket() (*Packet, error) {
 		Log.Debug.Println("数据包长度越界[", size, "]")
 		return nil, errors.New("read packet error")
 	}
-	var buf []byte = make([]byte, size)
+	buf := make([]byte, size)
 	readlen, err := io.ReadFull(client.conn, buf)
 	if err != nil || int32(readlen) != size {
 		Log.Debug.Println("读取包主体数据失败", err)
 		return nil, errors.New("read packet error")
 	}
-	//
-	raw := append(Int2Byte(size), buf...)
+	//CRC校验
+	body := buf[:len(buf)-4]
+	o_sum := buf[len(buf)-4:]
+	crc.Write(body)
+	n_sum := Uint322Byte(crc.Sum32())
+	crc.Reset()
+	if bytes.Compare(o_sum, n_sum) != 0 {
+		return nil, errors.New("包文校验失败")
+	}
 	//解析包
 	packet := &Packet{Length: size,
 		From:       string(buf[:8]),
@@ -194,7 +203,7 @@ func (client *Client) read4socket() (*Packet, error) {
 		Msgid:      string(buf[16:24]),
 		Event:      Byte2Int(buf[24:28]),
 		SentToType: Byte2Int(buf[28:32]),
-		Raw:        raw,
+		Raw:        body[32:],
 	}
 	return packet, nil
 }
@@ -234,3 +243,16 @@ func (client *Client) writeDump(p <-chan RawData) {
 		}
 	}
 }
+
+//退出服务
+func (client *Client) Close() {
+	//清理掉自己发布的服务
+	client.WriteObj("", "", EVENT_PUBLISH_MYSERVICES, SENDTO_TYPE_P2P, []int{})
+	//断线
+	close(client.messageQueue)
+	close(client.writeQueue)
+	//
+	time.Sleep(1 * time.Second)
+	client.conn.Close()
+
+}

+ 138 - 0
src/mfw/util/cluster.go

@@ -0,0 +1,138 @@
+/**
+客户端集群封装,仅仅针对调用方适用;
+服务发布方,还是仅仅单个节点
+*/
+package util
+
+import (
+	"errors"
+	"math/rand"
+	"sort"
+)
+
+type (
+	//消息总线节点
+	Node struct {
+		Name        string  `json:"name"` //节点名称,调用时,按名称调用
+		ServerAddr  string  `json:"serveraddr"`
+		Weight      float64 `json:"weight"`
+		UseTls      bool    `json:"usetls"`
+		TlsCertFile string  `json:"cert"`
+		TlsKeyFile  string  `json:"key"`
+		Client      *Client
+	}
+	Nodes []*Node //用于排序
+
+	//集群
+	Cluster struct {
+		nodesweightsum float64 //
+		nodes          Nodes
+		mapping        map[string]*Node //映射
+	}
+)
+
+func (ns Nodes) Len() int {
+	return len(ns)
+}
+func (ns Nodes) Less(i, j int) bool {
+	return ns[i].Weight < ns[j].Weight
+}
+func (ns Nodes) Swap(i, j int) {
+	ns[i], ns[j] = ns[j], ns[i]
+}
+
+//
+func NewCluster4Client(nodes ...*Node) *Cluster {
+	nodearr := make(Nodes, 0, 0)
+	mapping := make(map[string]*Node)
+	var weightsum float64
+	for _, v := range nodes { //生成client
+		client, err := NewClient(&ClientConfig{
+			MsgServerAddr:   v.ServerAddr,
+			ClientName:      v.Name + "_client",
+			CanHandleEvents: []int{},
+			EventHandler:    clusterProcessEvent,
+			UseTls:          v.UseTls,
+			TlsCertFile:     v.TlsCertFile,
+			TlsKeyFile:      v.TlsKeyFile,
+		})
+		if err == nil {
+			weightsum = weightsum + v.Weight
+			v.Client = client
+			nodearr = append(nodearr, v)
+			mapping[v.Name] = v
+		} else {
+			Log.Error.Println(err.Error())
+		}
+	}
+	sort.Sort(nodearr)
+	return &Cluster{
+		nodes:          nodearr,
+		mapping:        mapping,
+		nodesweightsum: weightsum,
+	}
+}
+
+//异步事件处理,主要走同步,异步仅仅是个空实现
+func clusterProcessEvent(p *Packet) {
+
+}
+
+//取到client
+func (c *Cluster) getClient(nodename string) (*Client, error) {
+	if nodename != "" {
+		if v, ok := c.mapping[nodename]; ok {
+			return v.Client, nil
+		} else {
+			Log.Error.Println("没有", nodename, "消息总线")
+			return nil, errors.New("not fount " + nodename + " node")
+		}
+	} else { //走权重,随机取
+		if len(c.nodes) == 0 || c.nodesweightsum == 0 {
+			Log.Error.Println("没有足够节点", len(c.nodes), c.nodesweightsum)
+			return nil, errors.New("There are not enough nodes.")
+		}
+		for i := 0; i < 100; i++ {
+			tmp := rand.Float64() * c.nodesweightsum
+			for _, v := range c.nodes {
+				if v.Weight > tmp {
+					return v.Client, nil
+				}
+			}
+		}
+		return nil, errors.New("No client can find!")
+	}
+}
+
+//关闭
+func (c *Cluster) Close() {
+	Log.Info.Println("正在关闭连接")
+	for _, v := range c.nodes {
+		if v.Client != nil {
+			v.Client.Close()
+		}
+	}
+}
+
+/**
+@param node 为空时,以node的weight参考,
+做多节点负载均衡,前提条件:各个node都有请求的event服务。
+否则,需要指定node名称;
+*/
+func (c *Cluster) CallWithResult(node string, event int, param interface{}, timeout int64) ([]byte, error) {
+	client, err := c.getClient(node)
+	if err != nil {
+		return []byte{}, err
+	}
+	return client.Call("", UUID(8), event, SENDTO_TYPE_RAND_RECIVER, param, timeout)
+}
+
+//无需回应结果的调用
+func (c *Cluster) CallWithoutResult(node string, event int, param interface{}) error {
+	client, err := c.getClient(node)
+	if err != nil {
+		return err
+	}
+	client.WriteObj("", "", event, SENDTO_TYPE_RAND_RECIVER, param)
+	return nil
+}

+ 9 - 3
src/mfw/util/protocol.go

@@ -32,9 +32,15 @@ type Packet struct {
 	Event      int32  //4bit
 	SentToType int32  //4bit
 	//头结束
-	Raw  []byte             //数据区域
-	Sum  int32              //crc32校验和 (从长度标志到数据区域)
-	Conn io.ReadWriteCloser //
+	Raw         []byte             //数据区域
+	Sum         int32              //crc32校验和 (从长度标志到数据区域)
+	packetbytes []byte             //完整数据包
+	Conn        io.ReadWriteCloser //
+}
+
+//
+func (p *Packet) Bytes() []byte {
+	return p.packetbytes
 }
 
 //

+ 16 - 11
src/mfw/util/server.go

@@ -18,9 +18,9 @@ type ServerConfig struct {
 	ProcessEvent func(*Packet)
 	ListenAddr   string
 	UseTls       bool
-	TlsCAFile        string
-	TlsCertFile      string
-	TlsKeyFile       string
+	TlsCAFile    string
+	TlsCertFile  string
+	TlsKeyFile   string
 }
 
 //启动服务端
@@ -114,16 +114,21 @@ func (p *Reader) readFrame(size int32) (*Packet, error) {
 		if bytes.Compare(o_sum, n_sum) != 0 {
 			return nil, errors.New("包文校验失败")
 		}
+		var pbuf bytes.Buffer //拼原始包
+		pbuf.Write(PACKET_HEAD)
+		pbuf.Write(Int2Byte(size))
+		pbuf.Write(buf)
 		//解析包
 		packet := &Packet{
-			Length:     size,
-			From:       string(body[:8]),
-			To:         string(body[8:16]),
-			Msgid:      string(body[16:24]),
-			Event:      Byte2Int(body[24:28]),
-			SentToType: Byte2Int(body[28:32]),
-			Raw:        body[32:],
-			Conn:       p.conn,
+			Length:      size,
+			From:        string(body[:8]),
+			To:          string(body[8:16]),
+			Msgid:       string(body[16:24]),
+			Event:       Byte2Int(body[24:28]),
+			SentToType:  Byte2Int(body[28:32]),
+			Raw:         body[32:],
+			packetbytes: pbuf.Bytes(),
+			Conn:        p.conn,
 		}
 		return packet, nil
 	}

+ 1 - 4
src/发布服务样例/main.go

@@ -10,7 +10,6 @@ import (
 
 const (
 	SERVICE_SAYHEOLLO = 2000             //服务1
-	SERVICE_SAVEUSER  = 2001             //服务2
 	MSG_SERVER        = "127.0.0.1:7070" //消息服务器地址
 	GC_TIMEOUT        = 20               //客户端检查链接间隔X秒
 )
@@ -21,8 +20,6 @@ func processEvent(p *util.Packet) {
 	switch event {
 	case SERVICE_SAYHEOLLO: //同步调用,原数据返回
 		client.WriteObj(p.From, p.Msgid, util.EVENT_RETURN, util.SENDTO_TYPE_P2P, "ok")
-	case SERVICE_SAVEUSER: //异步
-		client.WriteObj(p.From, "", util.EVENT_RETURN, util.SENDTO_TYPE_P2P, "成功了啊")
 	}
 }
 
@@ -40,7 +37,7 @@ func main() {
 	client, err = util.NewClient(&util.ClientConfig{
 		MsgServerAddr:   *addr,
 		ClientName:      *name,
-		CanHandleEvents: []int{1, 2, 3},
+		CanHandleEvents: []int{2000},
 		EventHandler:    processEvent,
 		UseTls:          *usetls,
 		TlsCertFile:     *tlscert,

+ 11 - 34
src/服务端/main.go

@@ -4,7 +4,6 @@ import (
 	"encoding/json"
 	"errors"
 	"flag"
-	"fmt"
 	"io"
 	"math/rand"
 	"mfw/util"
@@ -128,23 +127,6 @@ func updateheartbeat(from string) {
 
 }
 
-//查看所有服务提供者
-func viewallservice(p *util.Packet) {
-	defer util.Catch()
-	ret := []interface{}{}
-	lock.Lock()
-	for k, v := range allclient {
-		ret = append(ret, map[string]interface{}{
-			"handle": fmt.Sprintf("%v", v.canhandlerevent),
-			"name":   v.name,
-			"myid":   k,
-		})
-	}
-	lock.Unlock()
-	bs, _ := json.Marshal(ret)
-	allclient[p.From].conn.Write(util.Enpacket("", p.From, p.Msgid, util.EVENT_RETURN, util.SENDTO_TYPE_P2P, bs))
-}
-
 //加入网络
 func join(msg *util.Packet) {
 	data := map[string]interface{}{}
@@ -195,35 +177,30 @@ func processmsg(msg *util.Packet) {
 		services := []int{}
 		json.Unmarshal(msg.Raw, &services)
 		publishservice(from, services)
-	case util.EVENT_VIEWALL_CLIENT: //
-		go viewallservice(msg)
-	case util.EVENT_REMOVE_CLIENT: //服务端强制删除节点
-		removeClient(string(msg.Raw))
 	case util.EVENT_BYE: //客户端主动要求断开
 		removeClient(from)
-	default: //处理业务事件
-		//识别发送类型
+	default: //处理业务事件,转发数据包
 		sttype := int(msg.SentToType)
 		if sttype == util.SENDTO_TYPE_ALL { //发送给所有节点
 			for service_machine_id, v := range allclient { //发所有,不支持的不处理
 				if service_machine_id == from { //广播不用发给自己
 					continue
 				}
-				_, err := v.conn.Write(msg.Raw)
+				_, err := v.conn.Write(msg.Bytes())
 				if err == nil {
 					updateheartbeat(service_machine_id)
 				}
 			}
 		} else if sttype == util.SENDTO_TYPE_P2P { //点对点发消息,不用注册服务
 			if v2, ok2 := allclient[msg.To]; ok2 {
-				_, err := v2.conn.Write(msg.Raw)
+				_, err := v2.conn.Write(msg.Bytes())
 				if err == nil {
 					updateheartbeat(msg.To)
 				}
 			}
 		} else if sttype == util.SENDTO_TYPE_P2P_BYNAME { //通过名称点对点发消息
 			if v2, err2 := findClientByName(msg.To); err2 == nil {
-				_, err := v2.conn.Write(msg.Raw)
+				_, err := v2.conn.Write(msg.Bytes())
 				if err == nil {
 					updateheartbeat(v2.id)
 				}
@@ -241,7 +218,7 @@ func processmsg(msg *util.Packet) {
 					client, ok := allclient[service_machine_id]
 					lock.Unlock()
 					if ok {
-						_, err := client.conn.Write(msg.Raw)
+						_, err := client.conn.Write(msg.Bytes())
 						if err == nil {
 							updateheartbeat(service_machine_id)
 							break
@@ -258,7 +235,7 @@ func processmsg(msg *util.Packet) {
 							continue
 						}
 						if v2, ok2 := allclient[service_machine_id]; ok2 {
-							_, err := v2.conn.Write(msg.Raw)
+							_, err := v2.conn.Write(msg.Bytes())
 							if err == nil {
 								updateheartbeat(service_machine_id)
 							}
@@ -288,11 +265,11 @@ func main() {
 	go gc()
 	//启动服务
 	util.StartServer(&util.ServerConfig{
-		ListenAddr: *addr,
-		UseTls:     *usetls,
-		TlsCAFile:      *tlsca,
-		TlsCertFile:    *tlscert,
-		TlsKeyFile:     *tlskey,
+		ListenAddr:  *addr,
+		UseTls:      *usetls,
+		TlsCAFile:   *tlsca,
+		TlsCertFile: *tlscert,
+		TlsKeyFile:  *tlskey,
 		ProcessEvent: func(data *util.Packet) {
 			//接受消息处理
 			processmsg(data)

+ 16 - 99
src/调用样例/main.go

@@ -1,111 +1,28 @@
 package main
 
 import (
-	_ "bytes"
-	"encoding/json"
-	"fmt"
 	"log"
-	"mfw/util"
-	"regexp"
-	"strings"
-	"time"
+	mu "mfw/util"
 )
 
-const (
-	MSG_SERVER = "test.qmx.top:7070" //消息服务器地址
-	GC_TIMEOUT = 100                 //客户端检查链接间隔X秒
-)
-
-//消息处理
-func processEvent(p *util.Packet) {
-	//TODO 异步调用的结果在这里处理
-	fmt.Println("异步调用的结果", string(p.GetBusinessData()))
-
-}
-
-var client *util.Client
-
-func main2() {
-	//	client, _ = util.StartClient(processEvent, MSG_SERVER, []int{util.EVENT_RECIVE_CALLBACK}, GC_TIMEOUT)
-	//	time.Sleep(5 * time.Second)
-	//	for i := 0; i < 2; i++ {
-	//		client.WriteObj("", "", 4001, 0, s1)
-	//		//time.Sleep(5 * time.Second)
-	//		client.WriteObj("", "", 4001, 0, s2)
-	//		//time.Sleep(5 * time.Second)
-	//	}
-	//	b := make(chan bool, 1)
-	//	<-b
-}
-
-func main3() {
-	client, _ = util.StartClient(processEvent, MSG_SERVER, "测试调用", []int{util.SERVICE_SPIDER_ECPS})
-	client.WriteObj("", "", util.SERVICE_SPIDER_ECPS, util.SENDTO_TYPE_ALL_RECIVER, map[string]string{"entName": "安彩液晶显示器件有限责任公司"})
-	fmt.Println("************")
-	time.Sleep(20 * time.Second)
-	//同步调用方法
-	//	for i := 0; i < 10; i++ {
-	//		//同步调用
-	//		msgid := util.UUID(8)
-	//		ret, err := client.Call("", msgid, 2000, util.SENDTO_TYPE_RAND_RECIVER, map[string]interface{}{
-	//			"param": "我是张三啊",
-	//			"msgid": msgid,
-	//		}, 2)
-	//		fmt.Println("jg:::", string(ret), err)
-	//	}
-	//	time.Sleep(5 * time.Second)
-
-	//异步调用,一直给服务器发垃圾,直到超时
-	//	for i := 0; i < 1000000; i++ {
-	//		sendbs := bytes.Repeat([]byte{0, 0}, i%1000)
-	//		msgid := util.UUID(8)
-	//		client.WriteObj("", msgid, 2004, util.SENDTO_TYPE_RAND_RECIVER, sendbs)
-	//		fmt.Print("+")
-	//		if i%500 == 0 {
-	//			time.Sleep(1 * time.Second)
-	//		}
-	//	}
-	//	time.Sleep(480 * time.Second)
-}
-
-func main4() {
-	client, _ = util.StartClient(processEvent, MSG_SERVER, "测试调用", []int{})
-	time.Sleep(5 * time.Minute)
-}
-
 func main() {
-	client, _ = util.StartClient(func(p *util.Packet) {
-		//异步调用
-	}, MSG_SERVER, "测试调用", []int{})
-	ret, _ := client.Call("", util.UUID(8), 4010, util.SENDTO_TYPE_RAND_RECIVER, "湖北烟草金叶复烤有限责任公司襄阳复烤厂补充建立招标代理机构库采购项目评标结果公示", 3)
-	var m [][]string
-	json.Unmarshal(ret, &m)
-	log.Println(m)
-	var reg = regexp.MustCompile("^[0-9a-zA-Z-.]+$")
-	arr := []string{}
-	keyword := []string{}
-	for _, tmp := range m {
-		if reg.MatchString(tmp[0]) {
-			arr = append(arr, tmp[0])
+	c := mu.NewCluster4Client(&mu.Node{
+		Name:        "测试消息服务器",
+		ServerAddr:  "127.0.0.1:7070",
+		UseTls:      true,
+		TlsCertFile: "./demo.xiaoa7.com.crt",
+		TlsKeyFile:  "./demo.xiaoa7.com.key",
+		Weight:      0.8,
+	}, //可以写多个
+	)
+	defer c.Close()
+	for i := 0; i < 50; i++ {
+		bs, err := c.CallWithResult("", 2000, "", 5)
+		if err != nil {
+			log.Println(err.Error())
 		} else {
-			if len(arr) > 0 {
-				str := strings.Join(arr, "")
-				keyword = append(keyword, str)
-				arr = []string{}
-			}
-			if len(tmp[0]) > 3 && (strings.HasPrefix(tmp[1], "n") || tmp[1] == "v" || tmp[1] == "vn" || strings.HasPrefix(tmp[1], "g")) {
-				keyword = append(keyword, tmp[0])
-			}
+			log.Println(string(bs))
 		}
 	}
-	log.Println(strings.Join(keyword, ","))
-	content := "湖北烟"
-	tc := []rune(content)
-	ltc := len(tc)
-	des := content
-	if ltc > 12 {
-		des = string(tc[:12])
-	}
-	log.Println(des)
 
 }

BIN
src/调用样例/调用样例