瀏覽代碼

添加服务管理

Tao Zhang 5 年之前
父節點
當前提交
468eb65077
共有 5 個文件被更改,包括 404 次插入0 次删除
  1. 9 0
      main.go
  2. 66 0
      online.go
  3. 244 0
      proto/heartbeat.pb.go
  4. 24 0
      proto_src/heartbeat.proto
  5. 61 0
      snycmap.go

+ 9 - 0
main.go

@@ -0,0 +1,9 @@
+package main
+
+import "fmt"
+
+func main() {
+	sm := NewSyncMap()
+	sm.Put("name", "1234")
+	fmt.Println(sm.Get("name"))
+}

+ 66 - 0
online.go

@@ -0,0 +1,66 @@
+/**
+心跳检测,服务端实现,用于记录心跳
+*/
+package main
+
+import (
+	"container/list"
+	"fmt"
+	"time"
+)
+
+const (
+	BALANCE_RANDOM = iota //随机
+	BALANCE_LOAD          //按服务器压力分配
+	BALANCE_seq           //顺序执行 ,根据服务支持的执行单元数
+
+)
+
+//在线服务
+var (
+	onlineService = NewSyncMap() //找到相应的服务节点
+	serverLoad    = NewSyncMap() //服务节点的负载情况,主要是cpu内存占用
+	servicesLoad  = NewSyncMap() //服务使用情况,这里是一张登记表
+	balanceTable  = NewSyncMap() //指定服务的负载均衡策略
+)
+
+//追加服务
+func AddService(serviceName string, ip string, port int, workers int, balance int) {
+	now := time.Now().Unix()
+	address := fmt.Sprint("%s%d", ip, port)
+	if !balanceTable.Has(serviceName) {
+		balanceTable.Put(serviceName, balance)
+	}
+	if onlineService.Has(serviceName) {
+		services := NewSyncMap()
+		services.Put(address, now)
+		onlineService.Put(serviceName, services)
+	} else {
+		services := onlineService.Get(serviceName).(SyncMap)
+		services.Put(address, now)
+	}
+	//TODO 追加多个服务执行单元
+	var serviceList *list.List
+	if servicesLoad.Has(serviceName) {
+		serviceList = servicesLoad.Get(serviceName).(*list.List)
+	} else {
+		serviceList = list.New()
+		servicesLoad.Put(serviceName, serviceList)
+	}
+	for i := 0; i < workers; i++ {
+		serviceId := fmt.Sprintf("%s_%d", address, i)
+		serviceList.PushBack(serviceId)
+	}
+}
+
+//注销服务
+func DestoryService(serviceName string, address string) {
+	if onlineService.Has(serviceName) {
+		services := onlineService.Get(serviceName).(SyncMap)
+		if services.Has(address) {
+			services.Remove(address)
+		}
+	}
+}
+
+//发现服务

+ 244 - 0
proto/heartbeat.pb.go

