Tao Zhang 5 жил өмнө
parent
commit
c342d94e3b

+ 1 - 2
.gitignore

@@ -1,4 +1,3 @@
 ##
 .idea
-sma/agent
-server/server
+bin

+ 76 - 0
demo/client/main.go

@@ -0,0 +1,76 @@
+package main
+
+/**
+客户端调用
+*/
+import (
+	"app.yhyue.com/BP/servicerd/proto"
+	"context"
+	"flag"
+	"google.golang.org/grpc"
+	"log"
+	"sync"
+)
+
+//服务地址配置
+var (
+	rdserver = flag.String("rd", "127.0.0.1:10021", "服务治理地址")
+)
+var client proto.ServiceClient
+
+func init() {
+	flag.Parse()
+	conn, err := grpc.Dial(*rdserver, grpc.WithInsecure())
+	if err != nil {
+		return
+	}
+	client = proto.NewServiceClient(conn)
+}
+
+func run(thread int, wg *sync.WaitGroup) {
+	defer func(wg *sync.WaitGroup) {
+		wg.Done()
+	}(wg)
+	for i := 0; i < 200; i++ {
+		repl, err := client.Apply(context.TODO(), &proto.ApplyReqData{Name: "demo", Balance: 0})
+		if err != nil {
+			log.Println("出错了")
+			log.Fatalln(err.Error())
+		}
+		log.Println("结果", thread, repl.Addr, repl.ResourceId)
+		//TODO 业务调用
+		conn, err := grpc.Dial(repl.Addr, grpc.WithInsecure())
+		if err != nil {
+			return
+		}
+		defer conn.Close()
+		demo_client := proto.NewDemoServiceClient(conn)
+		demo_repl, err := demo_client.Say(context.TODO(), &proto.DemoReq{
+			Name: "张三",
+		})
+		if err != nil {
+			log.Println(err.Error())
+		} else {
+			log.Println("back::", thread, demo_repl.Data)
+		}
+		//只有使用SEQ负载模式,需要调用释放资源
+		/*
+			release_repl, err := client.Release(context.TODO(), &proto.StringReqData{Data: repl.ResourceId})
+			if err != nil {
+				log.Println("出错了")
+				log.Fatalln(err.Error())
+			} else {
+				log.Println(thread, release_repl.Data)
+			}*/
+	}
+}
+
+func main() {
+	wg := new(sync.WaitGroup)
+	for i := 0; i < 5; i++ {
+		wg.Add(1)
+		go run(i, wg)
+	}
+	wg.Wait()
+	log.Println("all ok")
+}

+ 129 - 0
demo/service/main.go

@@ -0,0 +1,129 @@
+/**
+服务提供者样例
+*/
+package main
+
+import (
+	"app.yhyue.com/BP/servicerd/proto"
+	"context"
+	"flag"
+	"fmt"
+	"google.golang.org/grpc"
+	"log"
+	"net"
+	"os"
+	"os/signal"
+	"time"
+)
+
+//服务地址配置
+var (
+	ip          = flag.String("ip", "192.168.20.100", "本机ip")
+	port        = flag.Int("port", 20153, "服务端口")
+	rdserver    = flag.String("rd", "127.0.0.1:10021", "服务治理地址")
+	serviceName = flag.String("name", "demo", "服务名称")
+)
+
+type DemoService struct {
+}
+
+//
+func (s *DemoService) Say(ctx context.Context, in *proto.DemoReq) (*proto.DemoRep, error) {
+	return &proto.DemoRep{Data: in.Name}, nil
+}
+
+//
+func heartbeat() {
+	conn, err := grpc.Dial(*rdserver, grpc.WithInsecure())
+	if err != nil {
+		return
+	}
+	defer conn.Close()
+	client := proto.NewHeartBeatClient(conn)
+	//调用服务端推送流
+	resp, _ := client.PutStream(context.Background())
+	tm := time.NewTicker(20 * time.Second)
+	for {
+		select {
+		case <-tm.C:
+			reqstreamData := &proto.StreamReqData{ServiceName: *serviceName, ServiceAddr: fmt.Sprintf("%s:%d", *ip, *port)}
+			_ = resp.Send(reqstreamData)
+		}
+	}
+}
+
+//服务注册
+func registe() {
+	conn, err := grpc.Dial(*rdserver, grpc.WithInsecure())
+	if err != nil {
+		return
+	}
+	defer conn.Close()
+	client := proto.NewServiceClient(conn)
+	ret, err := client.Registe(context.TODO(), &proto.ServiceMeta{
+		Name:    *serviceName,
+		Ip:      *ip,
+		Port:    int32(*port),
+		Balance: 0,
+		Workers: 5,
+	})
+	if err != nil {
+		log.Println(err.Error())
+	} else {
+		log.Println(ret)
+	}
+
+}
+
+//服务注销
+func destory() {
+	conn, err := grpc.Dial(*rdserver, grpc.WithInsecure())
+	if err != nil {
+		return
+	}
+	defer conn.Close()
+	client := proto.NewServiceClient(conn)
+	ret, err := client.Destory(context.TODO(), &proto.ServiceMeta{
+		Name:    *serviceName,
+		Ip:      *ip,
+		Port:    int32(*port),
+		Balance: 0,
+		Workers: 5,
+	})
+	if err != nil {
+		log.Println(err.Error())
+	} else {
+		log.Println(ret)
+	}
+}
+
+//服务启动
+func startServer() {
+	//监听端口
+	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
+	if err != nil {
+		log.Fatalln(err.Error())
+		return
+	}
+	//创建一个grpc 服务器
+	s := grpc.NewServer()
+	//注册事件
+	proto.RegisterDemoServiceServer(s, &DemoService{})
+	//处理链接
+	_ = s.Serve(lis)
+}
+
+func init() {
+	flag.Parse()
+}
+
+func main() {
+	go startServer()
+	go heartbeat() //心跳
+	registe()
+	c := make(chan os.Signal, 1)
+	signal.Notify(c, os.Interrupt, os.Kill)
+	<-c
+	log.Println("服务注销")
+	destory()
+}

