NewReigster.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package node
  2. import (
  3. "app.yhyue.com/moapp/jybase/iputil"
  4. "fmt"
  5. "github.com/zeromicro/go-zero/core/logx"
  6. "github.com/zeromicro/go-zero/core/threading"
  7. clientv3 "go.etcd.io/etcd/client/v3"
  8. "time"
  9. )
  10. type Publisher struct {
  11. code string //服务code
  12. serAddr string //服务地址
  13. serPort string //服务端口
  14. endpoints []string //etcd集群地址
  15. leaseId clientv3.LeaseID //租约
  16. client *clientv3.Client //链接
  17. stop chan struct{}
  18. }
  19. func NewPublisher(serverCode, port string, endpoints ...string) *Publisher {
  20. if len(endpoints) == 0 {
  21. endpoints = defaultEndpoints
  22. }
  23. return &Publisher{
  24. code: serverCode,
  25. serPort: port,
  26. endpoints: endpoints,
  27. stop: make(chan struct{}),
  28. }
  29. }
  30. // Register 注册服务
  31. func (p *Publisher) Register() (chan struct{}, error) {
  32. //创建链接
  33. client, err := clientv3.New(clientv3.Config{
  34. Endpoints: p.endpoints,
  35. DialTimeout: time.Duration(defaultDialTimeout) * time.Second,
  36. })
  37. if err != nil {
  38. return p.stop, err
  39. }
  40. //注册节点
  41. err = p.register(client)
  42. if err != nil {
  43. return p.stop, err
  44. }
  45. //续租约
  46. return p.stop, p.keepAliveAsync(client)
  47. }
  48. // getRegisterKey 获取注册key
  49. func (p *Publisher) getRegisterKey() (key, val string) {
  50. addr, port := p.serAddr, p.serPort
  51. if addr == "" {
  52. addr = fmt.Sprintf("http://%s", iputil.InternalIp())
  53. }
  54. if p.serPort == "" {
  55. port = "80"
  56. }
  57. finalAddr := fmt.Sprintf("%s:%s", addr, port)
  58. return createServerRegisterKey(defaultScheme, p.code, finalAddr), time.Now().Format("2006-01-02 15:04:05")
  59. }
  60. // register 注册节点
  61. func (p *Publisher) register(cli *clientv3.Client) error {
  62. resp, err := cli.Grant(cli.Ctx(), 5)
  63. if err != nil {
  64. return err
  65. }
  66. p.leaseId = resp.ID
  67. key, val := p.getRegisterKey()
  68. _, err = cli.Put(cli.Ctx(), key, val, clientv3.WithLease(p.leaseId))
  69. return err
  70. }
  71. // keepAliveAsync 续租
  72. func (p *Publisher) keepAliveAsync(cli *clientv3.Client) error {
  73. ch, err := cli.KeepAlive(cli.Ctx(), p.leaseId)
  74. if err != nil {
  75. return err
  76. }
  77. threading.GoSafe(func() {
  78. for {
  79. select {
  80. case <-p.stop:
  81. p.revoke(cli)
  82. fmt.Println("服务主动关闭")
  83. case _, ok := <-ch:
  84. if !ok {
  85. p.revoke(cli)
  86. if _, err := p.Register(); err != nil {
  87. logx.Errorf("KeepAlive: %s", err.Error())
  88. } else {
  89. fmt.Printf("重新创建链接")
  90. }
  91. return
  92. }
  93. }
  94. }
  95. })
  96. return nil
  97. }
  98. // revoke 撤销给定的租约
  99. func (p *Publisher) revoke(cli *clientv3.Client) {
  100. if _, err := cli.Revoke(cli.Ctx(), p.leaseId); err != nil {
  101. logx.Error(err)
  102. }
  103. }