import threading
import time
from collections import deque
from urllib.parse import urlparse

import requests

from config.load import jy_proxy, headers
from utils.log import logger

__all__ = ['Proxy']


def decrypt(input_str: str) -> str:
    """Decode a base64-style string that uses the configured custom alphabet.

    The alphabet is read from ``jy_proxy['socks5']['decrypt']``; the position
    of a character inside it is that character's 6-bit value. ``=`` characters
    are skipped. The resulting bit stream is cut into 8-bit groups and any
    incomplete trailing group is discarded, so the input may be given with or
    without ``=`` padding.

    :param input_str: encoded text
    :return: decoded text, one character per decoded byte (``chr`` of the
        byte value)
    :raises ValueError: if ``input_str`` contains a character that is neither
        ``=`` nor part of the alphabet
    """
    key = jy_proxy['socks5']['decrypt']

    # Build the lookup once instead of scanning the alphabet per character.
    # setdefault keeps the FIRST position of a repeated symbol, which is what
    # key.index() would have returned.
    sextet_of = {}
    for position, symbol in enumerate(key):
        sextet_of.setdefault(symbol, position)

    try:
        bits = ''.join(
            format(sextet_of[symbol], '06b')
            for symbol in input_str
            if symbol != '='
        )
    except KeyError as exc:
        raise ValueError(
            'character {!r} is not in the decrypt alphabet'.format(exc.args[0])
        ) from exc

    # Every full group of four symbols is exactly 24 bits, so only the tail
    # can be ragged. Dropping ``len % 8`` bits equals the padding-based trim
    # for well-formed input and also works when the padding is absent.
    usable = len(bits) - len(bits) % 8
    return ''.join(
        chr(int(bits[start:start + 8], 2))
        for start in range(0, usable, 8)
    )