@@ -0,0 +1,244 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: heartbeat.proto
+
+//声明 包名
+
+package proto
+
+import (
+	context "context"
+	fmt "fmt"
+	proto "github.com/golang/protobuf/proto"
+	grpc "google.golang.org/grpc"
+	codes "google.golang.org/grpc/codes"
+	status "google.golang.org/grpc/status"
+	math "math"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
+
+//stream请求结构
+type StreamReqData struct {
+	Data                 string   `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *StreamReqData) Reset()         { *m = StreamReqData{} }
+func (m *StreamReqData) String() string { return proto.CompactTextString(m) }
+func (*StreamReqData) ProtoMessage()    {}
+func (*StreamReqData) Descriptor() ([]byte, []int) {
+	return fileDescriptor_3c667767fb9826a9, []int{0}
+}
+
+func (m *StreamReqData) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_StreamReqData.Unmarshal(m, b)
+}
+func (m *StreamReqData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_StreamReqData.Marshal(b, m, deterministic)
+}
+func (m *StreamReqData) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_StreamReqData.Merge(m, src)
+}
+func (m *StreamReqData) XXX_Size() int {
+	return xxx_messageInfo_StreamReqData.Size(m)
+}
+func (m *StreamReqData) XXX_DiscardUnknown() {
+	xxx_messageInfo_StreamReqData.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_StreamReqData proto.InternalMessageInfo
+
+func (m *StreamReqData) GetData() string {
+	if m != nil {
+		return m.Data
+	}
+	return ""
+}
+
+//stream返回结构
+type StreamResData struct {
+	Data                 string   `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
+	XXX_NoUnkeyedLiteral struct{} `json:"-"`
+	XXX_unrecognized     []byte   `json:"-"`
+	XXX_sizecache        int32    `json:"-"`
+}
+
+func (m *StreamResData) Reset()         { *m = StreamResData{} }
+func (m *StreamResData) String() string { return proto.CompactTextString(m) }
+func (*StreamResData) ProtoMessage()    {}
+func (*StreamResData) Descriptor() ([]byte, []int) {
+	return fileDescriptor_3c667767fb9826a9, []int{1}
+}
+
+func (m *StreamResData) XXX_Unmarshal(b []byte) error {
+	return xxx_messageInfo_StreamResData.Unmarshal(m, b)
+}
+func (m *StreamResData) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+	return xxx_messageInfo_StreamResData.Marshal(b, m, deterministic)
+}
+func (m *StreamResData) XXX_Merge(src proto.Message) {
+	xxx_messageInfo_StreamResData.Merge(m, src)
+}
+func (m *StreamResData) XXX_Size() int {
+	return xxx_messageInfo_StreamResData.Size(m)
+}
+func (m *StreamResData) XXX_DiscardUnknown() {
+	xxx_messageInfo_StreamResData.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_StreamResData proto.InternalMessageInfo
+
+func (m *StreamResData) GetData() string {
+	if m != nil {
+		return m.Data
+	}
+	return ""
+}
+
+func init() {
+	proto.RegisterType((*StreamReqData)(nil), "proto.StreamReqData")
+	proto.RegisterType((*StreamResData)(nil), "proto.StreamResData")
+}
+
+func init() {
+	proto.RegisterFile("heartbeat.proto", fileDescriptor_3c667767fb9826a9)
+}
+
+var fileDescriptor_3c667767fb9826a9 = []byte{
+	// 125 bytes of a gzipped FileDescriptorProto
+	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcf, 0x48, 0x4d, 0x2c,
+	0x2a, 0x49, 0x4a, 0x4d, 0x2c, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a,
+	0xca, 0x5c, 0xbc, 0xc1, 0x25, 0x45, 0xa9, 0x89, 0xb9, 0x41, 0xa9, 0x85, 0x2e, 0x89, 0x25, 0x89,
+	0x42, 0x42, 0x5c, 0x2c, 0x29, 0x89, 0x25, 0x89, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x60,
+	0x36, 0xb2, 0xa2, 0x62, 0x5c, 0x8a, 0x8c, 0x3c, 0xb8, 0x38, 0x3d, 0x40, 0x76, 0x38, 0xa5, 0x26,
+	0x96, 0x08, 0x59, 0x73, 0x71, 0x06, 0x94, 0x96, 0x40, 0x34, 0x09, 0x89, 0x40, 0xac, 0xd4, 0x43,
+	0xb1, 0x48, 0x0a, 0x5d, 0x14, 0x6c, 0xb2, 0x12, 0x83, 0x06, 0x63, 0x12, 0x1b, 0x58, 0xc2, 0x18,
+	0x10, 0x00, 0x00, 0xff, 0xff, 0x69, 0xee, 0x44, 0x65, 0xb4, 0x00, 0x00, 0x00,
+}
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ context.Context
+var _ grpc.ClientConnInterface
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion6
+
+// HeartBeatClient is the client API for HeartBeat service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
+type HeartBeatClient interface {
+	//rpc GetStream (StreamReqData) returns (stream StreamResData){}
+	PutStream(ctx context.Context, opts ...grpc.CallOption) (HeartBeat_PutStreamClient, error)
+}
+
+type heartBeatClient struct {
+	cc grpc.ClientConnInterface
+}
+
+func NewHeartBeatClient(cc grpc.ClientConnInterface) HeartBeatClient {
+	return &heartBeatClient{cc}
+}
+
+func (c *heartBeatClient) PutStream(ctx context.Context, opts ...grpc.CallOption) (HeartBeat_PutStreamClient, error) {
+	stream, err := c.cc.NewStream(ctx, &_HeartBeat_serviceDesc.Streams[0], "/proto.HeartBeat/PutStream", opts...)
+	if err != nil {
+		return nil, err
+	}
+	x := &heartBeatPutStreamClient{stream}
+	return x, nil
+}
+
+type HeartBeat_PutStreamClient interface {
+	Send(*StreamReqData) error
+	CloseAndRecv() (*StreamResData, error)
+	grpc.ClientStream
+}
+
+type heartBeatPutStreamClient struct {
+	grpc.ClientStream
+}
+
+func (x *heartBeatPutStreamClient) Send(m *StreamReqData) error {
+	return x.ClientStream.SendMsg(m)
+}
+
+func (x *heartBeatPutStreamClient) CloseAndRecv() (*StreamResData, error) {
+	if err := x.ClientStream.CloseSend(); err != nil {
+		return nil, err
+	}
+	m := new(StreamResData)
+	if err := x.ClientStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+// HeartBeatServer is the server API for HeartBeat service.
+type HeartBeatServer interface {
+	//rpc GetStream (StreamReqData) returns (stream StreamResData){}
+	PutStream(HeartBeat_PutStreamServer) error
+}
+
+// UnimplementedHeartBeatServer can be embedded to have forward compatible implementations.
+type UnimplementedHeartBeatServer struct {
+}
+
+func (*UnimplementedHeartBeatServer) PutStream(srv HeartBeat_PutStreamServer) error {
+	return status.Errorf(codes.Unimplemented, "method PutStream not implemented")
+}
+
+func RegisterHeartBeatServer(s *grpc.Server, srv HeartBeatServer) {
+	s.RegisterService(&_HeartBeat_serviceDesc, srv)
+}
+
+func _HeartBeat_PutStream_Handler(srv interface{}, stream grpc.ServerStream) error {
+	return srv.(HeartBeatServer).PutStream(&heartBeatPutStreamServer{stream})
+}
+
+type HeartBeat_PutStreamServer interface {
+	SendAndClose(*StreamResData) error
+	Recv() (*StreamReqData, error)
+	grpc.ServerStream
+}
+
+type heartBeatPutStreamServer struct {
+	grpc.ServerStream
+}
+
+func (x *heartBeatPutStreamServer) SendAndClose(m *StreamResData) error {
+	return x.ServerStream.SendMsg(m)
+}
+
+func (x *heartBeatPutStreamServer) Recv() (*StreamReqData, error) {
+	m := new(StreamReqData)
+	if err := x.ServerStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+var _HeartBeat_serviceDesc = grpc.ServiceDesc{
+	ServiceName: "proto.HeartBeat",
+	HandlerType: (*HeartBeatServer)(nil),
+	Methods:     []grpc.MethodDesc{},
+	Streams: []grpc.StreamDesc{
+		{
+			StreamName:    "PutStream",
+			Handler:       _HeartBeat_PutStream_Handler,
+			ClientStreams: true,
+		},
+	},
+	Metadata: "heartbeat.proto",
+}

+ 24 - 0
proto_src/heartbeat.proto

@@ -0,0 +1,24 @@
+syntax = "proto3";//声明proto的版本 只能 是3,才支持 grpc
+
+//声明 包名
+package proto;
+
+//心跳检测
+service HeartBeat {
+  /*
+  以下 分别是 服务端 推送流, 客户端 推送流 ,双向流。
+  */
+  //rpc GetStream (StreamReqData) returns (stream StreamResData){}
+  rpc PutStream (stream StreamReqData) returns (StreamResData){}
+  //rpc AllStream (stream StreamReqData) returns (stream StreamResData){}
+}
+
+
+//stream请求结构
+message StreamReqData {
+  string data = 1;
+}
+//stream返回结构
+message StreamResData {
+  string data = 1;
+}

+ 61 - 0
snycmap.go

@@ -0,0 +1,61 @@
+/**
+支持同步的,带锁的Map
+*/
+package main
+
+import "sync"
+
+type SyncMap struct {
+	dict map[string]interface{}
+	l    *sync.RWMutex
+}
+
+//创建新的同步map
+func NewSyncMap() *SyncMap {
+	return &SyncMap{
+		dict: map[string]interface{}{},
+		l:    new(sync.RWMutex),
+	}
+}
+
+//
+func (sm *SyncMap) Put(key string, value interface{}) {
+	sm.l.Lock()
+	defer sm.l.Unlock()
+	sm.dict[key] = value
+
+}
+
+//
+func (sm *SyncMap) Get(key string) interface{} {
+	sm.l.RLock()
+	defer sm.l.RUnlock()
+	return sm.dict[key]
+}
+
+//
+func (sm *SyncMap) Has(key string) bool {
+	sm.l.RLock()
+	defer sm.l.RUnlock()
+	if _, ok := sm.dict[key]; ok {
+		return true
+	} else {
+		return false
+	}
+}
+
+//
+func (sm *SyncMap) Remove(key string) {
+	sm.l.Lock()
+	defer sm.l.Unlock()
+	delete(sm.dict, key)
+}
+
+//遍历
+func (sm *SyncMap) Each(fn func(key string, value interface{})) {
+	sm.l.RLock()
+	defer sm.l.RUnlock()
+	for k, v := range sm.dict {
+		fn(k, v)
+	}
+}