1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889 |
- # coding:utf-8
- import etcd3
- import random
- import functools
- import time
- from concurrent.futures import ThreadPoolExecutor
- from loguru import logger
- class EtcdClient(object):
- def __init__(self, **kwargs):
- etcd_server = kwargs.get("etcd_server")
- self._etcd_client = etcd3.client(**etcd_server)
- self._service_prefix = kwargs.get("service_prefix", "")
- if not self._service_prefix:
- assert ValueError("服务名称不能为空")
- self._retries = kwargs.get("retries", 5)
- self._refresh_time = kwargs.get("refresh_time", 1)
- self._timeout = kwargs.get("timeout", self._retries * self._refresh_time + 1)
- def acquire_addr(self, number):
- """
- 随机获取服务地址
- :param number: addr数量
- :return:
- """
- services = self._etcd_client.get_prefix(self._service_prefix)
- services = list(services) if services else []
- if not services:
- logger.warning("服务不存在")
- return []
- services = [service[0].decode("utf-8") for service in services if service]
- choices = random.choices(services, k=number)
- return choices
- def start(self, func):
- """
- 单个执行
- :param func:
- :return:
- """
- @functools.wraps(func)
- def wrapper(*args, **kwargs):
- param = kwargs.get("param")
- hosts = []
- start_time = time.time()
- for count in range(self._retries):
- hosts = self.acquire_addr(1)
- if hosts or time.time() - start_time > self._timeout:
- break
- else:
- time.sleep(self._refresh_time)
- hosts = hosts if hosts else "400"
- return func(param, hosts[0], *args, **kwargs)
- return wrapper
- def start_patch(self, thread_number=3):
- """
- # 并行执行任务
- :param thread_number:线程数
- :return:
- """
- def execute(func):
- @functools.wraps(func)
- def wrapper(*args, **kwargs):
- params = kwargs.get("params")
- if not params:
- params = [None]
- host_count = len(params)
- hosts = []
- start_time = time.time()
- for count in range(self._retries):
- hosts = self.acquire_addr(host_count)
- if hosts or time.time() - start_time > self._timeout:
- break
- else:
- time.sleep(self._refresh_time)
- if not hosts:
- hosts = ["400"] * len(params)
- with ThreadPoolExecutor(max_workers=thread_number) as pool:
- result = pool.map(func, params, hosts)
- return list(result)
- return wrapper
- return execute
|