浏览代码

添加心跳服务

Tao Zhang 5 年之前
父节点
当前提交
9657b5e5b7
共有 5 个文件被更改,包括 97 次插入18 次删除
  1. 54 0
      heartbeatService.go
  2. 9 1
      main.go
  3. 11 6
      online.go
  4. 20 10
      proto/heartbeat.pb.go
  5. 3 1
      proto_src/heartbeat.proto

+ 54 - 0
heartbeatService.go

@@ -0,0 +1,54 @@
+package main
+
+import (
+	"app.yhyue.com/BP/servicerd/proto"
+	"log"
+	"time"
+)
+
+/**
+心跳服务
+*/
+type Heartbeat struct {
+}
+
+//
+func (h *Heartbeat) PutStream(cliStr proto.HeartBeat_PutStreamServer) error {
+	for {
+		if tem, err := cliStr.Recv(); err == nil {
+			log.Printf("%s--%s \n", tem.ServiceName, tem.ServiceId)
+			UpdateServiceTtl(tem.ServiceName, tem.ServiceId)
+		} else {
+			log.Println("break, err :", err)
+			break
+		}
+	}
+	return nil
+}
+
+//过期服务清理
+func ClearTimeoutService(ttl int64) {
+	timeout := ttl * 2
+	tm := time.NewTicker(time.Duration(ttl) * time.Second)
+	for {
+		select {
+		case <-tm.C:
+			//TODO 过滤过期服务
+			now := time.Now().Unix()
+			onlineService.Each(func(k string, v interface{}) {
+				services := v.(SyncMap)
+				removeItems := make([]string, 0, 0)
+				services.Each(func(sk string, sv interface{}) {
+					lastTtl := v.(int64)
+					if now-lastTtl > timeout {
+						removeItems = append(removeItems, sk)
+					}
+				})
+				//清楚失效服务
+				for _, v := range removeItems {
+					services.Remove(v)
+				}
+			})
+		}
+	}
+}

+ 9 - 1
main.go