+ 210 - 0
proto/demo.pb.go

@@ -0,0 +1,210 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: demo.proto
+
+//声明 包名
+
+package proto
+
+import (
+	context "context"
+	fmt "fmt"
+	proto "github.com/golang/protobuf/proto"
+	grpc "google.golang.org/grpc"
+	codes "google.golang.org/grpc/codes"
+	status "google.golang.org/grpc/status"
+	math "math"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
+
+//服务参数
+type DemoReq struct {
+	Name                 string   `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *DemoReq) Reset()         { *m = DemoReq{} }
+func (m *DemoReq) String() string { return proto.CompactTextString(m) }
+func (*DemoReq) ProtoMessage()    {}
+func (*DemoReq) Descriptor() ([]byte, []int) {
+	return fileDescriptor_ca53982754088a9d, []int{0}
+}
+
+func (m *DemoReq) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_DemoReq.Unmarshal(m, b)
+}
+func (m *DemoReq) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_DemoReq.Marshal(b, m, deterministic)
+}
+func (m *DemoReq) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_DemoReq.Merge(m, src)
+}
+func (m *DemoReq) XXX_Size() int {
+	return xxx_messageInfo_DemoReq.Size(m)
+}
+func (m *DemoReq) XXX_DiscardUnknown() {
+	xxx_messageInfo_DemoReq.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_DemoReq proto.InternalMessageInfo
+
+func (m *DemoReq) GetName() string {
+	if m != nil {
+		return m.Name
+	}
+	return ""
+}
+
+//标准字符串返回结果
+type DemoRep struct {
+	Data                 string   `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *DemoRep) Reset()         { *m = DemoRep{} }
+func (m *DemoRep) String() string { return proto.CompactTextString(m) }
+func (*DemoRep) ProtoMessage()    {}
+func (*DemoRep) Descriptor() ([]byte, []int) {
+	return fileDescriptor_ca53982754088a9d, []int{1}
+}
+
+func (m *DemoRep) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_DemoRep.Unmarshal(m, b)
+}
+func (m *DemoRep) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_DemoRep.Marshal(b, m, deterministic)
+}
+func (m *DemoRep) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_DemoRep.Merge(m, src)
+}
+func (m *DemoRep) XXX_Size() int {
+	return xxx_messageInfo_DemoRep.Size(m)
+}
+func (m *DemoRep) XXX_DiscardUnknown() {
+	xxx_messageInfo_DemoRep.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_DemoRep proto.InternalMessageInfo
+
+func (m *DemoRep) GetData() string {
+	if m != nil {
+		return m.Data
+	}
+	return ""
+}
+
+func init() {
+	proto.RegisterType((*DemoReq)(nil), "proto.DemoReq")
+	proto.RegisterType((*DemoRep)(nil), "proto.DemoRep")
+}
+
+func init() {
+	proto.RegisterFile("demo.proto", fileDescriptor_ca53982754088a9d)
+}
+
+var fileDescriptor_ca53982754088a9d = []byte{
+	// 118 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4a, 0x49, 0xcd, 0xcd,
+	0xd7, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a, 0xb2, 0x5c, 0xec, 0x2e, 0xa9,
+	0xb9, 0xf9, 0x41, 0xa9, 0x85, 0x42, 0x42, 0x5c, 0x2c, 0x79, 0x89, 0xb9, 0xa9, 0x12, 0x8c, 0x0a,
+	0x8c, 0x1a, 0x9c, 0x41, 0x60, 0x36, 0x42, 0xba, 0x00, 0x24, 0x9d, 0x92, 0x58, 0x92, 0x08, 0x93,
+	0x06, 0xb1, 0x8d, 0xcc, 0xb8, 0xb8, 0x41, 0xd2, 0xc1, 0xa9, 0x45, 0x65, 0x99, 0xc9, 0xa9, 0x42,
+	0xea, 0x5c, 0xcc, 0xc1, 0x89, 0x95, 0x42, 0x7c, 0x10, 0x2b, 0xf4, 0xa0, 0x06, 0x4b, 0xa1, 0xf2,
+	0x0b, 0x94, 0x18, 0x92, 0xd8, 0xc0, 0x02, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0xdf, 0xd5,
+	0xa7, 0x42, 0x91, 0x00, 0x00, 0x00,
+}
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ context.Context
+var _ grpc.ClientConnInterface
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion6
+
+// DemoServiceClient is the client API for DemoService service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
+type DemoServiceClient interface {
+	//rpc
+	Say(ctx context.Context, in *DemoReq, opts ...grpc.CallOption) (*DemoRep, error)
+}
+
+type demoServiceClient struct {
+	cc grpc.ClientConnInterface
+}
+
+func NewDemoServiceClient(cc grpc.ClientConnInterface) DemoServiceClient {
+	return &demoServiceClient{cc}
+}
+
+func (c *demoServiceClient) Say(ctx context.Context, in *DemoReq, opts ...grpc.CallOption) (*DemoRep, error) {
+	out := new(DemoRep)
+	err := c.cc.Invoke(ctx, "/proto.DemoService/Say", in, out, opts...)
+	if err != nil {
+		return nil, err
+	}
+	return out, nil
+}
+
+// DemoServiceServer is the server API for DemoService service.
+type DemoServiceServer interface {
+	//rpc
+	Say(context.Context, *DemoReq) (*DemoRep, error)
+}
+
+// UnimplementedDemoServiceServer can be embedded to have forward compatible implementations.
+type UnimplementedDemoServiceServer struct {
+}
+
+func (*UnimplementedDemoServiceServer) Say(ctx context.Context, req *DemoReq) (*DemoRep, error) {
+	return nil, status.Errorf(codes.Unimplemented, "method Say not implemented")
+}
+
+func RegisterDemoServiceServer(s *grpc.Server, srv DemoServiceServer) {
+	s.RegisterService(&_DemoService_serviceDesc, srv)
+}
+
+func _DemoService_Say_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
+	in := new(DemoReq)
+	if err := dec(in); err != nil {
+		return nil, err
+	}
+	if interceptor == nil {
+		return srv.(DemoServiceServer).Say(ctx, in)
+	}
+	info := &grpc.UnaryServerInfo{
+		Server:     srv,
+		FullMethod: "/proto.DemoService/Say",
+	}
+	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
+		return srv.(DemoServiceServer).Say(ctx, req.(*DemoReq))
+	}
+	return interceptor(ctx, in, info, handler)
+}
+
+var _DemoService_serviceDesc = grpc.ServiceDesc{
+	ServiceName: "proto.DemoService",
+	HandlerType: (*DemoServiceServer)(nil),
+	Methods: []grpc.MethodDesc{
+		{
+			MethodName: "Say",
+			Handler:    _DemoService_Say_Handler,
+		},
+	},
+	Streams:  []grpc.StreamDesc{},
+	Metadata: "demo.proto",
+}