class Socks5Proxy:
    """Singleton that keeps a rotating queue of validated SOCKS5 proxies.

    Proxies are fetched from the URL in ``jy_proxy['socks5']['url']``, probed
    with a test request, and the ones that answer are stored in a deque that
    is rotated each time a proxy is handed out.

    Attributes set in ``__init__``:
        seconds: a proxy whose recorded end time is less than this many
            seconds away is dropped when ``switch`` is called.
        retry_interval: pause, in seconds, between refresh attempts that
            produced no usable proxy.
    """

    __instance = None

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        if cls.__instance is None:
            cls.__instance = super().__new__(cls)
        return cls.__instance

    def __init__(self):
        """Set up empty state; later constructions leave the state untouched."""
        # __new__ always returns the same object, and Python calls __init__ on
        # it every time. Without this guard a second ``Socks5Proxy()`` would
        # wipe the queue and replace the lock.
        if getattr(self, '_initialized', False):
            return
        self.seconds = 60
        self.retry_interval = 1
        self._lock = threading.RLock()
        self._url = jy_proxy['socks5']['url']
        self._dq = deque([])   # proxies that passed the check, in rotation order
        self._proxies = {}     # proxy mapping currently handed out
        self._pool = []        # every proxy from the latest service response(s)
        self._counter = {}     # netloc -> end time reported by the service
        self._initialized = True

    def _rotate(self) -> bool:
        """Make the head of the queue current and move it to the tail.

        :return: ``True`` if a proxy was selected, ``False`` if the queue is
            empty
        """
        if not self._dq:
            return False
        self._proxies = self._dq.popleft()
        self._dq.append(self._proxies)
        return True

    def _acquire(self):
        """Block until a proxy is selected, refreshing the pool as needed."""
        while not self._rotate():
            self.__flush_proxy_pool()
            if not self._dq:
                # Nothing usable came back; pause so a failing service or a
                # dead batch of proxies is not hammered in a tight loop.
                time.sleep(self.retry_interval)

    def _init(self):
        """Select a first proxy if none is currently selected."""
        # Same lock as switch()/proxies: all of them mutate the shared queue.
        with self._lock:
            if not self._proxies:
                self._acquire()

    @property
    def proxies(self):
        """Current proxy mapping (``'http'``/``'https'`` keys) or ``None``."""
        with self._lock:
            return self._proxies or None

    def _drop_if_expiring(self, current: dict):
        """Remove ``current`` from rotation if it is about to expire.

        Does nothing when no end time is recorded for the proxy.
        """
        netloc = self.get_netloc(current)
        end_time = self._counter.get(netloc)
        if end_time is None:
            return
        # NOTE(review): assumes 'EndTime' is a numeric Unix timestamp in
        # seconds, since it is subtracted from int(time.time()) - confirm
        # against the proxy service.
        if end_time - int(time.time()) < self.seconds:
            logger.info(f"[移除socks5代理]{netloc}")
            try:
                self._dq.remove(current)
            except ValueError:
                # Already gone from the queue; nothing left to remove.
                pass
            del self._counter[netloc]
            logger.info(f"[socks5代理]剩余 {len(self._dq)} 个")

    def switch(self, reset=False):
        """Move on to the next proxy in the rotation.

        :param reset: when exactly ``True``, discard the whole pool and fetch
            a fresh one first; otherwise only drop the current proxy if it is
            within ``self.seconds`` of its recorded end time
        """
        with self._lock:
            if reset is True:
                self.__flush_proxy_pool()
            elif self._proxies and self._counter:
                self._drop_if_expiring(self._proxies)

            self._proxies = {}  # clear the selection, then pick the next one
            self._acquire()

    @staticmethod
    def get_netloc(item: dict) -> str:
        """Return the ``host:port`` part of ``item['http']``.

        Returns an empty string when ``item`` has no ``'http'`` entry.
        """
        # ``or ''`` keeps urlparse on the str code path when the key is
        # missing; passing None makes it return bytes components.
        return urlparse(item.get('http') or '').netloc

    def __request_service(self):
        """Fetch the proxy list from the service and load it into the pool.

        Failures are logged and otherwise ignored so callers can retry.
        """
        try:
            response = requests.get(self._url, timeout=10)
            self.__extract_ip(response)
        except (requests.RequestException, ValueError, KeyError, TypeError) as exc:
            # ValueError/KeyError/TypeError cover a malformed payload (bad
            # JSON, missing field, undecodable host, unexpected shape).
            logger.warning(f"[socks5代理]获取代理列表失败: {exc!r}")

    def __extract_ip(self, response):
        """Parse the service response and append its proxies to the pool.

        Each entry is read for ``'host'`` (decoded with ``decrypt``),
        ``'port'`` and ``'EndTime'``.
        """
        for proxy in response.json():
            host = decrypt(proxy['host'])
            port = int(proxy['port'])
            end_time = proxy['EndTime']
            address = 'socks5://{}:{}'.format(host, port)
            items = {'http': address, 'https': address}
            self._pool.append(items)
            self._counter.setdefault(self.get_netloc(items), end_time)

    def __check_proxies(self):
        """Probe every pooled proxy and queue the ones that respond."""
        check_ip = 'https://myip.ipip.net'
        logger.info(f"[socks5代理检验]访问地址-{check_ip}")
        for proxies in self._pool:
            try:
                requests.get(check_ip, headers=headers, proxies=proxies, timeout=2)
            except requests.RequestException:
                # pop() rather than del: the same netloc can occur more than
                # once in the pool, and its entry may already be gone.
                self._counter.pop(self.get_netloc(proxies), None)
            else:
                self._dq.append(proxies)

    def __flush_proxy_pool(self):
        """Discard all known proxies, then fetch and validate a fresh batch."""
        logger.info("[socks5代理]刷新代理池")
        self._pool.clear()
        self._dq.clear()
        self._counter.clear()
        self.__request_service()
        self.__check_proxies()

    def __call__(self, enable_proxy: bool = False, *args, **kwargs):
        """Optionally load a proxy, then return this instance.

        :param enable_proxy: when true, make sure a proxy is selected (this
            blocks until one is available)
        :return: ``self``
        """
        if enable_proxy:
            logger.info("[加载socks5代理]")
            self._init()
        return self


Proxy = Socks5Proxy()