# coding:utf-8
import etcd3
import functools
import threading
import time
from loguru import logger


class EtcdServer(object):
    """Register a service instance in etcd and keep its lease alive.

    The instance is published under the first free key of the form
    ``<service_prefix><n>`` (n = 0, 1, 2, ...) with the value
    ``"<server_host>:<server_port>"``.  The key is bound to a lease that a
    background thread refreshes periodically until :meth:`destroy` is called.
    """

    def __init__(self, **kwargs):
        """Validate the configuration and create the etcd client.

        :param kwargs:
            etcd_server (dict): keyword arguments forwarded to ``etcd3.client``.
            service_prefix (str): key prefix for this service; required.
            ttl (int): lease time-to-live in seconds; falsy values mean 10.
            server_host (str): host the service listens on; required.
            server_port (int | str): port the service listens on; required.
            lease_frequency (float): fraction of ``ttl`` to wait between
                lease refreshes; must be below 1, falsy values mean 0.5.
        :raises ValueError: if ``service_prefix``, ``server_host`` or
            ``server_port`` is missing, or ``lease_frequency`` is >= 1.
        """
        # Validate first so that a bad configuration fails before a
        # connection to etcd is opened.  These checks used to be written as
        # ``assert ValueError(...)``, which never fails because an exception
        # instance is always truthy.
        self._service_prefix = kwargs.get("service_prefix", "").strip()
        if not self._service_prefix:
            raise ValueError("服务名称不能为空")

        port = kwargs.get('server_port')  # service port
        host = kwargs.get('server_host')  # service host
        if not (port and host):
            raise ValueError("主机地址不能为空")
        self._server_host = f"{host}:{port}"

        self._lease_ttl = kwargs.get("ttl") or 10
        self._lease_frequency = kwargs.get("lease_frequency") or 0.5
        if self._lease_frequency >= 1:
            # Refreshing at or after the TTL would let the lease expire.
            raise ValueError("lease_frequency:必须小于1")

        etcd_server = kwargs.get("etcd_server")
        self._etcd_client = etcd3.client(**etcd_server)

        self._online = True  # state flag: True = online, False = offline
        self._lease_id = ""  # id of the current lease; empty until registered

    def __register_url(self):
        """Claim the first free ``<prefix><n>`` key and bind it to a new lease.

        Retries every 2 seconds on any error for as long as the instance is
        online.

        :return: ``(server_address, key)`` on success, or ``None`` if the
            instance went offline before registration succeeded.
        """
        count = 0
        while self._online:
            try:
                url = f"{self._service_prefix}{count}"
                _, metadata = self._etcd_client.get(url)
                if metadata is not None:
                    # Key already taken by another instance; try the next one.
                    count += 1
                    continue
                # NOTE(review): get-then-put is not atomic, so two instances
                # starting at the same moment could claim the same key.
                lease_ = self._etcd_client.lease(self._lease_ttl)
                self._lease_id = lease_.id
                self._etcd_client.put(url, self._server_host, lease=lease_)
                return self._server_host, url
            except Exception as e:
                time.sleep(2)
                logger.warning(e)
        return None

    def __register(self):
        """Register the service and refresh its lease until taken offline.

        Any failure (including an expired lease) is logged and, after a
        3 second pause, followed by a fresh registration.
        """
        while self._online:  # retry indefinitely after a lost connection
            try:
                registered = self.__register_url()
                if registered is None:
                    # Taken offline while still trying to register.
                    break
                server_host, server_name = registered
                logger.warning(f"\n服务注册成功:\n租约ID:{self._lease_id}\n"
                               f"服务地址:{server_host}\n服务名称:{server_name}\n服务前缀:{self._service_prefix}\n"
                               f"租约时长:{self._lease_ttl}s\n续租频率:{self._lease_ttl * self._lease_frequency}s")
                interval = self._lease_ttl * self._lease_frequency
                while self._online:
                    # refresh_lease yields its responses lazily; building the
                    # list is what actually sends the keep-alive request.
                    responses = list(
                        self._etcd_client.refresh_lease(self._lease_id))
                    if not self._online:
                        break
                    # etcd answers a keep-alive for an expired or revoked
                    # lease with TTL <= 0; the key is gone in that case, so
                    # fall through to the handler below and register again.
                    if any(resp.TTL <= 0 for resp in responses):
                        raise RuntimeError(
                            f"lease {self._lease_id} expired, re-registering")
                    time.sleep(interval)
            except Exception as e:
                logger.warning(e)
                time.sleep(3)

    def destroy(self):
        """Take the service offline and revoke its lease.

        :return: None
        """
        # Stop the registration / keep-alive loops.
        self._online = False
        # Release the service key; skipped when no lease was ever granted.
        if self._lease_id:
            self._etcd_client.revoke_lease(self._lease_id)

    def etcd_server(self, func):
        """Decorator that runs ``func`` next to the etcd registration thread.

        The returned wrapper starts one thread for registration / keep-alive
        and one for ``func`` (called with the wrapper's arguments), then
        blocks until both have finished.  The registration thread only ends
        after :meth:`destroy` has been called.

        :param func: callable that runs the actual service.
        :return: the wrapping callable; it returns ``None``.
        """
        @functools.wraps(func)
        def execute(*args, **kwargs):
            register = threading.Thread(target=self.__register)
            register.start()
            server = threading.Thread(target=func, args=args, kwargs=kwargs)
            server.start()
            register.join()
            server.join()

        return execute