package node import ( "app.yhyue.com/moapp/jybase/iputil" "fmt" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/threading" clientv3 "go.etcd.io/etcd/client/v3" "time" ) type Publisher struct { code string //服务code serAddr string //服务地址 serPort string //服务端口 endpoints []string //etcd集群地址 leaseId clientv3.LeaseID //租约 client *clientv3.Client //链接 stop chan struct{} } func NewPublisher(serverCode, port string, endpoints ...string) *Publisher { if len(endpoints) == 0 { endpoints = defaultEndpoints } return &Publisher{ code: serverCode, serPort: port, endpoints: endpoints, stop: make(chan struct{}), } } // Register 注册服务 func (p *Publisher) Register() (chan struct{}, error) { //创建链接 client, err := clientv3.New(clientv3.Config{ Endpoints: p.endpoints, DialTimeout: time.Duration(defaultDialTimeout) * time.Second, }) if err != nil { return p.stop, err } //注册节点 err = p.register(client) if err != nil { return p.stop, err } //续租约 return p.stop, p.keepAliveAsync(client) } // getRegisterKey 获取注册key func (p *Publisher) getRegisterKey() (key, val string) { addr, port := p.serAddr, p.serPort if addr == "" { addr = fmt.Sprintf("http://%s", iputil.InternalIp()) } if p.serPort != "" { port = "80" } finalAddr := fmt.Sprintf("%s:%s", addr, port) return createServerRegisterKey(defaultScheme, p.code, finalAddr), time.Now().Format("2006-01-02 15:04:05") } // register 注册节点 func (p *Publisher) register(cli *clientv3.Client) error { resp, err := cli.Grant(cli.Ctx(), 5) if err != nil { return err } p.leaseId = resp.ID key, val := p.getRegisterKey() _, err = cli.Put(cli.Ctx(), key, val, clientv3.WithLease(p.leaseId)) return err } // keepAliveAsync 续租 func (p *Publisher) keepAliveAsync(cli *clientv3.Client) error { ch, err := cli.KeepAlive(cli.Ctx(), p.leaseId) if err != nil { return err } threading.GoSafe(func() { for { select { case <-p.stop: p.revoke(cli) fmt.Println("服务主动关闭") case _, ok := <-ch: if !ok { p.revoke(cli) if _, err := p.Register(); err != nil { logx.Errorf("KeepAlive: %s", err.Error()) } else { fmt.Printf("重新创建链接") } return } } } }) return nil } // revoke 撤销给定的租约 func (p *Publisher) revoke(cli *clientv3.Client) { if _, err := cli.Revoke(cli.Ctx(), p.leaseId); err != nil { logx.Error(err) } }