socks5.py 1.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. import threading
  2. import requests
  3. from common.log import logger
  4. from config.load import jy_proxy
  5. __all__ = ['Proxy']
  6. class Socks5Proxy:
  7. def __init__(self):
  8. self._lock = threading.RLock()
  9. self._enable_proxy = False
  10. self._url = jy_proxy['socks5']['url']
  11. self._auth = jy_proxy['socks5']['auth']
  12. self._proxies = None
  13. @property
  14. def proxies(self):
  15. return self._proxies
  16. def switch(self):
  17. with self._lock:
  18. if self._enable_proxy:
  19. self._proxies = self._fetch_proxies()
  20. def _fetch_proxies(self):
  21. _proxy = {}
  22. try:
  23. _proxy = requests.get(self._url, headers=self._auth, timeout=10).json()
  24. finally:
  25. return _proxy.get("data")
  26. def __call__(self, enable_proxy: bool = False, *args, **kwargs):
  27. self._enable_proxy = enable_proxy
  28. if self._enable_proxy:
  29. logger.debug("[socks5代理 - 开启]")
  30. self._proxies = self._fetch_proxies()
  31. return self
  32. Proxy = Socks5Proxy()