server.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. # coding:utf-8
  2. import etcd3
  3. import functools
  4. import threading
  5. import time
  6. from loguru import logger
  7. class EtcdServer(object):
  8. def __init__(self, **kwargs):
  9. '''
  10. :param kwargs:
  11. '''
  12. etcd_server = kwargs.get("etcd_server")
  13. self._etcd_client = etcd3.client(**etcd_server)
  14. self._service_prefix = kwargs.get("service_prefix", "").strip()
  15. if not self._service_prefix:
  16. assert ValueError("服务名称不能为空")
  17. self._lease_ttl = kwargs.get("ttl") if kwargs.get("ttl") else 10
  18. port = kwargs.get('server_port') # 服务端口
  19. host = kwargs.get('server_host') # 服务主机
  20. if not (port and host):
  21. assert ValueError("主机地址不能为空")
  22. self._server_host = f"{host}:{port}"
  23. self._online = True # 状态(上线|下线)
  24. self._lease_id = "" # 租约ID
  25. self._lease_frequency = kwargs.get("lease_frequency") if kwargs.get("lease_frequency") else 0.5
  26. if self._lease_frequency >= 1:
  27. assert ValueError("lease_frequency:必须小于1")
  28. def __register_url(self):
  29. count = 0
  30. while True:
  31. try:
  32. url = f"{self._service_prefix}{count}"
  33. result = self._etcd_client.get(url)
  34. if result[1] is not None:
  35. count += 1
  36. continue
  37. lease_ = self._etcd_client.lease(self._lease_ttl)
  38. self._lease_id = lease_.id
  39. self._etcd_client.put(url, self._server_host, lease=lease_)
  40. return self._server_host, url
  41. except Exception as e:
  42. time.sleep(2)
  43. logger.warning(e)
  44. def __register(self):
  45. # 上线
  46. while self._online:
  47. # 无限制断开连接重试
  48. try:
  49. serverHost, servername = self.__register_url()
  50. logger.warning(f"\n服务注册成功:\n租约ID:{self._lease_id}\n"
  51. f"服务地址:{serverHost}\n服务名称:{servername}\n服务前缀:{self._service_prefix}\n"
  52. f"租约时长:{self._lease_ttl}s\n续租频率:{self._lease_ttl * self._lease_frequency}s")
  53. while self._online:
  54. # 刷新租约时间
  55. lease_result = self._etcd_client.refresh_lease(self._lease_id)
  56. now_state = list(lease_result)
  57. self._etcd_client.get_lease_info(self._lease_id)
  58. time.sleep(self._lease_ttl * self._lease_frequency)
  59. except Exception as e:
  60. logger.warning(e)
  61. time.sleep(3)
  62. def destroy(self):
  63. '''
  64. 下线
  65. :return:
  66. '''
  67. # 下线状态
  68. self._online = False
  69. # 释放服务
  70. self._etcd_client.revoke_lease(self._lease_id)
  71. def etcd_server(self, func):
  72. # 注册进程
  73. @functools.wraps(func)
  74. def execute():
  75. register = threading.Thread(target=self.__register)
  76. register.start()
  77. server = threading.Thread(target=func)
  78. server.start()
  79. register.join()
  80. server.join()
  81. return execute