@@ -9,7 +9,8 @@ import (
 )
 
 var (
-	addr = flag.String("addr", ":10021", "监听地址")
+	addr       = flag.String("addr", ":10021", "监听地址")
+	serviceTtl = flag.Int64("ttl", 30, "服务失效时间(秒)")
 )
 
 func init() {
@@ -18,6 +19,8 @@ func init() {
 
 //
 func main() {
+	//失效服务检查
+	go ClearTimeoutService(*serviceTtl)
 	//监听端口
 	lis, err := net.Listen("tcp", *addr)
 	if err != nil {
@@ -27,7 +30,12 @@ func main() {
 	//创建一个grpc 服务器
 	s := grpc.NewServer()
 	//注册事件
+	//1.服务器负载
 	proto.RegisterServerLoadServer(s, &ServerLoad{})
+	//2.服务心跳
+	proto.RegisterHeartBeatServer(s, &Heartbeat{})
+	//3.服务治理
+
 	//处理链接
 	_ = s.Serve(lis)
 }

+ 11 - 6
online.go

@@ -53,6 +53,16 @@ func AddService(serviceName string, ip string, port int, workers int, balance in
 	}
 }
 
+//更新服务有效期
+func UpdateServiceTtl(serviceName string, serviceId string) {
+	if onlineService.Has(serviceName) {
+		services := onlineService.Get(serviceName).(SyncMap)
+		if services.Has(serviceId) {
+			services.Put(serviceId, time.Now().Unix())
+		}
+	}
+}
+
 //注销服务
 func DestoryService(serviceName string, address string) {
 	if onlineService.Has(serviceName) {
@@ -77,9 +87,4 @@ func Discover(serviceName string) (string, string) {
 	case BALANCE_SEQ:
 	}
 	return "", ""
-}
-
-//更新服务器负载
-func UpdateServerLoad(serverIp string, load float64) {
-	serverLoad.Put(serverIp, load)
-}
+}

+ 20 - 10
proto/heartbeat.pb.go

@@ -28,7 +28,8 @@ const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
 
 //stream请求结构
 type StreamReqData struct {
-	Data                 string   `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
+	ServiceName          string   `protobuf:"bytes,1,opt,name=serviceName,proto3" json:"serviceName,omitempty"`
+	ServiceId            string   `protobuf:"bytes,2,opt,name=serviceId,proto3" json:"serviceId,omitempty"`
 	XXX_NoUnkeyedLiteral struct{} `json:"-"`
 	XXX_unrecognized     []byte   `json:"-"`
 	XXX_sizecache        int32    `json:"-"`
@@ -59,9 +60,16 @@ func (m *StreamReqData) XXX_DiscardUnknown() {
 
 var xxx_messageInfo_StreamReqData proto.InternalMessageInfo
 
-func (m *StreamReqData) GetData() string {
+func (m *StreamReqData) GetServiceName() string {
 	if m != nil {
-		return m.Data
+		return m.ServiceName
+	}
+	return ""
+}
+
+func (m *StreamReqData) GetServiceId() string {
+	if m != nil {
+		return m.ServiceId
 	}
 	return ""
 }
@@ -116,15 +124,17 @@ func init() {
 }
 
 var fileDescriptor_3c667767fb9826a9 = []byte{
-	// 125 bytes of a gzipped FileDescriptorProto
+	// 159 bytes of a gzipped FileDescriptorProto
 	0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xcf, 0x48, 0x4d, 0x2c,
 	0x2a, 0x49, 0x4a, 0x4d, 0x2c, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x05, 0x53, 0x4a,
-	0xca, 0x5c, 0xbc, 0xc1, 0x25, 0x45, 0xa9, 0x89, 0xb9, 0x41, 0xa9, 0x85, 0x2e, 0x89, 0x25, 0x89,
-	0x42, 0x42, 0x5c, 0x2c, 0x29, 0x89, 0x25, 0x89, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x60,
-	0x36, 0xb2, 0xa2, 0x62, 0x5c, 0x8a, 0x8c, 0x3c, 0xb8, 0x38, 0x3d, 0x40, 0x76, 0x38, 0xa5, 0x26,
-	0x96, 0x08, 0x59, 0x73, 0x71, 0x06, 0x94, 0x96, 0x40, 0x34, 0x09, 0x89, 0x40, 0xac, 0xd4, 0x43,
-	0xb1, 0x48, 0x0a, 0x5d, 0x14, 0x6c, 0xb2, 0x12, 0x83, 0x06, 0x63, 0x12, 0x1b, 0x58, 0xc2, 0x18,
-	0x10, 0x00, 0x00, 0xff, 0xff, 0x69, 0xee, 0x44, 0x65, 0xb4, 0x00, 0x00, 0x00,
+	0xfe, 0x5c, 0xbc, 0xc1, 0x25, 0x45, 0xa9, 0x89, 0xb9, 0x41, 0xa9, 0x85, 0x2e, 0x89, 0x25, 0x89,
+	0x42, 0x0a, 0x5c, 0xdc, 0xc5, 0xa9, 0x45, 0x65, 0x99, 0xc9, 0xa9, 0x7e, 0x89, 0xb9, 0xa9, 0x12,
+	0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0xc8, 0x42, 0x42, 0x32, 0x5c, 0x9c, 0x50, 0xae, 0x67, 0x8a,
+	0x04, 0x13, 0x58, 0x1e, 0x21, 0xa0, 0xa4, 0x8c, 0x30, 0xb0, 0x18, 0x6c, 0xa0, 0x10, 0x17, 0x4b,
+	0x4a, 0x62, 0x49, 0x22, 0xd4, 0x24, 0x30, 0xdb, 0xc8, 0x83, 0x8b, 0xd3, 0x03, 0xe4, 0x1e, 0xa7,
+	0xd4, 0xc4, 0x12, 0x21, 0x6b, 0x2e, 0xce, 0x80, 0xd2, 0x12, 0x88, 0x26, 0x21, 0x11, 0x88, 0xf3,
+	0xf4, 0x50, 0x1c, 0x25, 0x85, 0x2e, 0x0a, 0x36, 0x59, 0x89, 0x41, 0x83, 0x31, 0x89, 0x0d, 0x2c,
+	0x61, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0xeb, 0x84, 0xe3, 0x9e, 0xe0, 0x00, 0x00, 0x00,
 }
 
 // Reference imports to suppress errors if they are not otherwise used.

+ 3 - 1
proto_src/heartbeat.proto

@@ -16,7 +16,9 @@ service HeartBeat {
 
 //stream请求结构
 message StreamReqData {
-  string data = 1;
+  string serviceName = 1;//服务名称
+  string serviceId = 2;//ip+端口
+
 }
 //stream返回结构
 message StreamResData {