123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- package node
- import (
- "app.yhyue.com/moapp/jybase/iputil"
- "fmt"
- "github.com/zeromicro/go-zero/core/logx"
- "github.com/zeromicro/go-zero/core/threading"
- clientv3 "go.etcd.io/etcd/client/v3"
- "time"
- )
- type Publisher struct {
- code string //服务code
- serAddr string //服务地址
- serPort string //服务端口
- endpoints []string //etcd集群地址
- leaseId clientv3.LeaseID //租约
- client *clientv3.Client //链接
- stop chan struct{}
- }
- func NewPublisher(serverCode, port string, endpoints ...string) *Publisher {
- if len(endpoints) == 0 {
- endpoints = defaultEndpoints
- }
- return &Publisher{
- code: serverCode,
- serPort: port,
- endpoints: endpoints,
- stop: make(chan struct{}),
- }
- }
- // Register 注册服务
- func (p *Publisher) Register() (chan struct{}, error) {
- //创建链接
- client, err := clientv3.New(clientv3.Config{
- Endpoints: p.endpoints,
- DialTimeout: time.Duration(defaultDialTimeout) * time.Second,
- })
- if err != nil {
- return p.stop, err
- }
- //注册节点
- err = p.register(client)
- if err != nil {
- return p.stop, err
- }
- //续租约
- return p.stop, p.keepAliveAsync(client)
- }
- // getRegisterKey 获取注册key
- func (p *Publisher) getRegisterKey() (key, val string) {
- addr, port := p.serAddr, p.serPort
- if addr == "" {
- addr = fmt.Sprintf("http://%s", iputil.InternalIp())
- }
- if p.serPort != "" {
- port = "80"
- }
- finalAddr := fmt.Sprintf("%s:%s", addr, port)
- return createServerRegisterKey(defaultScheme, p.code, finalAddr), time.Now().Format("2006-01-02 15:04:05")
- }
- // register 注册节点
- func (p *Publisher) register(cli *clientv3.Client) error {
- resp, err := cli.Grant(cli.Ctx(), 5)
- if err != nil {
- return err
- }
- p.leaseId = resp.ID
- key, val := p.getRegisterKey()
- _, err = cli.Put(cli.Ctx(), key, val, clientv3.WithLease(p.leaseId))
- return err
- }
- // keepAliveAsync 续租
- func (p *Publisher) keepAliveAsync(cli *clientv3.Client) error {
- ch, err := cli.KeepAlive(cli.Ctx(), p.leaseId)
- if err != nil {
- return err
- }
- threading.GoSafe(func() {
- for {
- select {
- case <-p.stop:
- p.revoke(cli)
- fmt.Println("服务主动关闭")
- case _, ok := <-ch:
- if !ok {
- p.revoke(cli)
- if _, err := p.Register(); err != nil {
- logx.Errorf("KeepAlive: %s", err.Error())
- } else {
- fmt.Printf("重新创建链接")
- }
- return
- }
- }
- }
- })
- return nil
- }
- // revoke 撤销给定的租约
- func (p *Publisher) revoke(cli *clientv3.Client) {
- if _, err := cli.Revoke(cli.Ctx(), p.leaseId); err != nil {
- logx.Error(err)
- }
- }
|