import threading import requests from common.log import logger from config.load import jy_proxy __all__ = ['Proxy'] class Socks5Proxy: def __init__(self): self._lock = threading.RLock() self._enable_proxy = False self._url = jy_proxy['socks5']['url'] self._auth = jy_proxy['socks5']['auth'] self._proxies = None @property def proxies(self): return self._proxies def switch(self): with self._lock: if self._enable_proxy: self._proxies = self._fetch_proxies() def _fetch_proxies(self): _proxy = {} try: _proxy = requests.get(self._url, headers=self._auth, timeout=10).json() finally: return _proxy.get("data") def __call__(self, enable_proxy: bool = False, *args, **kwargs): self._enable_proxy = enable_proxy if self._enable_proxy: logger.debug("[socks5代理 - 开启]") self._proxies = self._fetch_proxies() return self Proxy = Socks5Proxy()