socks5.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. import threading
  2. import time
  3. from collections import deque
  4. from urllib.parse import urlparse
  5. import requests
  6. from config.load import jy_proxy, headers
  7. from utils.log import logger
  8. __all__ = ['Proxy']
  9. def decrypt(input_str: str) -> str:
  10. """
  11. 定义base64解密函数
  12. :param input_str:
  13. :return:
  14. """
  15. # 对前面不是“=”的字节取索引,然后转换为2进制
  16. key = jy_proxy['socks5']['decrypt']
  17. ascii_list = ['{:0>6}'.format(str(bin(key.index(i))).replace('0b', '')) for i in input_str if i != '=']
  18. output_str = ''
  19. # 补齐“=”的个数
  20. equal_num = input_str.count('=')
  21. while ascii_list:
  22. temp_list = ascii_list[:4]
  23. # 转换成2进制字符串
  24. temp_str = ''.join(temp_list)
  25. # 对没有8位2进制的字符串补够8位2进制
  26. if len(temp_str) % 8 != 0:
  27. temp_str = temp_str[0:-1 * equal_num * 2]
  28. # 4个6字节的二进制 转换 为三个8字节的二进制
  29. temp_str_list = [temp_str[x:x + 8] for x in [0, 8, 16]]
  30. # 二进制转为10进制
  31. temp_str_list = [int(x, 2) for x in temp_str_list if x]
  32. # 连接成字符串
  33. output_str += ''.join([chr(x) for x in temp_str_list])
  34. ascii_list = ascii_list[4:]
  35. return output_str
  36. class Socks5Proxy:
  37. __instance = None
  38. def __new__(cls, *args, **kwargs):
  39. if cls.__instance is None:
  40. cls.__instance = super().__new__(cls)
  41. return cls.__instance
  42. def __init__(self):
  43. self.seconds = 60
  44. self._lock = threading.RLock()
  45. self._url = jy_proxy['socks5']['url']
  46. self._dq = deque([])
  47. self._proxies = {}
  48. self._pool = []
  49. self._counter = {}
  50. def _init(self):
  51. while not self._proxies:
  52. if len(self._dq) > 0:
  53. '''队列左边取值'''
  54. self._proxies = self._dq.popleft()
  55. '''添加到队尾'''
  56. self._dq.append(self._proxies)
  57. else:
  58. self.__request_service()
  59. self.__check_proxies()
  60. @property
  61. def proxies(self):
  62. with self._lock:
  63. return self._proxies if len(self._proxies) > 0 else None
  64. def switch(self, reset=False):
  65. with self._lock:
  66. if reset is True:
  67. self.__flush_proxy_pool()
  68. elif len(self._counter) > 0:
  69. end_time = self._counter[self.get_netloc(self._proxies)]
  70. current_time = int(time.time())
  71. if end_time - current_time < self.seconds:
  72. logger.info(f"[移除socks5代理]{self.get_netloc(self._proxies)}")
  73. self._dq.remove(self._proxies)
  74. del self._counter[self.get_netloc(self._proxies)]
  75. logger.info(f"[socks5代理]剩余 {len(self._dq)} 个")
  76. self._proxies = {} # 重置代理
  77. while len(self._proxies) == 0:
  78. if len(self._dq) > 0:
  79. self._proxies = self._dq.popleft()
  80. self._dq.append(self._proxies)
  81. else:
  82. self.__flush_proxy_pool()
  83. @staticmethod
  84. def get_netloc(item: dict):
  85. parser = urlparse(item.get('http'))
  86. return parser.netloc
  87. def __request_service(self):
  88. try:
  89. response = requests.get(self._url, timeout=10)
  90. self.__extract_ip(response)
  91. except requests.RequestException:
  92. pass
  93. def __extract_ip(self, response):
  94. for proxy in response.json():
  95. host = decrypt(proxy['host'])
  96. port = int(proxy['port'])
  97. end_time = proxy['EndTime']
  98. items = {
  99. 'http': 'socks5://{}:{}'.format(host, port),
  100. 'https': 'socks5://{}:{}'.format(host, port)
  101. }
  102. self._pool.append(items)
  103. self._counter.setdefault(self.get_netloc(items), end_time)
  104. def __check_proxies(self):
  105. check_ip = 'https://myip.ipip.net'
  106. logger.info(f"[socks5代理检验]访问地址-{check_ip}")
  107. for proxies in self._pool:
  108. try:
  109. requests_param = {
  110. "headers": headers,
  111. "proxies": proxies,
  112. "timeout": 2
  113. }
  114. requests.get(check_ip, **requests_param)
  115. self._dq.append(proxies)
  116. except requests.RequestException:
  117. del self._counter[self.get_netloc(proxies)]
  118. def __flush_proxy_pool(self):
  119. logger.info(f"[socks5代理]刷新代理池")
  120. self._pool.clear()
  121. self._dq.clear()
  122. self._counter.clear()
  123. self.__request_service()
  124. self.__check_proxies()
  125. def __call__(self, enable_proxy: bool = False, *args, **kwargs):
  126. if enable_proxy:
  127. logger.info("[加载socks5代理]")
  128. self._init()
  129. return self
  130. Proxy = Socks5Proxy()