12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849 |
- package node
- import (
- "context"
- client "go.etcd.io/etcd/client/v3"
- "log"
- )
// Resolver is the set of callbacks a type must implement in order to receive
// the contents of the service registry.
type Resolver interface {
	// AddNode adds a node. NewWatcher passes the service code as code and
	// the service address as server.
	AddNode(code string, server string) error

	// DelNode removes a node. The arguments have the same meaning as for
	// AddNode.
	DelNode(code string, server string) error
}
- // NewWatcher 获取节点内容
- func (n *Node) NewWatcher(ctx context.Context, resolver Resolver) {
- // 获取当前已注册的服务
- registered, err := n.Client.Get(ctx, n.scheme, client.WithPrefix())
- if err != nil {
- log.Printf("NewWatcher 获取已注册服务出错%v", err)
- }
- for _, value := range registered.Kvs {
- serverCode, serverAddr := getServerCodeFromKey(string(value.Key))
- if addRegisteredErr := resolver.AddNode(serverCode, serverAddr); addRegisteredErr != nil {
- log.Println("NewWatcher 添加已存在节点 %s:%s 异常%v", serverCode, serverAddr, addRegisteredErr)
- }
- }
- // 持续监听服务变化
- for wch := range n.Client.Watch(ctx, n.scheme, client.WithPrefix()) {
- for _, ev := range wch.Events {
- //获取服务标识代码及服务地址
- serverCode, serverAddr := getServerCodeFromKey(string(ev.Kv.Key))
- var operateErr error
- switch ev.Type {
- case client.EventTypePut:
- operateErr = resolver.AddNode(serverCode, serverAddr)
- case client.EventTypeDelete:
- operateErr = resolver.DelNode(serverCode, serverAddr)
- }
- if operateErr != nil {
- log.Printf("NewWatcher 监听 %v 节点 %s:%s 异常%v", ev.Type, serverCode, serverAddr, operateErr)
- }
- }
- }
- }
|