etcd.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. package etcd
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "math/rand"
  7. "os"
  8. "os/signal"
  9. "syscall"
  10. "time"
  11. "app.yhyue.com/moapp/jybase/common"
  12. "app.yhyue.com/moapp/jybase/iputil"
  13. clientv3 "go.etcd.io/etcd/client/v3"
  14. )
  15. type etcd struct {
  16. endpoints []string
  17. }
  18. func NewEtcd(endpoints ...string) *etcd {
  19. return &etcd{endpoints: endpoints}
  20. }
  21. /*
  22. * 通过自定义key、value注册服务,程序停止后,自动删除key
  23. * @param endpoints etcd集群地址
  24. * @param key 服务的key
  25. * @param value
  26. */
  27. func (e *etcd) Register(kvs ...string) error {
  28. return e.register(kvs...)
  29. }
  30. /*
  31. * 自动注册ip和端口到etcd中,程序停止后,自动删除key
  32. * @param endpoints etcd集群地址
  33. * @param key 服务的key
  34. * @param port 服务的端口
  35. */
  36. func (e *etcd) RegisterIpPort(key, port string) error {
  37. return e.register(key, fmt.Sprintf("%s:%s", iputil.InternalIp(), port))
  38. }
  39. func (e *etcd) register(kvs ...string) error {
  40. stops := []chan struct{}{}
  41. var err error
  42. for i := 0; i < len(kvs); i += 2 {
  43. stop, e := NewPublisher(e.endpoints, kvs[i], kvs[i+1]).Register()
  44. if err != nil {
  45. err = e
  46. break
  47. }
  48. stops = append(stops, stop)
  49. }
  50. quit := make(chan os.Signal, 1)
  51. signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
  52. <-quit
  53. for _, stop := range stops {
  54. stop <- struct{}{}
  55. }
  56. return nil
  57. }
  58. func (e *etcd) GetOne(key string) string {
  59. list, err := e.Get(key)
  60. if err != nil || list == nil || len(list) == 0 {
  61. return ""
  62. }
  63. rander := rand.New(rand.NewSource(time.Now().UnixNano()))
  64. return list[rander.Intn(len(list))]
  65. }
  66. func (e *etcd) Get(key string) ([]string, error) {
  67. client, err := getClient(e.endpoints)
  68. if err != nil {
  69. return nil, err
  70. }
  71. defer client.Close()
  72. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  73. resp, err := client.Get(ctx, key, clientv3.WithPrefix())
  74. cancel()
  75. if err != nil {
  76. log.Printf("get from etcd failed, err:%v\n", err)
  77. return nil, err
  78. }
  79. rs := []string{}
  80. if resp != nil {
  81. for _, v := range resp.Kvs {
  82. rs = append(rs, string(v.Value))
  83. }
  84. }
  85. log.Println("ETCD获取信息:", rs)
  86. return rs, nil
  87. }
  88. type Publisher struct {
  89. key string //
  90. value string //
  91. endpoints []string //etcd集群地址
  92. leaseId clientv3.LeaseID //租约
  93. client *clientv3.Client //链接
  94. stop chan struct{}
  95. }
  96. func NewPublisher(endpoints []string, key, value string) *Publisher {
  97. return &Publisher{
  98. key: key,
  99. value: value,
  100. endpoints: endpoints,
  101. stop: make(chan struct{}),
  102. }
  103. }
  104. func getClient(endpoints []string) (*clientv3.Client, error) {
  105. //创建链接
  106. client, err := clientv3.New(clientv3.Config{
  107. Endpoints: endpoints,
  108. DialTimeout: 5 * time.Second,
  109. })
  110. return client, err
  111. }
  112. // Register 注册服务
  113. func (p *Publisher) Register() (chan struct{}, error) {
  114. client, err := getClient(p.endpoints)
  115. if err != nil {
  116. return p.stop, err
  117. }
  118. //注册节点
  119. err = p.register(client)
  120. if err != nil {
  121. return p.stop, err
  122. }
  123. //续租约
  124. return p.stop, p.keepAliveAsync(client)
  125. }
  126. // register 注册节点
  127. func (p *Publisher) register(cli *clientv3.Client) error {
  128. resp, err := cli.Grant(cli.Ctx(), 5)
  129. if err != nil {
  130. return err
  131. }
  132. p.leaseId = resp.ID
  133. _, err = cli.Put(cli.Ctx(), fmt.Sprintf("%s/%d", p.key, time.Now().UnixNano()), p.value, clientv3.WithLease(p.leaseId))
  134. return err
  135. }
  136. // keepAliveAsync 续租
  137. func (p *Publisher) keepAliveAsync(cli *clientv3.Client) error {
  138. ch, err := cli.KeepAlive(cli.Ctx(), p.leaseId)
  139. if err != nil {
  140. return err
  141. }
  142. go func() {
  143. defer common.Catch()
  144. for {
  145. select {
  146. case <-p.stop:
  147. p.revoke(cli)
  148. log.Println("服务主动关闭")
  149. case _, ok := <-ch:
  150. if !ok {
  151. p.revoke(cli)
  152. if _, err := p.Register(); err != nil {
  153. log.Println("KeepAlive Error", err.Error())
  154. } else {
  155. log.Println("重新创建链接")
  156. }
  157. return
  158. }
  159. }
  160. }
  161. }()
  162. return nil
  163. }
  164. // revoke 撤销给定的租约
  165. func (p *Publisher) revoke(cli *clientv3.Client) {
  166. if _, err := cli.Revoke(cli.Ctx(), p.leaseId); err != nil {
  167. log.Println(err)
  168. }
  169. }