123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- # coding:utf-8
- import etcd3
- import functools
- import threading
- import time
- from loguru import logger
- class EtcdServer(object):
- def __init__(self, **kwargs):
- '''
- :param kwargs:
- '''
- etcd_server = kwargs.get("etcd_server")
- self._etcd_client = etcd3.client(**etcd_server)
- self._service_prefix = kwargs.get("service_prefix", "").strip()
- if not self._service_prefix:
- assert ValueError("服务名称不能为空")
- self._lease_ttl = kwargs.get("ttl") if kwargs.get("ttl") else 10
- port = kwargs.get('server_port') # 服务端口
- host = kwargs.get('server_host') # 服务主机
- if not (port and host):
- assert ValueError("主机地址不能为空")
- self._server_host = f"{host}:{port}"
- self._online = True # 状态(上线|下线)
- self._lease_id = "" # 租约ID
- self._lease_frequency = kwargs.get("lease_frequency") if kwargs.get("lease_frequency") else 0.5
- if self._lease_frequency >= 1:
- assert ValueError("lease_frequency:必须小于1")
- def __register_url(self):
- count = 0
- while True:
- try:
- url = f"{self._service_prefix}{count}"
- result = self._etcd_client.get(url)
- if result[1] is not None:
- count += 1
- continue
- lease_ = self._etcd_client.lease(self._lease_ttl)
- self._lease_id = lease_.id
- self._etcd_client.put(url, self._server_host, lease=lease_)
- return self._server_host, url
- except Exception as e:
- time.sleep(2)
- logger.warning(e)
- def __register(self):
- # 上线
- while self._online:
- # 无限制断开连接重试
- try:
- serverHost, servername = self.__register_url()
- logger.warning(f"\n服务注册成功:\n租约ID:{self._lease_id}\n"
- f"服务地址:{serverHost}\n服务名称:{servername}\n服务前缀:{self._service_prefix}\n"
- f"租约时长:{self._lease_ttl}s\n续租频率:{self._lease_ttl * self._lease_frequency}s")
- while self._online:
- # 刷新租约时间
- lease_result = self._etcd_client.refresh_lease(self._lease_id)
- now_state = list(lease_result)
- self._etcd_client.get_lease_info(self._lease_id)
- time.sleep(self._lease_ttl * self._lease_frequency)
- except Exception as e:
- logger.warning(e)
- time.sleep(3)
- def destroy(self):
- '''
- 下线
- :return:
- '''
- # 下线状态
- self._online = False
- # 释放服务
- self._etcd_client.revoke_lease(self._lease_id)
- def etcd_server(self, func):
- # 注册进程
- @functools.wraps(func)
- def execute():
- register = threading.Thread(target=self.__register)
- register.start()
- server = threading.Thread(target=func)
- server.start()
- register.join()
- server.join()
- return execute
|