123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186 |
- package etcd
- import (
- "context"
- "fmt"
- "log"
- "math/rand"
- "os"
- "os/signal"
- "syscall"
- "time"
- "app.yhyue.com/moapp/jybase/common"
- "app.yhyue.com/moapp/jybase/iputil"
- clientv3 "go.etcd.io/etcd/client/v3"
- )
// etcd is a handle on an etcd cluster. It stores only the addresses that the
// registration and lookup methods dial.
type etcd struct {
	// endpoints holds the etcd cluster addresses.
	endpoints []string
}
- func NewEtcd(endpoints ...string) *etcd {
- return &etcd{endpoints: endpoints}
- }
- /*
- * 通过自定义key、value注册服务,程序停止后,自动删除key
- * @param endpoints etcd集群地址
- * @param key 服务的key
- * @param value
- */
- func (e *etcd) Register(kvs ...string) error {
- return e.register(kvs...)
- }
- /*
- * 自动注册ip和端口到etcd中,程序停止后,自动删除key
- * @param endpoints etcd集群地址
- * @param key 服务的key
- * @param port 服务的端口
- */
- func (e *etcd) RegisterIpPort(key, port string) error {
- return e.register(key, fmt.Sprintf("%s:%s", iputil.InternalIp(), port))
- }
- func (e *etcd) register(kvs ...string) error {
- stops := []chan struct{}{}
- var err error
- for i := 0; i < len(kvs); i += 2 {
- stop, e := NewPublisher(e.endpoints, kvs[i], kvs[i+1]).Register()
- if err != nil {
- err = e
- break
- }
- stops = append(stops, stop)
- }
- quit := make(chan os.Signal, 1)
- signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
- <-quit
- for _, stop := range stops {
- stop <- struct{}{}
- }
- return nil
- }
- func (e *etcd) GetOne(key string) string {
- list, err := e.Get(key)
- if err != nil || list == nil || len(list) == 0 {
- return ""
- }
- rander := rand.New(rand.NewSource(time.Now().UnixNano()))
- return list[rander.Intn(len(list))]
- }
- func (e *etcd) Get(key string) ([]string, error) {
- client, err := getClient(e.endpoints)
- if err != nil {
- return nil, err
- }
- defer client.Close()
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- resp, err := client.Get(ctx, key, clientv3.WithPrefix())
- cancel()
- if err != nil {
- log.Printf("get from etcd failed, err:%v\n", err)
- return nil, err
- }
- rs := []string{}
- if resp != nil {
- for _, v := range resp.Kvs {
- rs = append(rs, string(v.Value))
- }
- }
- log.Println("ETCD获取信息:", rs)
- return rs, nil
- }
- type Publisher struct {
- key string //
- value string //
- endpoints []string //etcd集群地址
- leaseId clientv3.LeaseID //租约
- client *clientv3.Client //链接
- stop chan struct{}
- }
- func NewPublisher(endpoints []string, key, value string) *Publisher {
- return &Publisher{
- key: key,
- value: value,
- endpoints: endpoints,
- stop: make(chan struct{}),
- }
- }
- func getClient(endpoints []string) (*clientv3.Client, error) {
- //创建链接
- client, err := clientv3.New(clientv3.Config{
- Endpoints: endpoints,
- DialTimeout: 5 * time.Second,
- })
- return client, err
- }
- // Register 注册服务
- func (p *Publisher) Register() (chan struct{}, error) {
- client, err := getClient(p.endpoints)
- if err != nil {
- return p.stop, err
- }
- //注册节点
- err = p.register(client)
- if err != nil {
- return p.stop, err
- }
- //续租约
- return p.stop, p.keepAliveAsync(client)
- }
- // register 注册节点
- func (p *Publisher) register(cli *clientv3.Client) error {
- resp, err := cli.Grant(cli.Ctx(), 5)
- if err != nil {
- return err
- }
- p.leaseId = resp.ID
- _, err = cli.Put(cli.Ctx(), fmt.Sprintf("%s/%d", p.key, time.Now().UnixNano()), p.value, clientv3.WithLease(p.leaseId))
- return err
- }
- // keepAliveAsync 续租
- func (p *Publisher) keepAliveAsync(cli *clientv3.Client) error {
- ch, err := cli.KeepAlive(cli.Ctx(), p.leaseId)
- if err != nil {
- return err
- }
- go func() {
- defer common.Catch()
- for {
- select {
- case <-p.stop:
- p.revoke(cli)
- log.Println("服务主动关闭")
- case _, ok := <-ch:
- if !ok {
- p.revoke(cli)
- if _, err := p.Register(); err != nil {
- log.Println("KeepAlive Error", err.Error())
- } else {
- log.Println("重新创建链接")
- }
- return
- }
- }
- }
- }()
- return nil
- }
- // revoke 撤销给定的租约
- func (p *Publisher) revoke(cli *clientv3.Client) {
- if _, err := cli.Revoke(cli.Ctx(), p.leaseId); err != nil {
- log.Println(err)
- }
- }
|