123456 3 سال پیش
والد
کامیت
145a4f2ab4
3فایلهای تغییر یافته به همراه132 افزوده شده و 5 حذف شده
  1. 0 3
      README.md
  2. 113 0
      core/node/NewReigster.go
  3. 19 2
      core/node/register.go

+ 0 - 3
README.md

@@ -1,7 +1,4 @@
 # BaseService/gateway
-v1.2
-站内社交
-<
 `BaseService/gateway` 剑鱼网关服务。为剑鱼网站商品标准化,提供统一权限校验,资源扣减。
 
 ## 网关配置

+ 113 - 0
core/node/NewReigster.go

@@ -0,0 +1,113 @@
+package node
+
+import (
+	"app.yhyue.com/moapp/jybase/iputil"
+	"fmt"
+	"github.com/zeromicro/go-zero/core/logx"
+	"github.com/zeromicro/go-zero/core/threading"
+	clientv3 "go.etcd.io/etcd/client/v3"
+	"time"
+)
+
+type Publisher struct {
+	code      string           //服务code
+	serAddr   string           //服务地址
+	serPort   string           //服务端口
+	endpoints []string         //etcd集群地址
+	leaseId   clientv3.LeaseID //租约
+	client    *clientv3.Client //链接
+	stop      chan struct{}
+}
+
+func NewPublisher(serverCode, port string, endpoints ...string) *Publisher {
+	if len(endpoints) == 0 {
+		endpoints = defaultEndpoints
+	}
+	return &Publisher{
+		code:      serverCode,
+		serPort:   port,
+		endpoints: endpoints,
+		stop:      make(chan struct{}),
+	}
+}
+
+// Register 注册服务
+func (p *Publisher) Register() (chan struct{}, error) {
+	//创建链接
+	client, err := clientv3.New(clientv3.Config{
+		Endpoints:   p.endpoints,
+		DialTimeout: time.Duration(defaultDialTimeout) * time.Second,
+	})
+	if err != nil {
+		return p.stop, err
+	}
+
+	//注册节点
+	err = p.register(client)
+	if err != nil {
+		return p.stop, err
+	}
+
+	//续租约
+	return p.stop, p.keepAliveAsync(client)
+}
+
+// getRegisterKey 获取注册key
+func (p *Publisher) getRegisterKey() (key, val string) {
+	addr, port := p.serAddr, p.serPort
+	if addr == "" {
+		addr = fmt.Sprintf("http://%s", iputil.InternalIp())
+	}
+	if p.serPort == "" {
+		port = "80"
+	}
+	finalAddr := fmt.Sprintf("%s:%s", addr, port)
+	return createServerRegisterKey(defaultScheme, p.code, finalAddr), time.Now().Format("2006-01-02 15:04:05")
+}
+
+// register 注册节点
+func (p *Publisher) register(cli *clientv3.Client) error {
+	resp, err := cli.Grant(cli.Ctx(), 5)
+	if err != nil {
+		return err
+	}
+	p.leaseId = resp.ID
+	key, val := p.getRegisterKey()
+	_, err = cli.Put(cli.Ctx(), key, val, clientv3.WithLease(p.leaseId))
+	return err
+}
+
+// keepAliveAsync 续租
+func (p *Publisher) keepAliveAsync(cli *clientv3.Client) error {
+	ch, err := cli.KeepAlive(cli.Ctx(), p.leaseId)
+	if err != nil {
+		return err
+	}
+	threading.GoSafe(func() {
+		for {
+			select {
+			case <-p.stop:
+				p.revoke(cli)
+				fmt.Println("服务主动关闭")
+			case _, ok := <-ch:
+				if !ok {
+					p.revoke(cli)
+					if _, err := p.Register(); err != nil {
+						logx.Errorf("KeepAlive: %s", err.Error())
+					} else {
+						fmt.Printf("重新创建链接")
+					}
+					return
+				}
+			}
+		}
+	})
+	return nil
+}
+
+// revoke 撤销给定的租约
+func (p *Publisher) revoke(cli *clientv3.Client) {
+	if _, err := cli.Revoke(cli.Ctx(), p.leaseId); err != nil {
+		logx.Error(err)
+	}
+}

+ 19 - 2
core/node/register.go

@@ -11,7 +11,14 @@ import (
 
 // Register 注册服务
 func (n *Node) Register(serverCode string, serverPort string, serverIp ...string) (func(), error) {
-	return n.RegisterWithContext(context.Background(), serverCode, serverPort, serverIp...)
+	stop, err := NewPublisher(serverCode, serverPort, n.Client.Endpoints()...).Register()
+	if err != nil {
+		return nil, err
+	}
+	return func() {
+		stop <- struct{}{}
+	}, nil
+	//return n.RegisterWithContext(context.Background(), serverCode, serverPort, serverIp...)
 }
 
 // RegisterWithContext 注册服务
@@ -47,7 +54,17 @@ func (n *Node) RegisterWithContext(ctx context.Context, serverCode string, serve
 	}
 	go func() {
 		for {
-			<-keepAlive
+			select {
+			case <-n.Client.Ctx().Done():
+				fmt.Println("server closed")
+				return
+			case _, ok := <-keepAlive:
+				if !ok {
+					fmt.Println("keep live channel closed")
+					_, _ = n.Client.Revoke(ctx, lease.ID)
+					return
+				}
+			}
 		}
 	}()
 	return func() {