+ 11 - 11
proto/heartbeat.pb.go

@@ -29,7 +29,7 @@ const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
 //stream请求结构
 type StreamReqData struct {
 	ServiceName          string   `protobuf:"bytes,1,opt,name=serviceName,proto3" json:"serviceName,omitempty"`
-	ServiceId            string   `protobuf:"bytes,2,opt,name=serviceId,proto3" json:"serviceId,omitempty"`
+	ServiceAddr          string   `protobuf:"bytes,2,opt,name=serviceAddr,proto3" json:"serviceAddr,omitempty"`
 	XXX_NoUnkeyedLiteral struct{} `json:"-"`
 	XXX_unrecognized     []byte   `json:"-"`
 	XXX_sizecache        int32    `json:"-"`
@@ -67,9 +67,9 @@ func (m *StreamReqData) GetServiceName() string {
 	return ""
 }
 
-func (m *StreamReqData) GetServiceId() string {
+func (m *StreamReqData) GetServiceAddr() string {
 	if m != nil {
-		return m.ServiceId
+		return m.ServiceAddr
 	}
 	return ""
 }
@@ -124,17 +124,17 @@ func init() {
 }
 
 var fileDescriptor_3c667767fb9826a9 = []byte{
-	// 159 bytes of a gzipped FileDescriptorProto
+	// 158 bytes of a gzipped FileDescriptorProto
 	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcf, 0x48, 0x4d, 0x2c,
 	0x2a, 0x49, 0x4a, 0x4d, 0x2c, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a,
-	0xfe, 0x5c, 0xbc, 0xc1, 0x25, 0x45, 0xa9, 0x89, 0xb9, 0x41, 0xa9, 0x85, 0x2e, 0x89, 0x25, 0x89,
+	0xc1, 0x5c, 0xbc, 0xc1, 0x25, 0x45, 0xa9, 0x89, 0xb9, 0x41, 0xa9, 0x85, 0x2e, 0x89, 0x25, 0x89,
 	0x42, 0x0a, 0x5c, 0xdc, 0xc5, 0xa9, 0x45, 0x65, 0x99, 0xc9, 0xa9, 0x7e, 0x89, 0xb9, 0xa9, 0x12,
-	0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0xc8, 0x42, 0x42, 0x32, 0x5c, 0x9c, 0x50, 0xae, 0x67, 0x8a,
-	0x04, 0x13, 0x58, 0x1e, 0x21, 0xa0, 0xa4, 0x8c, 0x30, 0xb0, 0x18, 0x6c, 0xa0, 0x10, 0x17, 0x4b,
-	0x4a, 0x62, 0x49, 0x22, 0xd4, 0x24, 0x30, 0xdb, 0xc8, 0x83, 0x8b, 0xd3, 0x03, 0xe4, 0x1e, 0xa7,
-	0xd4, 0xc4, 0x12, 0x21, 0x6b, 0x2e, 0xce, 0x80, 0xd2, 0x12, 0x88, 0x26, 0x21, 0x11, 0x88, 0xf3,
-	0xf4, 0x50, 0x1c, 0x25, 0x85, 0x2e, 0x0a, 0x36, 0x59, 0x89, 0x41, 0x83, 0x31, 0x89, 0x0d, 0x2c,
-	0x61, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0xeb, 0x84, 0xe3, 0x9e, 0xe0, 0x00, 0x00, 0x00,
+	0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0xc8, 0x42, 0x48, 0x2a, 0x1c, 0x53, 0x52, 0x8a, 0x24, 0x98,
+	0x50, 0x54, 0x80, 0x84, 0x94, 0x94, 0x11, 0x86, 0x16, 0x83, 0x0d, 0x15, 0xe2, 0x62, 0x49, 0x49,
+	0x2c, 0x49, 0x84, 0x9a, 0x06, 0x66, 0x1b, 0x79, 0x70, 0x71, 0x7a, 0x80, 0xdc, 0xe4, 0x94, 0x9a,
+	0x58, 0x22, 0x64, 0xcd, 0xc5, 0x19, 0x50, 0x5a, 0x02, 0xd1, 0x24, 0x24, 0x02, 0x71, 0xa2, 0x1e,
+	0x8a, 0xc3, 0xa4, 0xd0, 0x45, 0xc1, 0x26, 0x2b, 0x31, 0x68, 0x30, 0x26, 0xb1, 0x81, 0x25, 0x8c,
+	0x01, 0x01, 0x00, 0x00, 0xff, 0xff, 0x66, 0x98, 0x7d, 0x2f, 0xe4, 0x00, 0x00, 0x00,
 }
 
 // Reference imports to suppress errors if they are not otherwise used.

+ 68 - 27
proto/service.pb.go

@@ -138,6 +138,46 @@ func (m *StringRepData) GetData() string {
 	return ""
 }
 
+//标准字符串请求
+type StringReqData struct {
+	Data                 string   `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *StringReqData) Reset()         { *m = StringReqData{} }
+func (m *StringReqData) String() string { return proto.CompactTextString(m) }
+func (*StringReqData) ProtoMessage()    {}
+func (*StringReqData) Descriptor() ([]byte, []int) {
+	return fileDescriptor_a0b84a42fa06f626, []int{2}
+}
+
+func (m *StringReqData) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_StringReqData.Unmarshal(m, b)
+}
+func (m *StringReqData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_StringReqData.Marshal(b, m, deterministic)
+}
+func (m *StringReqData) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_StringReqData.Merge(m, src)
+}
+func (m *StringReqData) XXX_Size() int {
+	return xxx_messageInfo_StringReqData.Size(m)
+}
+func (m *StringReqData) XXX_DiscardUnknown() {
+	xxx_messageInfo_StringReqData.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_StringReqData proto.InternalMessageInfo
+
+func (m *StringReqData) GetData() string {
+	if m != nil {
+		return m.Data
+	}
+	return ""
+}
+
 //
 type ApplyReqData struct {
 	Name                 string   `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
@@ -151,7 +191,7 @@ func (m *ApplyReqData) Reset()         { *m = ApplyReqData{} }
 func (m *ApplyReqData) String() string { return proto.CompactTextString(m) }
 func (*ApplyReqData) ProtoMessage()    {}
 func (*ApplyReqData) Descriptor() ([]byte, []int) {
-	return fileDescriptor_a0b84a42fa06f626, []int{2}
+	return fileDescriptor_a0b84a42fa06f626, []int{3}
 }
 
 func (m *ApplyReqData) XXX_Unmarshal(b []byte) error {
@@ -198,7 +238,7 @@ func (m *ApplyRepData) Reset()         { *m = ApplyRepData{} }
 func (m *ApplyRepData) String() string { return proto.CompactTextString(m) }
 func (*ApplyRepData) ProtoMessage()    {}
 func (*ApplyRepData) Descriptor() ([]byte, []int) {
-	return fileDescriptor_a0b84a42fa06f626, []int{3}
+	return fileDescriptor_a0b84a42fa06f626, []int{4}
 }
 
 func (m *ApplyRepData) XXX_Unmarshal(b []byte) error {
@@ -236,6 +276,7 @@ func (m *ApplyRepData) GetResourceId() string {
 func init() {
 	proto.RegisterType((*ServiceMeta)(nil), "proto.ServiceMeta")
 	proto.RegisterType((*StringRepData)(nil), "proto.StringRepData")
+	proto.RegisterType((*StringReqData)(nil), "proto.StringReqData")
 	proto.RegisterType((*ApplyReqData)(nil), "proto.ApplyReqData")
 	proto.RegisterType((*ApplyRepData)(nil), "proto.ApplyRepData")
 }
@@ -245,25 +286,25 @@ func init() {
 }
 
 var fileDescriptor_a0b84a42fa06f626 = []byte{
-	// 278 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x50, 0x3f, 0x4f, 0xfb, 0x30,
-	0x10, 0xfd, 0x25, 0xbf, 0x86, 0x88, 0x83, 0x32, 0x1c, 0x0c, 0x56, 0x07, 0x54, 0x85, 0xa5, 0x53,
-	0x07, 0x2a, 0x98, 0x58, 0x40, 0x5d, 0x18, 0x58, 0xd2, 0x4f, 0xe0, 0x26, 0xa7, 0x2a, 0x22, 0xc4,
-	0xe6, 0x6c, 0x40, 0xf9, 0xbe, 0x7c, 0x10, 0x64, 0x27, 0xae, 0xcc, 0xbf, 0x81, 0xc9, 0xcf, 0xef,
-	0xde, 0xd3, 0xbb, 0x7b, 0x30, 0x35, 0xc4, 0xaf, 0x4d, 0x45, 0x4b, 0xcd, 0xca, 0x2a, 0xcc, 0xfc,
-	0x53, 0xf4, 0x70, 0xb4, 0x19, 0xf8, 0x07, 0xb2, 0x12, 0x11, 0x26, 0x9d, 0x7c, 0x22, 0x91, 0xcc,
-	0x93, 0xc5, 0x61, 0xe9, 0x31, 0x9e, 0x40, 0xda, 0x68, 0x91, 0x7a, 0x26, 0x6d, 0xb4, 0xd3, 0x68,
-	0xc5, 0x56, 0xfc, 0x9f, 0x27, 0x8b, 0xac, 0xf4, 0x18, 0x05, 0xe4, 0x6f, 0x8a, 0x1f, 0x89, 0x8d,
-	0x98, 0x78, 0x3a, 0x7c, 0xdd, 0x64, 0x2b, 0x5b, 0xd9, 0x55, 0x24, 0xb2, 0x61, 0x32, 0x7e, 0x8b,
-	0x0b, 0x98, 0x6e, 0x2c, 0x37, 0xdd, 0xae, 0x24, 0xbd, 0x96, 0x43, 0x78, 0x2d, 0xad, 0x0c, 0xe1,
-	0x0e, 0x17, 0x37, 0x70, 0x7c, 0xab, 0x75, 0xdb, 0x97, 0xf4, 0x1c, 0x34, 0xdf, 0x16, 0x8c, 0x22,
-	0xd2, 0xcf, 0x11, 0x77, 0x7b, 0xf7, 0x3e, 0x41, 0xd6, 0x35, 0x07, 0xb7, 0xc3, 0x78, 0x0e, 0xc0,
-	0x64, 0xd4, 0x0b, 0x57, 0x74, 0x5f, 0x8f, 0x67, 0x46, 0xcc, 0xe5, 0x7b, 0x02, 0xf9, 0x58, 0x11,
-	0x5e, 0x41, 0x5e, 0xd2, 0xae, 0x31, 0x96, 0x10, 0x87, 0x1e, 0x97, 0x51, 0x7b, 0xb3, 0xb3, 0xc0,
-	0xc5, 0x67, 0x15, 0xff, 0x9c, 0x6d, 0x4d, 0xc6, 0x2a, 0xee, 0xff, 0x64, 0x5b, 0x41, 0xe6, 0xb7,
-	0xc7, 0xd3, 0x51, 0x10, 0x37, 0x31, 0xfb, 0x42, 0x06, 0xd3, 0xb5, 0x5b, 0xb1, 0x25, 0x69, 0x08,
-	0x7f, 0x52, 0xfc, 0x16, 0xb6, 0x3d, 0xf0, 0xf4, 0xea, 0x23, 0x00, 0x00, 0xff, 0xff, 0xaa, 0x1d,
-	0x62, 0xee, 0x27, 0x02, 0x00, 0x00,
+	// 282 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x50, 0xb1, 0x4e, 0xc3, 0x30,
+	0x10, 0x25, 0xa1, 0xa1, 0xe2, 0xa0, 0x0c, 0x07, 0x83, 0xd5, 0x01, 0x55, 0x61, 0xe9, 0xd4, 0x81,
+	0x0a, 0xb1, 0xb0, 0x80, 0xba, 0x30, 0xb0, 0xa4, 0x5f, 0xe0, 0x26, 0xa7, 0x2a, 0x22, 0xc4, 0xe6,
+	0x6c, 0x40, 0xf9, 0x60, 0xfe, 0x03, 0xd9, 0x89, 0x2b, 0x53, 0xd4, 0x81, 0x29, 0xcf, 0xef, 0xde,
+	0xcb, 0xbd, 0x7b, 0x30, 0x31, 0xc4, 0x9f, 0x75, 0x49, 0x0b, 0xcd, 0xca, 0x2a, 0xcc, 0xfc, 0x27,
+	0xef, 0xe0, 0x6c, 0xdd, 0xf3, 0x2f, 0x64, 0x25, 0x22, 0x8c, 0x5a, 0xf9, 0x46, 0x22, 0x99, 0x25,
+	0xf3, 0xd3, 0xc2, 0x63, 0xbc, 0x80, 0xb4, 0xd6, 0x22, 0xf5, 0x4c, 0x5a, 0x6b, 0xa7, 0xd1, 0x8a,
+	0xad, 0x38, 0x9e, 0x25, 0xf3, 0xac, 0xf0, 0x18, 0x05, 0x8c, 0xbf, 0x14, 0xbf, 0x12, 0x1b, 0x31,
+	0xf2, 0x74, 0x78, 0xba, 0xc9, 0x46, 0x36, 0xb2, 0x2d, 0x49, 0x64, 0xfd, 0x64, 0x78, 0xe6, 0x37,
+	0x30, 0x59, 0x5b, 0xae, 0xdb, 0x6d, 0x41, 0x7a, 0x25, 0xfb, 0xe5, 0x95, 0xb4, 0x32, 0x2c, 0x77,
+	0x38, 0x16, 0xbd, 0x1f, 0x14, 0x3d, 0xc0, 0xf9, 0xa3, 0xd6, 0x4d, 0x17, 0x69, 0xfe, 0x5c, 0x11,
+	0xe5, 0x48, 0x7f, 0xe7, 0x78, 0xda, 0xb9, 0x77, 0x31, 0x64, 0x55, 0x71, 0x70, 0x3b, 0x8c, 0xd7,
+	0x00, 0x4c, 0x46, 0x7d, 0x70, 0x49, 0xcf, 0xd5, 0xd0, 0x45, 0xc4, 0xdc, 0x7e, 0x27, 0x30, 0x1e,
+	0x7a, 0xc4, 0x3b, 0x18, 0x17, 0xb4, 0xad, 0x8d, 0x25, 0xc4, 0xbe, 0xec, 0x45, 0x54, 0xf1, 0xf4,
+	0x2a, 0x70, 0xf1, 0xed, 0xf9, 0x91, 0xb3, 0xad, 0xc8, 0x58, 0xc5, 0xdd, 0xbf, 0x6c, 0x4b, 0xc8,
+	0x7c, 0x7a, 0xbc, 0x1c, 0x04, 0x71, 0x13, 0xd3, 0x3d, 0x32, 0x98, 0xee, 0x5d, 0xc4, 0x86, 0xa4,
+	0x21, 0xdc, 0xff, 0x6f, 0xef, 0x3b, 0xb0, 0x6d, 0x73, 0xe2, 0xe9, 0xe5, 0x4f, 0x00, 0x00, 0x00,
+	0xff, 0xff, 0x80, 0x2d, 0x4a, 0xbc, 0x4d, 0x02, 0x00, 0x00,
 }
 
 // Reference imports to suppress errors if they are not otherwise used.
@@ -285,7 +326,7 @@ type ServiceClient interface {
 	//申请服务
 	Apply(ctx context.Context, in *ApplyReqData, opts ...grpc.CallOption) (*ApplyRepData, error)
 	//释放服务
-	Release(ctx context.Context, in *ApplyRepData, opts ...grpc.CallOption) (*StringRepData, error)
+	Release(ctx context.Context, in *StringReqData, opts ...grpc.CallOption) (*StringRepData, error)
 }
 
 type serviceClient struct {
@@ -323,7 +364,7 @@ func (c *serviceClient) Apply(ctx context.Context, in *ApplyReqData, opts ...grp
 	return out, nil
 }
 
-func (c *serviceClient) Release(ctx context.Context, in *ApplyRepData, opts ...grpc.CallOption) (*StringRepData, error) {
+func (c *serviceClient) Release(ctx context.Context, in *StringReqData, opts ...grpc.CallOption) (*StringRepData, error) {
 	out := new(StringRepData)
 	err := c.cc.Invoke(ctx, "/proto.Service/Release", in, out, opts...)
 	if err != nil {
@@ -341,7 +382,7 @@ type ServiceServer interface {
 	//申请服务
 	Apply(context.Context, *ApplyReqData) (*ApplyRepData, error)
 	//释放服务
-	Release(context.Context, *ApplyRepData) (*StringRepData, error)
+	Release(context.Context, *StringReqData) (*StringRepData, error)
 }
 
 // UnimplementedServiceServer can be embedded to have forward compatible implementations.
@@ -357,7 +398,7 @@ func (*UnimplementedServiceServer) Destory(ctx context.Context, req *ServiceMeta
 func (*UnimplementedServiceServer) Apply(ctx context.Context, req *ApplyReqData) (*ApplyRepData, error) {
 	return nil, status.Errorf(codes.Unimplemented, "method Apply not implemented")
 }
-func (*UnimplementedServiceServer) Release(ctx context.Context, req *ApplyRepData) (*StringRepData, error) {
+func (*UnimplementedServiceServer) Release(ctx context.Context, req *StringReqData) (*StringRepData, error) {
 	return nil, status.Errorf(codes.Unimplemented, "method Release not implemented")
 }
 
@@ -420,7 +461,7 @@ func _Service_Apply_Handler(srv interface{}, ctx context.Context, dec func(inter
 }
 
 func _Service_Release_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
-	in := new(ApplyRepData)
+	in := new(StringReqData)
 	if err := dec(in); err != nil {
 		return nil, err
 	}
@@ -432,7 +473,7 @@ func _Service_Release_Handler(srv interface{}, ctx context.Context, dec func(int
 		FullMethod: "/proto.Service/Release",
 	}
 	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
-		return srv.(ServiceServer).Release(ctx, req.(*ApplyRepData))
+		return srv.(ServiceServer).Release(ctx, req.(*StringReqData))
 	}
 	return interceptor(ctx, in, info, handler)
 }

+ 18 - 0
proto_src/demo.proto

@@ -0,0 +1,18 @@
+syntax = "proto3";//声明proto的版本 只能 是3,才支持 grpc
+
+//声明 包名
+package proto;
+
+//服务管理
+service DemoService {
+  //rpc
+  rpc Say(DemoReq)returns(DemoRep){}
+}
+//服务参数
+message DemoReq{
+  string name = 1;
+}
+//标准字符串返回结果
+message DemoRep{
+  string data = 1;
+}

+ 1 - 1
proto_src/heartbeat.proto

@@ -17,7 +17,7 @@ service HeartBeat {
 //stream请求结构
 message StreamReqData {
   string serviceName = 1;//服务名称
-  string serviceId = 2;//ip+端口
+  string serviceAddr = 2;//ip:端口
 
 }
 //stream返回结构

+ 5 - 1
proto_src/service.proto

@@ -12,7 +12,7 @@ service Service {
   //申请服务
   rpc Apply(ApplyReqData)returns(ApplyRepData){}
   //释放服务
-  rpc Release(ApplyRepData)returns(StringRepData){}
+  rpc Release(StringReqData)returns(StringRepData){}
 }
 //服务参数
 message ServiceMeta{
@@ -26,6 +26,10 @@ message ServiceMeta{
 message StringRepData{
   string data = 1;
 }
+//标准字符串请求
+message StringReqData{
+  string data = 1;
+}
 //
 message ApplyReqData{
   string name = 1;

+ 2 - 2
server/heartbeatService.go

@@ -16,8 +16,8 @@ type Heartbeat struct {
 func (h *Heartbeat) PutStream(cliStr proto.HeartBeat_PutStreamServer) error {
 	for {
 		if tem, err := cliStr.Recv(); err == nil {
-			log.Printf("%s--%s \n", tem.ServiceName, tem.ServiceId)
-			UpdateServiceMetaTtl(tem.ServiceId)
+			log.Printf("心跳包-  %s--%s \n", tem.ServiceName, tem.ServiceAddr)
+			UpdateServiceMetaTtl(tem.ServiceAddr)
 		} else {
 			log.Println("break, err :", err)
 			break

+ 3 - 2
server/main.go

@@ -19,9 +19,10 @@ func init() {
 
 //
 func main() {
+	//初始化内存数据库
+	InitDb()
 	//失效服务检查
 	go ClearTimeoutService(*serviceTtl)
-	InitDb()
 	//监听端口
 	lis, err := net.Listen("tcp", *addr)
 	if err != nil {
@@ -39,4 +40,4 @@ func main() {
 	proto.RegisterServiceServer(s, &Service{})
 	//处理链接
 	_ = s.Serve(lis)
-}
+}

+ 1 - 1
server/serverLoadService.go

@@ -15,7 +15,7 @@ type ServerLoad struct {
 func (s *ServerLoad) PutStream(cliStr proto.ServerLoad_PutStreamServer) error {
 	for {
 		if tem, err := cliStr.Recv(); err == nil {
-			log.Printf("%s--%f \n", tem.Ip, tem.Load)
+			log.Printf("Server Load1 %s--%f \n", tem.Ip, tem.Load)
 			UpdateServerLoad(tem.Ip, tem.Load)
 		} else {
 			log.Println("break, err :", err)

+ 9 - 5
server/serviceService.go

@@ -3,6 +3,7 @@ package main
 import (
 	"app.yhyue.com/BP/servicerd/proto"
 	"context"
+	"log"
 )
 
 /**
@@ -29,17 +30,20 @@ func (s *Service) Apply(ctx context.Context, in *proto.ApplyReqData) (*proto.App
 	var err error
 	if in.Balance == LOAD {
 		addr, id, err = ApplyWithLoad(in.Name)
-	} else {
+	} else if in.Balance == SEQ {
 		addr, id, err = ApplyWithNotUse(in.Name)
+	} else {
+		addr, id, err = ApplyWithRandom(in.Name)
 	}
 	if err != nil {
-		return &proto.ApplyRepData{Addr: addr, ResourceId: id}, nil
-	} else {
+		log.Fatalln(err.Error())
 		return &proto.ApplyRepData{Addr: "", ResourceId: ""}, err
+	} else {
+		return &proto.ApplyRepData{Addr: addr, ResourceId: id}, nil
 	}
 }
 
-func (s *Service) Release(ctx context.Context, in *proto.ApplyRepData) (*proto.StringRepData, error) {
-	Release(in.ResourceId)
+func (s *Service) Release(ctx context.Context, in *proto.StringReqData) (*proto.StringRepData, error) {
+	Release(in.Data)
 	return &proto.StringRepData{Data: "ok"}, nil
 }

+ 58 - 20
server/serviceStore.go

@@ -9,12 +9,14 @@ import (
 	"fmt"
 	"github.com/hashicorp/go-memdb"
 	"log"
+	"math/rand"
 	"time"
 )
 
 const (
-	LOAD = iota //按服务器压力分配
-	SEQ         //顺序执行 ,根据服务支持的执行单元数
+	RANDOM = iota //随机
+	LOAD          //按服务器压力分配
+	SEQ           //顺序执行 ,根据服务支持的执行单元数
 )
 
 type ServiceMeta struct {
@@ -55,6 +57,11 @@ func InitDb() {
 						Unique:  false,
 						Indexer: &memdb.StringFieldIndex{Field: "Ip"},
 					},
+					"addr": &memdb.IndexSchema{
+						Name:    "addr",
+						Unique:  false,
+						Indexer: &memdb.StringFieldIndex{Field: "Addr"},
+					},
 					"name": &memdb.IndexSchema{
 						Name:    "name",
 						Unique:  false,
@@ -104,26 +111,35 @@ func AddServiceMeta(serviceName string, ip string, port int32, workers int32, ba
 			Id:          fmt.Sprintf("%s:%d_%d", ip, port, i+1),
 			Addr:        fmt.Sprintf("%s:%d", ip, port),
 		}
+		log.Println("data:::", *sm)
 		err := txn.Insert("servicemeta", sm)
 		if err != nil {
-			log.Println(err.Error())
+			log.Fatalln("insert service meta error", err.Error())
 		}
+		log.Println("insert service meta", sm.String())
 	}
 }
 
 //更新服务有效期
-func UpdateServiceMetaTtl(serviceId string) {
+func UpdateServiceMetaTtl(serviceAddr string) {
 	now := time.Now().Unix()
 	txn := mdb.Txn(true)
 	defer txn.Commit()
-	item, err := txn.First("servicemeta", "id", serviceId)
+	rs, err := txn.Get("servicemeta", "addr", serviceAddr)
 	if err != nil {
-		log.Println(err.Error())
+		log.Fatalln("update service ttl error", err.Error())
 		return
 	}
-	sm := item.(*ServiceMeta)
-	sm.LastTtlTime = now
-	_ = txn.Insert("servicemeta", sm)
+	if rs == nil {
+		log.Println("serviceAddr", serviceAddr, "找不到")
+		return
+	}
+	for obj := rs.Next(); obj != nil; obj = rs.Next() {
+		sm := obj.(*ServiceMeta)
+		sm.LastTtlTime = now
+		_ = txn.Insert("servicemeta", sm)
+		log.Println("update service meta ttl", sm.String())
+	}
 }
 
 //注销服务
@@ -132,17 +148,18 @@ func DestoryServiceMeta(ip string, port int32) {
 	defer txn.Commit()
 	_, err := txn.DeleteAll("servicemeta", "ip_port", ip, port)
 	if err != nil {
-		log.Println(err.Error())
+		log.Fatalln("destory service error", err.Error())
 	}
+	log.Println("remove service meta", ip, port)
 }
 
 //申请调用,找未使用的那个资源
 func ApplyWithNotUse(serviceName string) (string, string, error) {
 	txn := mdb.Txn(true)
-	defer txn.Changes()
+	defer txn.Commit()
 	item, err := txn.First("servicemeta", "name_used", serviceName, false)
 	if err != nil {
-		log.Println(err.Error())
+		log.Fatalln("applywithnotuse error ", err.Error())
 		return "", "", errors.New("没有可用服务")
 	}
 	sm := item.(*ServiceMeta)
@@ -158,11 +175,10 @@ func ApplyWithLoad(serviceName string) (string, string, error) {
 	defer txn.Abort()
 	rs, err := txn.Get("servicemeta", "name", serviceName)
 	if err != nil {
-		log.Println(err.Error())
 		return "", "", errors.New("没有可用服务")
 	}
 	var addr, id string
-	var load float64 = 0
+	var load float64 = 1000
 	for obj := rs.Next(); obj != nil; obj = rs.Next() {
 		meta := obj.(*ServiceMeta)
 		if meta.Load < load { //找负载最小的那个
@@ -172,13 +188,34 @@ func ApplyWithLoad(serviceName string) (string, string, error) {
 	return addr, id, nil
 }
 
+//申请调用,随机找一个服务器
+func ApplyWithRandom(serviceName string) (string, string, error) {
+	txn := mdb.Txn(false)
+	defer txn.Abort()
+	rs, err := txn.Get("servicemeta", "name", serviceName)
+	if err != nil {
+		return "", "", errors.New("没有可用服务")
+	}
+	addrs := map[string]bool{}
+	for obj := rs.Next(); obj != nil; obj = rs.Next() {
+		meta := obj.(*ServiceMeta)
+		addrs[meta.Addr] = true
+	}
+	addrArray := make([]string, 0, 0)
+	for k, _ := range addrs {
+		addrArray = append(addrArray, k)
+	}
+	index := rand.Intn(len(addrArray))
+	return addrArray[index], "", nil
+}
+
 //释放资源
 func Release(serviceId string) {
 	txn := mdb.Txn(true)
-	defer txn.Changes()
-	item, err := txn.First("servicemeta", "id", serviceId, false)
+	defer txn.Commit()
+	item, err := txn.First("servicemeta", "id", serviceId)
 	if err != nil {
-		log.Println(err.Error())
+		log.Fatalln("release error", err.Error())
 		return
 	}
 	sm := item.(*ServiceMeta)
@@ -190,10 +227,10 @@ func Release(serviceId string) {
 //更新所有服务负载
 func UpdateServerLoad(ip string, load float64) {
 	txn := mdb.Txn(true)
-	defer txn.Changes()
+	defer txn.Commit()
 	rs, err := txn.Get("servicemeta", "ip", ip)
 	if err != nil {
-		log.Println(err.Error())
+		log.Fatalln(err.Error())
 		return
 	}
 	for obj := rs.Next(); obj != nil; obj = rs.Next() {
@@ -209,7 +246,7 @@ func RemoveTimeoutService(timeout int64) {
 	defer txn.Commit()
 	rs, err := txn.Get("servicemeta", "id", "")
 	if err != nil {
-		log.Println(err.Error())
+		log.Fatalln(err.Error())
 		return
 	}
 	filter := memdb.NewFilterIterator(rs, timeoutFilterFactory(timeout))
@@ -218,6 +255,7 @@ func RemoveTimeoutService(timeout int64) {
 		meta := obj.(*ServiceMeta)
 		ids = append(ids, meta.Id)
 	}
+	log.Println("清理的服务", ids)
 	for _, id := range ids {
 		_, _ = txn.DeleteAll("servicemeta", "id", id)
 	}