# coding:utf-8
import functools
import random
import time
from concurrent.futures import ThreadPoolExecutor

import etcd3
from loguru import logger


class EtcdClient(object):
    """Look up service addresses stored under an etcd key prefix.

    The class offers two decorators, ``start`` and ``start_patch``, that
    fetch one or more addresses from etcd and hand them to the wrapped
    function. When no address can be found, the sentinel host ``"400"`` is
    passed instead.

    Keyword arguments:
        etcd_server: mapping of keyword arguments forwarded to
            ``etcd3.client`` (required).
        service_prefix: etcd key prefix the services are stored under
            (required, must be non-empty).
        retries: maximum number of lookup attempts (default 5).
        refresh_time: seconds to sleep between attempts (default 1).
        timeout: overall time budget in seconds for the lookup loop
            (default ``retries * refresh_time + 1``).

    Raises:
        ValueError: if ``service_prefix`` is missing or empty.
    """

    def __init__(self, **kwargs):
        # Validate before connecting so an invalid configuration does not
        # leave a half-initialised client behind.
        self._service_prefix = kwargs.get("service_prefix", "")
        if not self._service_prefix:
            # The original used ``assert ValueError(...)``, which never fails
            # because an exception instance is truthy.
            raise ValueError("服务名称不能为空")

        etcd_server = kwargs.get("etcd_server")
        self._etcd_client = etcd3.client(**etcd_server)

        self._retries = kwargs.get("retries", 5)
        self._refresh_time = kwargs.get("refresh_time", 1)
        self._timeout = kwargs.get(
            "timeout", self._retries * self._refresh_time + 1
        )

    def acquire_addr(self, number):
        """Return ``number`` randomly chosen service addresses.

        Addresses are read from the first element of every result returned
        by ``get_prefix`` for the configured prefix and decoded as UTF-8.
        Selection uses ``random.choices``, i.e. sampling *with* replacement,
        so the same address may appear more than once.

        :param number: how many addresses to return
        :return: list of ``number`` address strings, or ``[]`` when nothing
            is stored under the prefix
        """
        entries = self._etcd_client.get_prefix(self._service_prefix)
        addresses = [
            entry[0].decode("utf-8") for entry in (entries or ()) if entry
        ]
        if not addresses:
            logger.warning("服务不存在")
            return []
        return random.choices(addresses, k=number)

    def _wait_for_hosts(self, number):
        """Poll ``acquire_addr`` until it yields hosts or the budget is spent.

        At most ``retries`` attempts are made, sleeping ``refresh_time``
        seconds between them; polling also stops once ``timeout`` seconds
        have elapsed.

        :param number: how many addresses to request per attempt
        :return: list of addresses, or ``[]`` if none were found in time
        """
        started = time.monotonic()
        hosts = []
        for attempt in range(self._retries):
            hosts = self.acquire_addr(number)
            if hosts or time.monotonic() - started > self._timeout:
                break
            # No point sleeping after the final attempt.
            if attempt < self._retries - 1:
                time.sleep(self._refresh_time)
        return hosts

    def start(self, func):
        """Decorate ``func`` so it is called with one service address.

        The wrapper takes the keyword argument ``param`` (default ``None``),
        looks up a single address and calls
        ``func(param, host, *args, **kwargs)``. ``host`` is ``"400"`` when
        no address could be obtained.

        :param func: callable accepting ``(param, host, ...)``
        :return: the wrapping function
        """

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # pop, not get: ``param`` is forwarded positionally, so leaving
            # it in kwargs would pass it twice.
            param = kwargs.pop("param", None)
            hosts = self._wait_for_hosts(1)
            # Use the whole sentinel; indexing the string "400" gave "4".
            host = hosts[0] if hosts else "400"
            return func(param, host, *args, **kwargs)

        return wrapper

    def start_patch(self, thread_number=3):
        """Build a decorator that runs ``func`` in parallel, one call per param.

        The wrapper reads the keyword argument ``params`` (an iterable; a
        missing or empty value is treated as ``[None]``), fetches one address
        per entry and calls ``func(param, host)`` for each pair on a thread
        pool. Every host is ``"400"`` when no address could be obtained.

        :param thread_number: maximum number of worker threads
        :return: decorator whose wrapper returns the list of results, in the
            same order as ``params``
        """

        def execute(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                # Materialise once so iterators/generators are supported and
                # the length is known.
                params = list(kwargs.get("params") or [None])
                hosts = self._wait_for_hosts(len(params))
                if not hosts:
                    hosts = ["400"] * len(params)
                with ThreadPoolExecutor(max_workers=thread_number) as pool:
                    return list(pool.map(func, params, hosts))

            return wrapper

        return execute