watcher.go 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. package node
  2. import (
  3. "context"
  4. client "go.etcd.io/etcd/client/v3"
  5. "log"
  6. )
  7. // Resolver 需要获取注册服务内容结构体需要实现的方法
  8. type Resolver interface {
  9. // AddNode 添加节点
  10. AddNode(code, server string) error
  11. // DelNode 删除节点
  12. DelNode(code, server string) error
  13. }
  14. // NewWatcher 获取节点内容
  15. func (n *Node) NewWatcher(ctx context.Context, resolver Resolver) {
  16. // 获取当前已注册的服务
  17. registered, err := n.Client.Get(ctx, n.scheme, client.WithPrefix())
  18. if err != nil {
  19. log.Printf("NewWatcher 获取已注册服务出错%v", err)
  20. }
  21. for _, value := range registered.Kvs {
  22. serverCode, serverAddr := getServerCodeFromKey(string(value.Key))
  23. if addRegisteredErr := resolver.AddNode(serverCode, serverAddr); addRegisteredErr != nil {
  24. log.Println("NewWatcher 添加已存在节点 %s:%s 异常%v", serverCode, serverAddr, addRegisteredErr)
  25. }
  26. }
  27. // 持续监听服务变化
  28. for wch := range n.Client.Watch(ctx, n.scheme, client.WithPrefix()) {
  29. for _, ev := range wch.Events {
  30. //获取服务标识代码及服务地址
  31. serverCode, serverAddr := getServerCodeFromKey(string(ev.Kv.Key))
  32. var operateErr error
  33. switch ev.Type {
  34. case client.EventTypePut:
  35. operateErr = resolver.AddNode(serverCode, serverAddr)
  36. case client.EventTypeDelete:
  37. operateErr = resolver.DelNode(serverCode, serverAddr)
  38. }
  39. if operateErr != nil {
  40. log.Printf("NewWatcher 监听 %v 节点 %s:%s 异常%v", ev.Type, serverCode, serverAddr, operateErr)
  41. }
  42. }
  43. }
  44. }