소스 검색

目录结构调整

Tao Zhang 5 년 전
부모
커밋
c179da8a93
13개의 변경된 파일460개의 추가작업 그리고 125개의 파일을 삭제
  1. 3 0
      go.mod
  2. 13 0
      go.sum
  3. 85 68
      proto/service.pb.go
  4. 13 11
      proto_src/service.proto
  5. 31 0
      server/db.go.bak
  6. 2 16
      server/heartbeatService.go
  7. 1 0
      server/main.go
  8. 1 0
      server/online.go.bak
  9. 1 1
      server/serverLoadService.go
  10. 0 29
      server/serviceManager.go
  11. 45 0
      server/serviceService.go
  12. 265 0
      server/serviceStore.go
  13. 0 0
      server/snycmap.go.bak

+ 3 - 0
go.mod

@@ -6,6 +6,9 @@ require (
 	github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
 	github.com/go-ole/go-ole v1.2.4 // indirect
 	github.com/golang/protobuf v1.4.2
+	github.com/google/btree v1.0.0 // indirect
+	github.com/hashicorp/go-memdb v1.2.1
+	github.com/nedscode/memdb v0.0.0-20190730235322-b1504ff22569
 	github.com/shirou/gopsutil v2.20.6+incompatible
 	google.golang.org/grpc v1.30.0
 )

+ 13 - 0
go.sum

@@ -22,11 +22,24 @@ github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:W
 github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
 github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
 github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo=
+github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
 github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
 github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/hashicorp/go-immutable-radix v1.2.0 h1:l6UW37iCXwZkZoAbEYnptSHVE/cQ5bOTPYG5W3vf9+8=
+github.com/hashicorp/go-immutable-radix v1.2.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
+github.com/hashicorp/go-memdb v1.2.1 h1:wI9btDjYUOJJHTCnRlAG/TkRyD/ij7meJMrLK9X31Cc=
+github.com/hashicorp/go-memdb v1.2.1/go.mod h1:OSvLJ662Jim8hMM+gWGyhktyWk2xPCnWMc7DWIqtkGA=
+github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM=
+github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
+github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
+github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
+github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
+github.com/nedscode/memdb v0.0.0-20190730235322-b1504ff22569 h1:NDp8Kzq1l5cS5CNqeKPn4zRzPf5Fz4qVsXHTJ9C5W3I=
+github.com/nedscode/memdb v0.0.0-20190730235322-b1504ff22569/go.mod h1:fBJ7MTqkxqFO4dyD4rAwXI+VNJm1+AGc+M923BSxMRk=
 github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
 github.com/shirou/gopsutil v2.20.6+incompatible h1:P37G9YH8M4vqkKcwBosp+URN5O8Tay67D2MbR361ioY=
 github.com/shirou/gopsutil v2.20.6+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=

+ 85 - 68
proto/service.pb.go

@@ -28,10 +28,11 @@ const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
 
 //服务参数
 type ServiceMeta struct {
-	ServiceName          string   `protobuf:"bytes,1,opt,name=serviceName,proto3" json:"serviceName,omitempty"`
-	ServiceAddr          string   `protobuf:"bytes,2,opt,name=serviceAddr,proto3" json:"serviceAddr,omitempty"`
-	Workers              int32    `protobuf:"varint,3,opt,name=workers,proto3" json:"workers,omitempty"`
-	Balance              int32    `protobuf:"varint,4,opt,name=balance,proto3" json:"balance,omitempty"`
+	Name                 string   `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+	Ip                   string   `protobuf:"bytes,2,opt,name=ip,proto3" json:"ip,omitempty"`
+	Port                 int32    `protobuf:"varint,3,opt,name=port,proto3" json:"port,omitempty"`
+	Workers              int32    `protobuf:"varint,4,opt,name=workers,proto3" json:"workers,omitempty"`
+	Balance              int32    `protobuf:"varint,5,opt,name=balance,proto3" json:"balance,omitempty"`
 	XXX_NoUnkeyedLiteral struct{} `json:"-"`
 	XXX_unrecognized     []byte   `json:"-"`
 	XXX_sizecache        int32    `json:"-"`
@@ -62,20 +63,27 @@ func (m *ServiceMeta) XXX_DiscardUnknown() {
 
 var xxx_messageInfo_ServiceMeta proto.InternalMessageInfo
 
-func (m *ServiceMeta) GetServiceName() string {
+func (m *ServiceMeta) GetName() string {
 	if m != nil {
-		return m.ServiceName
+		return m.Name
 	}
 	return ""
 }
 
-func (m *ServiceMeta) GetServiceAddr() string {
+func (m *ServiceMeta) GetIp() string {
 	if m != nil {
-		return m.ServiceAddr
+		return m.Ip
 	}
 	return ""
 }
 
+func (m *ServiceMeta) GetPort() int32 {
+	if m != nil {
+		return m.Port
+	}
+	return 0
+}
+
 func (m *ServiceMeta) GetWorkers() int32 {
 	if m != nil {
 		return m.Workers
@@ -91,39 +99,39 @@ func (m *ServiceMeta) GetBalance() int32 {
 }
 
 //标准字符串返回结果
-type StringReq struct {
+type StringRepData struct {
 	Data                 string   `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
 	XXX_NoUnkeyedLiteral struct{} `json:"-"`
 	XXX_unrecognized     []byte   `json:"-"`
 	XXX_sizecache        int32    `json:"-"`
 }
 
