client.py 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. # coding:utf-8
  2. import etcd3
  3. import random
  4. import functools
  5. import time
  6. from concurrent.futures import ThreadPoolExecutor
  7. from loguru import logger
  8. class EtcdClient(object):
  9. def __init__(self, **kwargs):
  10. etcd_server = kwargs.get("etcd_server")
  11. self._etcd_client = etcd3.client(**etcd_server)
  12. self._service_prefix = kwargs.get("service_prefix", "")
  13. if not self._service_prefix:
  14. assert ValueError("服务名称不能为空")
  15. self._retries = kwargs.get("retries", 5)
  16. self._refresh_time = kwargs.get("refresh_time", 1)
  17. self._timeout = kwargs.get("timeout", self._retries * self._refresh_time + 1)
  18. def acquire_addr(self, number):
  19. """
  20. 随机获取服务地址
  21. :param number: addr数量
  22. :return:
  23. """
  24. services = self._etcd_client.get_prefix(self._service_prefix)
  25. services = list(services) if services else []
  26. if not services:
  27. logger.warning("服务不存在")
  28. return []
  29. services = [service[0].decode("utf-8") for service in services if service]
  30. choices = random.choices(services, k=number)
  31. return choices
  32. def start(self, func):
  33. """
  34. 单个执行
  35. :param func:
  36. :return:
  37. """
  38. @functools.wraps(func)
  39. def wrapper(*args, **kwargs):
  40. param = kwargs.get("param")
  41. hosts = []
  42. start_time = time.time()
  43. for count in range(self._retries):
  44. hosts = self.acquire_addr(1)
  45. if hosts or time.time() - start_time > self._timeout:
  46. break
  47. else:
  48. time.sleep(self._refresh_time)
  49. hosts = hosts if hosts else "400"
  50. return func(param, hosts[0], *args, **kwargs)
  51. return wrapper
  52. def start_patch(self, thread_number=3):
  53. """
  54. # 并行执行任务
  55. :param thread_number:线程数
  56. :return:
  57. """
  58. def execute(func):
  59. @functools.wraps(func)
  60. def wrapper(*args, **kwargs):
  61. params = kwargs.get("params")
  62. if not params:
  63. params = [None]
  64. host_count = len(params)
  65. hosts = []
  66. start_time = time.time()
  67. for count in range(self._retries):
  68. hosts = self.acquire_addr(host_count)
  69. if hosts or time.time() - start_time > self._timeout:
  70. break
  71. else:
  72. time.sleep(self._refresh_time)
  73. if not hosts:
  74. hosts = ["400"] * len(params)
  75. with ThreadPoolExecutor(max_workers=thread_number) as pool:
  76. result = pool.map(func, params, hosts)
  77. return list(result)
  78. return wrapper
  79. return execute