1234567891011121314151617181920212223242526272829303132333435363738394041424344 |
- import threading
- import requests
- from common.log import logger
- from config.load import jy_proxy
# Public API: a star-import of this module exposes only ``Proxy``.
__all__ = ['Proxy']
class Socks5Proxy:
    """Callable holder for a proxy setting fetched from a remote endpoint.

    The endpoint URL and the request headers come from
    ``jy_proxy['socks5']`` at construction time.  Calling the instance
    enables or disables proxying; while enabled, :meth:`switch` fetches a
    fresh value from the endpoint.
    """

    def __init__(self):
        # Re-entrant lock guarding the enable flag and the cached proxies.
        self._lock = threading.RLock()
        self._enable_proxy = False
        self._url = jy_proxy['socks5']['url']
        # Passed as the HTTP headers of the fetch request.
        self._auth = jy_proxy['socks5']['auth']
        self._proxies = None

    @property
    def proxies(self):
        """Return the last fetched proxies value, or ``None`` if there is none.

        NOTE(review): presumably a ``requests``-style proxies mapping;
        confirm against the endpoint's response schema.
        """
        return self._proxies

    def switch(self):
        """Fetch a fresh proxies value if proxying is enabled; otherwise no-op."""
        with self._lock:
            if self._enable_proxy:
                self._proxies = self._fetch_proxies()

    @staticmethod
    def _extract_data(payload):
        """Return ``payload["data"]`` when *payload* is a dict, else ``None``."""
        if isinstance(payload, dict):
            return payload.get("data")
        # A JSON array/scalar/null has no "data" field to read.
        return None

    def _fetch_proxies(self):
        """Request the endpoint and return the ``data`` field of its JSON body.

        Returns:
            The ``data`` value, or ``None`` when the request fails, the body
            is not valid JSON, the body is not a JSON object, or the field
            is absent.
        """
        try:
            response = requests.get(self._url, headers=self._auth, timeout=10)
            payload = response.json()
        except (requests.RequestException, ValueError) as exc:
            # Best effort: network and JSON-decoding failures yield None.
            # Unlike a ``return`` inside ``finally``, this lets
            # KeyboardInterrupt/SystemExit and programming errors propagate.
            logger.warning(f"[socks5 proxy] fetch failed: {exc!r}")
            return None
        return self._extract_data(payload)

    def __call__(self, enable_proxy: bool = False, *args, **kwargs):
        """Enable or disable proxying and return this instance.

        Args:
            enable_proxy: When true, proxies are fetched immediately.  When
                false, a previously fetched value is left in place.
            *args: Accepted and ignored.
            **kwargs: Accepted and ignored.

        Returns:
            ``self``.
        """
        # Hold the same lock as switch() so the flag and the cached value
        # are always updated together.
        with self._lock:
            self._enable_proxy = enable_proxy
            if self._enable_proxy:
                logger.debug("[socks5代理 - 开启]")
                self._proxies = self._fetch_proxies()
        return self
# Module-level instance created at import time; this is the name listed in __all__.
Proxy = Socks5Proxy()
|