-func (m *StringReq) Reset()         { *m = StringReq{} }
-func (m *StringReq) String() string { return proto.CompactTextString(m) }
-func (*StringReq) ProtoMessage()    {}
-func (*StringReq) Descriptor() ([]byte, []int) {
+func (m *StringRepData) Reset()         { *m = StringRepData{} }
+func (m *StringRepData) String() string { return proto.CompactTextString(m) }
+func (*StringRepData) ProtoMessage()    {}
+func (*StringRepData) Descriptor() ([]byte, []int) {
 	return fileDescriptor_a0b84a42fa06f626, []int{1}
 }
 
-func (m *StringReq) XXX_Unmarshal(b []byte) error {
-	return xxx_messageInfo_StringReq.Unmarshal(m, b)
+func (m *StringRepData) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_StringRepData.Unmarshal(m, b)
 }
-func (m *StringReq) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
-	return xxx_messageInfo_StringReq.Marshal(b, m, deterministic)
+func (m *StringRepData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_StringRepData.Marshal(b, m, deterministic)
 }
-func (m *StringReq) XXX_Merge(src proto.Message) {
-	xxx_messageInfo_StringReq.Merge(m, src)
+func (m *StringRepData) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_StringRepData.Merge(m, src)
 }
-func (m *StringReq) XXX_Size() int {
-	return xxx_messageInfo_StringReq.Size(m)
+func (m *StringRepData) XXX_Size() int {
+	return xxx_messageInfo_StringRepData.Size(m)
 }
-func (m *StringReq) XXX_DiscardUnknown() {
-	xxx_messageInfo_StringReq.DiscardUnknown(m)
+func (m *StringRepData) XXX_DiscardUnknown() {
+	xxx_messageInfo_StringRepData.DiscardUnknown(m)
 }
 
-var xxx_messageInfo_StringReq proto.InternalMessageInfo
+var xxx_messageInfo_StringRepData proto.InternalMessageInfo
 
-func (m *StringReq) GetData() string {
+func (m *StringRepData) GetData() string {
 	if m != nil {
 		return m.Data
 	}
@@ -132,7 +140,8 @@ func (m *StringReq) GetData() string {
 
 //
 type ApplyReqData struct {
-	ServiceName          string   `protobuf:"bytes,1,opt,name=serviceName,proto3" json:"serviceName,omitempty"`
+	Name                 string   `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
+	Balance              int32    `protobuf:"varint,2,opt,name=balance,proto3" json:"balance,omitempty"`
 	XXX_NoUnkeyedLiteral struct{} `json:"-"`
 	XXX_unrecognized     []byte   `json:"-"`
 	XXX_sizecache        int32    `json:"-"`
@@ -163,16 +172,23 @@ func (m *ApplyReqData) XXX_DiscardUnknown() {
 
 var xxx_messageInfo_ApplyReqData proto.InternalMessageInfo
 
-func (m *ApplyReqData) GetServiceName() string {
+func (m *ApplyReqData) GetName() string {
 	if m != nil {
-		return m.ServiceName
+		return m.Name
 	}
 	return ""
 }
 
+func (m *ApplyReqData) GetBalance() int32 {
+	if m != nil {
+		return m.Balance
+	}
+	return 0
+}
+
 type ApplyRepData struct {
-	ServiceAddr          string   `protobuf:"bytes,1,opt,name=serviceAddr,proto3" json:"serviceAddr,omitempty"`
-	ServiceResourceId    string   `protobuf:"bytes,2,opt,name=serviceResourceId,proto3" json:"serviceResourceId,omitempty"`
+	Addr                 string   `protobuf:"bytes,1,opt,name=addr,proto3" json:"addr,omitempty"`
+	ResourceId           string   `protobuf:"bytes,2,opt,name=resourceId,proto3" json:"resourceId,omitempty"`
 	XXX_NoUnkeyedLiteral struct{} `json:"-"`
 	XXX_unrecognized     []byte   `json:"-"`
 	XXX_sizecache        int32    `json:"-"`
@@ -203,23 +219,23 @@ func (m *ApplyRepData) XXX_DiscardUnknown() {
 
 var xxx_messageInfo_ApplyRepData proto.InternalMessageInfo
 
-func (m *ApplyRepData) GetServiceAddr() string {
+func (m *ApplyRepData) GetAddr() string {
 	if m != nil {
-		return m.ServiceAddr
+		return m.Addr
 	}
 	return ""
 }
 
-func (m *ApplyRepData) GetServiceResourceId() string {
+func (m *ApplyRepData) GetResourceId() string {
 	if m != nil {
-		return m.ServiceResourceId
+		return m.ResourceId
 	}
 	return ""
 }
 
 func init() {
 	proto.RegisterType((*ServiceMeta)(nil), "proto.ServiceMeta")
-	proto.RegisterType((*StringReq)(nil), "proto.StringReq")
+	proto.RegisterType((*StringRepData)(nil), "proto.StringRepData")
 	proto.RegisterType((*ApplyReqData)(nil), "proto.ApplyReqData")
 	proto.RegisterType((*ApplyRepData)(nil), "proto.ApplyRepData")
 }
@@ -229,24 +245,25 @@ func init() {
 }
 
 var fileDescriptor_a0b84a42fa06f626 = []byte{
-	// 272 bytes of a gzipped FileDescriptorProto
-	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x50, 0xbb, 0x4e, 0xc3, 0x30,
-	0x14, 0xad, 0xa1, 0x21, 0xea, 0x2d, 0x48, 0x70, 0x59, 0xac, 0x2e, 0x44, 0x9e, 0x3a, 0xa0, 0x0a,
-	0xda, 0x2f, 0xa8, 0xd4, 0x85, 0x01, 0x06, 0x77, 0x47, 0x72, 0x93, 0xab, 0x2a, 0x22, 0xd4, 0xa9,
-	0x6d, 0x40, 0xfd, 0x00, 0xfe, 0x91, 0xcf, 0x41, 0x71, 0x1c, 0x64, 0xda, 0x0e, 0x9d, 0x92, 0x7b,
-	0x1e, 0xf2, 0x39, 0x07, 0xae, 0x2c, 0x99, 0xcf, 0x32, 0xa7, 0x49, 0x6d, 0xb4, 0xd3, 0x98, 0xf8,
-	0x8f, 0xf8, 0x66, 0x30, 0x5c, 0xb6, 0xc4, 0x33, 0x39, 0x85, 0x19, 0x0c, 0x83, 0xee, 0x45, 0xbd,
-	0x13, 0x67, 0x19, 0x1b, 0x0f, 0x64, 0x0c, 0x45, 0x8a, 0x79, 0x51, 0x18, 0x7e, 0xf6, 0x4f, 0xd1,
-	0x40, 0xc8, 0x21, 0xfd, 0xd2, 0xe6, 0x8d, 0x8c, 0xe5, 0xe7, 0x19, 0x1b, 0x27, 0xb2, 0x3b, 0x1b,
-	0x66, 0xa5, 0x2a, 0xb5, 0xc9, 0x89, 0xf7, 0x5b, 0x26, 0x9c, 0xe2, 0x0e, 0x06, 0x4b, 0x67, 0xca,
-	0xcd, 0x5a, 0xd2, 0x16, 0x11, 0xfa, 0x85, 0x72, 0x2a, 0xbc, 0xee, 0xff, 0xc5, 0x03, 0x5c, 0xce,
-	0xeb, 0xba, 0xda, 0x49, 0xda, 0x2e, 0xd4, 0x29, 0x41, 0xc5, 0xeb, 0x9f, 0xa3, 0xde, 0x73, 0xf8,
-	0xe0, 0xec, 0x30, 0xf8, 0x3d, 0xdc, 0x84, 0x53, 0x92, 0xd5, 0x1f, 0x26, 0xa7, 0xa7, 0x22, 0x14,
-	0x3c, 0x24, 0xa6, 0x3f, 0x0c, 0xd2, 0x30, 0x1d, 0x3e, 0x42, 0x2a, 0x69, 0x5d, 0x5a, 0x47, 0x88,
-	0xed, 0xc0, 0x93, 0x68, 0xd5, 0xd1, 0x75, 0x87, 0x75, 0x15, 0x45, 0xaf, 0xb1, 0x2c, 0xc8, 0x3a,
-	0x6d, 0x76, 0x27, 0x5b, 0x66, 0x90, 0xf8, 0x46, 0x78, 0x1b, 0xc8, 0x78, 0x91, 0xd1, 0x1e, 0xe8,
-	0x4b, 0x8b, 0x1e, 0x4e, 0x9b, 0x68, 0x15, 0x29, 0x4b, 0x78, 0x4c, 0x71, 0xec, 0xa1, 0xd5, 0x85,
-	0x87, 0x66, 0xbf, 0x01, 0x00, 0x00, 0xff, 0xff, 0xdb, 0x8e, 0x00, 0x87, 0x34, 0x02, 0x00, 0x00,
+	// 278 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x50, 0x3f, 0x4f, 0xfb, 0x30,
+	0x10, 0xfd, 0x25, 0xbf, 0x86, 0x88, 0x83, 0x32, 0x1c, 0x0c, 0x56, 0x07, 0x54, 0x85, 0xa5, 0x53,
+	0x07, 0x2a, 0x98, 0x58, 0x40, 0x5d, 0x18, 0x58, 0xd2, 0x4f, 0xe0, 0x26, 0xa7, 0x2a, 0x22, 0xc4,
+	0xe6, 0x6c, 0x40, 0xf9, 0xbe, 0x7c, 0x10, 0x64, 0x27, 0xae, 0xcc, 0xbf, 0x81, 0xc9, 0xcf, 0xef,
+	0xde, 0xd3, 0xbb, 0x7b, 0x30, 0x35, 0xc4, 0xaf, 0x4d, 0x45, 0x4b, 0xcd, 0xca, 0x2a, 0xcc, 0xfc,
+	0x53, 0xf4, 0x70, 0xb4, 0x19, 0xf8, 0x07, 0xb2, 0x12, 0x11, 0x26, 0x9d, 0x7c, 0x22, 0x91, 0xcc,
+	0x93, 0xc5, 0x61, 0xe9, 0x31, 0x9e, 0x40, 0xda, 0x68, 0x91, 0x7a, 0x26, 0x6d, 0xb4, 0xd3, 0x68,
+	0xc5, 0x56, 0xfc, 0x9f, 0x27, 0x8b, 0xac, 0xf4, 0x18, 0x05, 0xe4, 0x6f, 0x8a, 0x1f, 0x89, 0x8d,
+	0x98, 0x78, 0x3a, 0x7c, 0xdd, 0x64, 0x2b, 0x5b, 0xd9, 0x55, 0x24, 0xb2, 0x61, 0x32, 0x7e, 0x8b,
+	0x0b, 0x98, 0x6e, 0x2c, 0x37, 0xdd, 0xae, 0x24, 0xbd, 0x96, 0x43, 0x78, 0x2d, 0xad, 0x0c, 0xe1,
+	0x0e, 0x17, 0x37, 0x70, 0x7c, 0xab, 0x75, 0xdb, 0x97, 0xf4, 0x1c, 0x34, 0xdf, 0x16, 0x8c, 0x22,
+	0xd2, 0xcf, 0x11, 0x77, 0x7b, 0xf7, 0x3e, 0x41, 0xd6, 0x35, 0x07, 0xb7, 0xc3, 0x78, 0x0e, 0xc0,
+	0x64, 0xd4, 0x0b, 0x57, 0x74, 0x5f, 0x8f, 0x67, 0x46, 0xcc, 0xe5, 0x7b, 0x02, 0xf9, 0x58, 0x11,
+	0x5e, 0x41, 0x5e, 0xd2, 0xae, 0x31, 0x96, 0x10, 0x87, 0x1e, 0x97, 0x51, 0x7b, 0xb3, 0xb3, 0xc0,
+	0xc5, 0x67, 0x15, 0xff, 0x9c, 0x6d, 0x4d, 0xc6, 0x2a, 0xee, 0xff, 0x64, 0x5b, 0x41, 0xe6, 0xb7,
+	0xc7, 0xd3, 0x51, 0x10, 0x37, 0x31, 0xfb, 0x42, 0x06, 0xd3, 0xb5, 0x5b, 0xb1, 0x25, 0x69, 0x08,
+	0x7f, 0x52, 0xfc, 0x16, 0xb6, 0x3d, 0xf0, 0xf4, 0xea, 0x23, 0x00, 0x00, 0xff, 0xff, 0xaa, 0x1d,
+	0x62, 0xee, 0x27, 0x02, 0x00, 0x00,
 }
 
 // Reference imports to suppress errors if they are not otherwise used.
@@ -262,13 +279,13 @@ const _ = grpc.SupportPackageIsVersion6
 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
 type ServiceClient interface {
 	//注册服务
-	Registe(ctx context.Context, in *ServiceMeta, opts ...grpc.CallOption) (*StringReq, error)
+	Registe(ctx context.Context, in *ServiceMeta, opts ...grpc.CallOption) (*StringRepData, error)
 	//注销服务
-	Destory(ctx context.Context, in *ServiceMeta, opts ...grpc.CallOption) (*StringReq, error)
+	Destory(ctx context.Context, in *ServiceMeta, opts ...grpc.CallOption) (*StringRepData, error)
 	//申请服务
 	Apply(ctx context.Context, in *ApplyReqData, opts ...grpc.CallOption) (*ApplyRepData, error)
 	//释放服务
-	Release(ctx context.Context, in *ApplyRepData, opts ...grpc.CallOption) (*StringReq, error)
+	Release(ctx context.Context, in *ApplyRepData, opts ...grpc.CallOption) (*StringRepData, error)
 }
 
 type serviceClient struct {
@@ -279,8 +296,8 @@ func NewServiceClient(cc grpc.ClientConnInterface) ServiceClient {
 	return &serviceClient{cc}
 }
 
-func (c *serviceClient) Registe(ctx context.Context, in *ServiceMeta, opts ...grpc.CallOption) (*StringReq, error) {
-	out := new(StringReq)
+func (c *serviceClient) Registe(ctx context.Context, in *ServiceMeta, opts ...grpc.CallOption) (*StringRepData, error) {
+	out := new(StringRepData)
 	err := c.cc.Invoke(ctx, "/proto.Service/Registe", in, out, opts...)
 	if err != nil {
 		return nil, err
@@ -288,8 +305,8 @@ func (c *serviceClient) Registe(ctx context.Context, in *ServiceMeta, opts ...gr
 	return out, nil
 }
 
-func (c *serviceClient) Destory(ctx context.Context, in *ServiceMeta, opts ...grpc.CallOption) (*StringReq, error) {
-	out := new(StringReq)
+func (c *serviceClient) Destory(ctx context.Context, in *ServiceMeta, opts ...grpc.CallOption) (*StringRepData, error) {
+	out := new(StringRepData)
 	err := c.cc.Invoke(ctx, "/proto.Service/Destory", in, out, opts...)
 	if err != nil {
 		return nil, err
@@ -306,8 +323,8 @@ func (c *serviceClient) Apply(ctx context.Context, in *ApplyReqData, opts ...grp
 	return out, nil
 }
 
-func (c *serviceClient) Release(ctx context.Context, in *ApplyRepData, opts ...grpc.CallOption) (*StringReq, error) {
-	out := new(StringReq)
+func (c *serviceClient) Release(ctx context.Context, in *ApplyRepData, opts ...grpc.CallOption) (*StringRepData, error) {
+	out := new(StringRepData)
 	err := c.cc.Invoke(ctx, "/proto.Service/Release", in, out, opts...)
 	if err != nil {
 		return nil, err
@@ -318,29 +335,29 @@ func (c *serviceClient) Release(ctx context.Context, in *ApplyRepData, opts ...g
 // ServiceServer is the server API for Service service.
 type ServiceServer interface {
 	//注册服务
-	Registe(context.Context, *ServiceMeta) (*StringReq, error)
+	Registe(context.Context, *ServiceMeta) (*StringRepData, error)
 	//注销服务
-	Destory(context.Context, *ServiceMeta) (*StringReq, error)
+	Destory(context.Context, *ServiceMeta) (*StringRepData, error)
 	//申请服务
 	Apply(context.Context, *ApplyReqData) (*ApplyRepData, error)
 	//释放服务
-	Release(context.Context, *ApplyRepData) (*StringReq, error)
+	Release(context.Context, *ApplyRepData) (*StringRepData, error)
 }
 
 // UnimplementedServiceServer can be embedded to have forward compatible implementations.
 type UnimplementedServiceServer struct {
 }
 
-func (*UnimplementedServiceServer) Registe(ctx context.Context, req *ServiceMeta) (*StringReq, error) {
+func (*UnimplementedServiceServer) Registe(ctx context.Context, req *ServiceMeta) (*StringRepData, error) {
 	return nil, status.Errorf(codes.Unimplemented, "method Registe not implemented")
 }
-func (*UnimplementedServiceServer) Destory(ctx context.Context, req *ServiceMeta) (*StringReq, error) {
+func (*UnimplementedServiceServer) Destory(ctx context.Context, req *ServiceMeta) (*StringRepData, error) {
 	return nil, status.Errorf(codes.Unimplemented, "method Destory not implemented")
 }
 func (*UnimplementedServiceServer) Apply(ctx context.Context, req *ApplyReqData) (*ApplyRepData, error) {
 	return nil, status.Errorf(codes.Unimplemented, "method Apply not implemented")
 }
-func (*UnimplementedServiceServer) Release(ctx context.Context, req *ApplyRepData) (*StringReq, error) {
+func (*UnimplementedServiceServer) Release(ctx context.Context, req *ApplyRepData) (*StringRepData, error) {
 	return nil, status.Errorf(codes.Unimplemented, "method Release not implemented")
 }
 

+ 13 - 11
proto_src/service.proto

@@ -6,30 +6,32 @@ package proto;
 //服务管理
 service Service {
   //注册服务
-  rpc Registe(ServiceMeta)returns(StringReq){}
+  rpc Registe(ServiceMeta)returns(StringRepData){}
   //注销服务
-  rpc Destory(ServiceMeta)returns(StringReq){}
+  rpc Destory(ServiceMeta)returns(StringRepData){}
   //申请服务
   rpc Apply(ApplyReqData)returns(ApplyRepData){}
   //释放服务
-  rpc Release(ApplyRepData)returns(StringReq){}
+  rpc Release(ApplyRepData)returns(StringRepData){}
 }
 //服务参数
 message ServiceMeta{
-  string serviceName = 1;
-  string serviceAddr = 2;
-  int32 workers = 3;
-  int32 balance = 4;
+  string name = 1;
+  string ip = 2;
+  int32 port = 3;
+  int32 workers = 4;
+  int32 balance = 5;
 }
 //标准字符串返回结果
-message StringReq{
+message StringRepData{
   string data = 1;
 }
 //
 message ApplyReqData{
-  string serviceName = 1;
+  string name = 1;
+  int32 balance = 2;
 }
 message ApplyRepData{
-  string serviceAddr = 1;
-  string serviceResourceId = 2;
+  string addr = 1;
+  string resourceId = 2;
 }

+ 31 - 0
server/db.go.bak

@@ -0,0 +1,31 @@
+/**
+服务支持-存储
+*/
+package main
+
+import "github.com/nedscode/memdb"
+
+type ServiceMeta struct {
+	Name        string
+	Addr        string
+	Workers     string
+	BalanceType int32
+	Load        float64
+	Ip          string
+	Port        int32
+	Id          string
+	Used        bool
+}
+
+var mdb *memdb.Store
+
+func InitDb() {
+	// Create the DB schema
+	mdb = memdb.NewStore().
+		PrimaryKey("addr").
+		CreateIndex("ip"). //ip索引
+		CreateIndex("load", "name"). //按负载找利用率最低的
+		CreateIndex("id").
+		CreateIndex("name").
+		CreateIndex("name", "used") //找未使用过的
+}

+ 2 - 16
server/heartbeatService.go

@@ -17,7 +17,7 @@ func (h *Heartbeat) PutStream(cliStr proto.HeartBeat_PutStreamServer) error {
 	for {
 		if tem, err := cliStr.Recv(); err == nil {
 			log.Printf("%s--%s \n", tem.ServiceName, tem.ServiceId)
-			UpdateServiceTtl(tem.ServiceName, tem.ServiceId)
+			UpdateServiceMetaTtl(tem.ServiceId)
 		} else {
 			log.Println("break, err :", err)
 			break
@@ -34,21 +34,7 @@ func ClearTimeoutService(ttl int64) {
 		select {
 		case <-tm.C:
 			//TODO 过滤过期服务
-			now := time.Now().Unix()
-			onlineService.Each(func(k string, v interface{}) {
-				services := v.(SyncMap)
-				removeItems := make([]string, 0, 0)
-				services.Each(func(sk string, sv interface{}) {
-					lastTtl := v.(int64)
-					if now-lastTtl > timeout {
-						removeItems = append(removeItems, sk)
-					}
-				})
-				//清楚失效服务
-				for _, v := range removeItems {
-					services.Remove(v)
-				}
-			})
+			RemoveTimeoutService(timeout)
 		}
 	}
 }

+ 1 - 0
server/main.go

@@ -21,6 +21,7 @@ func init() {
 func main() {
 	//失效服务检查
 	go ClearTimeoutService(*serviceTtl)
+	InitDb()
 	//监听端口
 	lis, err := net.Listen("tcp", *addr)
 	if err != nil {

+ 1 - 0
server/online.go → server/online.go.bak

@@ -69,6 +69,7 @@ func DestoryService(serviceName string, address string) {
 		services := onlineService.Get(serviceName).(SyncMap)
 		if services.Has(address) {
 			services.Remove(address)
+			//TODO 顺带需要清理负载均衡产生的垃圾数据,特别是资源占用表
 		}
 	}
 }

+ 1 - 1
server/serverLoadService.go

@@ -16,7 +16,7 @@ func (s *ServerLoad) PutStream(cliStr proto.ServerLoad_PutStreamServer) error {
 	for {
 		if tem, err := cliStr.Recv(); err == nil {
 			log.Printf("%s--%f \n", tem.Ip, tem.Load)
-			serverLoad.Put(tem.Ip, tem.Load)
+			UpdateServerLoad(tem.Ip, tem.Load)
 		} else {
 			log.Println("break, err :", err)
 			break

+ 0 - 29
server/serviceManager.go

@@ -1,29 +0,0 @@
-package main
-
-import (
-	"app.yhyue.com/BP/servicerd/proto"
-	"context"
-)
-
-/**
-服务治理
-*/
-type Service struct {
-}
-
-func (s *Service) Registe(ctx context.Context, in *proto.ServiceMeta) (*proto.StringReq, error) {
-	AddService(in.ServiceName, in.ServiceAddr, in.Workers, in.Balance)
-	return &proto.StringReq{Data: "ok"}, nil
-}
-
-func (s *Service) Destory(ctx context.Context, meta *proto.ServiceMeta) (*proto.StringReq, error) {
-	panic("implement me")
-}
-
-func (s *Service) Apply(ctx context.Context, data *proto.ApplyReqData) (*proto.ApplyRepData, error) {
-	panic("implement me")
-}
-
-func (s *Service) Release(ctx context.Context, data *proto.ApplyRepData) (*proto.StringReq, error) {
-	panic("implement me")
-}

+ 45 - 0
server/serviceService.go

@@ -0,0 +1,45 @@
+package main
+
+import (
+	"app.yhyue.com/BP/servicerd/proto"
+	"context"
+)
+
+/**
+服务治理
+*/
+type Service struct {
+}
+
+//
+func (s *Service) Registe(ctx context.Context, in *proto.ServiceMeta) (*proto.StringRepData, error) {
+	AddServiceMeta(in.Name, in.Ip, in.Port, in.Workers, in.Balance)
+	return &proto.StringRepData{Data: "ok"}, nil
+}
+
+//
+func (s *Service) Destory(ctx context.Context, meta *proto.ServiceMeta) (*proto.StringRepData, error) {
+	DestoryServiceMeta(meta.Ip, meta.Port)
+	return &proto.StringRepData{Data: "ok"}, nil
+}
+
+//
+func (s *Service) Apply(ctx context.Context, in *proto.ApplyReqData) (*proto.ApplyRepData, error) {
+	var addr, id string
+	var err error
+	if in.Balance == LOAD {
+		addr, id, err = ApplyWithLoad(in.Name)
+	} else {
+		addr, id, err = ApplyWithNotUse(in.Name)
+	}
+	if err != nil {
+		return &proto.ApplyRepData{Addr: addr, ResourceId: id}, nil
+	} else {
+		return &proto.ApplyRepData{Addr: "", ResourceId: ""}, err
+	}
+}
+
+func (s *Service) Release(ctx context.Context, in *proto.ApplyRepData) (*proto.StringRepData, error) {
+	Release(in.ResourceId)
+	return &proto.StringRepData{Data: "ok"}, nil
+}

+ 265 - 0
server/serviceStore.go

@@ -0,0 +1,265 @@
+package main
+
+/**
+服务管理,存储服务
+TODO go-memdb这个内存数据库没有排序????
+*/
+import (
+	"errors"
+	"fmt"
+	"github.com/hashicorp/go-memdb"
+	"log"
+	"time"
+)
+
+const (
+	LOAD = iota //按服务器压力分配
+	SEQ         //顺序执行 ,根据服务支持的执行单元数
+)
+
+type ServiceMeta struct {
+	Name        string
+	Addr        string
+	Workers     int32
+	BalanceType int32
+	Load        float64
+	Ip          string
+	Port        int32
+	Id          string
+	Used        bool
+	LastUseTime int64 //最后一次使用
+	LastTtlTime int64 //心跳时间
+}
+
+//
+func (sm *ServiceMeta) String() string {
+	return fmt.Sprintf("%s-%s-%d-%d-%s", sm.Name, sm.Addr, sm.Workers, sm.BalanceType, sm.Id)
+}
+
+var mdb *memdb.MemDB
+
+func InitDb() {
+	// Create the DB schema
+	schema := &memdb.DBSchema{
+		Tables: map[string]*memdb.TableSchema{
+			"servicemeta": &memdb.TableSchema{
+				Name: "servicemeta",
+				Indexes: map[string]*memdb.IndexSchema{
+					"id": &memdb.IndexSchema{
+						Name:    "id",
+						Unique:  true,
+						Indexer: &memdb.StringFieldIndex{Field: "Id"},
+					},
+					"ip": &memdb.IndexSchema{
+						Name:    "ip",
+						Unique:  false,
+						Indexer: &memdb.StringFieldIndex{Field: "Ip"},
+					},
+					"name": &memdb.IndexSchema{
+						Name:    "name",
+						Unique:  false,
+						Indexer: &memdb.StringFieldIndex{Field: "Name"},
+					},
+					"name_used": &memdb.IndexSchema{ //服务名称+资源使用索引
+						Name:   "name_used",
+						Unique: false,
+						Indexer: &memdb.CompoundIndex{Indexes: []memdb.Indexer{
+							&memdb.StringFieldIndex{Field: "Name"},
+							&memdb.BoolFieldIndex{Field: "Used"},
+						}},
+					},
+					"ip_port": &memdb.IndexSchema{
+						Name:   "ip_port",
+						Unique: false,
+						Indexer: &memdb.CompoundIndex{Indexes: []memdb.Indexer{
+							&memdb.StringFieldIndex{Field: "Ip"},
+							&memdb.IntFieldIndex{Field: "Port"},
+						}},
+					},
+				},
+			},
+		},
+	}
+	// Create a new data base
+	var err error
+	mdb, err = memdb.NewMemDB(schema)
+	if err != nil {
+		panic(err)
+	}
+}
+
+//添加服务
+func AddServiceMeta(serviceName string, ip string, port int32, workers int32, balance int32) {
+	now := time.Now().Unix()
+	txn := mdb.Txn(true)
+	defer txn.Commit()
+	for i := 0; i < int(workers); i++ {
+		sm := &ServiceMeta{
+			Name:        serviceName,
+			Ip:          ip,
+			Port:        port,
+			Workers:     workers,
+			BalanceType: balance,
+			LastTtlTime: now,
+			Id:          fmt.Sprintf("%s:%d_%d", ip, port, i+1),
+			Addr:        fmt.Sprintf("%s:%d", ip, port),
+		}
+		err := txn.Insert("servicemeta", sm)
+		if err != nil {
+			log.Println(err.Error())
+		}
+	}
+}
+
+//更新服务有效期
+func UpdateServiceMetaTtl(serviceId string) {
+	now := time.Now().Unix()
+	txn := mdb.Txn(true)
+	defer txn.Commit()
+	item, err := txn.First("servicemeta", "id", serviceId)
+	if err != nil {
+		log.Println(err.Error())
+		return
+	}
+	sm := item.(*ServiceMeta)
+	sm.LastTtlTime = now
+	_ = txn.Insert("servicemeta", sm)
+}
+
+//注销服务
+func DestoryServiceMeta(ip string, port int32) {
+	txn := mdb.Txn(true)
+	defer txn.Commit()
+	_, err := txn.DeleteAll("servicemeta", "ip_port", ip, port)
+	if err != nil {
+		log.Println(err.Error())
+	}
+}
+
+//申请调用,找未使用的那个资源
+func ApplyWithNotUse(serviceName string) (string, string, error) {
+	txn := mdb.Txn(true)
+	defer txn.Changes()
+	item, err := txn.First("servicemeta", "name_used", serviceName, false)
+	if err != nil {
+		log.Println(err.Error())
+		return "", "", errors.New("没有可用服务")
+	}
+	sm := item.(*ServiceMeta)
+	sm.Used = true
+	sm.LastUseTime = time.Now().Unix()
+	_ = txn.Insert("servicemeta", sm)
+	return sm.Addr, sm.Id, nil
+}
+
+//申请调用,找负载最低的那个服务器
+func ApplyWithLoad(serviceName string) (string, string, error) {
+	txn := mdb.Txn(false)
+	defer txn.Abort()
+	rs, err := txn.Get("servicemeta", "name", serviceName)
+	if err != nil {
+		log.Println(err.Error())
+		return "", "", errors.New("没有可用服务")
+	}
+	var addr, id string
+	var load float64 = 0
+	for obj := rs.Next(); obj != nil; obj = rs.Next() {
+		meta := obj.(*ServiceMeta)
+		if meta.Load < load { //找负载最小的那个
+			load, addr, id = meta.Load, meta.Addr, meta.Id
+		}
+	}
+	return addr, id, nil
+}
+
+//释放资源
+func Release(serviceId string) {
+	txn := mdb.Txn(true)
+	defer txn.Changes()
+	item, err := txn.First("servicemeta", "id", serviceId, false)
+	if err != nil {
+		log.Println(err.Error())
+		return
+	}
+	sm := item.(*ServiceMeta)
+	sm.Used = false
+	sm.LastUseTime = time.Now().Unix()
+	_ = txn.Insert("servicemeta", sm)
+}
+
+//更新所有服务负载
+func UpdateServerLoad(ip string, load float64) {
+	txn := mdb.Txn(true)
+	defer txn.Changes()
+	rs, err := txn.Get("servicemeta", "ip", ip)
+	if err != nil {
+		log.Println(err.Error())
+		return
+	}
+	for obj := rs.Next(); obj != nil; obj = rs.Next() {
+		meta := obj.(*ServiceMeta)
+		meta.Load = load
+		_ = txn.Insert("servicemeta", meta)
+	}
+}
+
+//
+func RemoveTimeoutService(timeout int64) {
+	txn := mdb.Txn(true)
+	defer txn.Commit()
+	rs, err := txn.Get("servicemeta", "id", "")
+	if err != nil {
+		log.Println(err.Error())
+		return
+	}
+	filter := memdb.NewFilterIterator(rs, timeoutFilterFactory(timeout))
+	ids := make([]string, 0, 0)
+	for obj := filter.Next(); obj != nil; obj = filter.Next() {
+		meta := obj.(*ServiceMeta)
+		ids = append(ids, meta.Id)
+	}
+	for _, id := range ids {
+		_, _ = txn.DeleteAll("servicemeta", "id", id)
+	}
+}
+
+//过滤器工厂
+func timeoutFilterFactory(timeout int64) func(interface{}) bool {
+	limit := timeout
+	return func(raw interface{}) bool {
+		obj, ok := raw.(*ServiceMeta)
+		if !ok {
+			return false
+		}
+		return time.Now().Unix()-obj.LastTtlTime < limit
+	}
+}
+
+//
+//func main() {
+//	InitDb()
+//	txn := mdb.Txn(true)
+//	for i := 0; i < 20; i++ {
+//		sm := &ServiceMeta{Name: "ocr", Ip: "192.168.20.100", Port: 50050, Workers: 20, BalanceType: LOAD}
+//		sm.Id = fmt.Sprintf("%s:%d_%d", sm.Ip, sm.Port, i+1)
+//		sm.Addr = fmt.Sprintf("%s:%d", sm.Ip, sm.Port)
+//		err := txn.Insert("servicemeta", sm)
+//		if err != nil {
+//			fmt.Println(err.Error())
+//		}
+//	}
+//	txn.Commit()
+//	txn = mdb.Txn(false)
+//	defer txn.Abort()
+//
+//	rs, err := txn.Get("servicemeta", "name", "ocr")
+//	if err != nil {
+//		fmt.Println(err.Error())
+//	}
+//
+//	for obj := rs.Next(); obj != nil; obj = rs.Next() {
+//		meta := obj.(*ServiceMeta)
+//		fmt.Println(meta.Name, meta.Addr, meta.Id)
+//	}
+//
+//}

+ 0 - 0
server/snycmap.go → server/snycmap.go.bak