123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- import copy
- import time
- import requests
- from config.load import jy_proxy, headers
- from utils.log import logger
- __all__ = ['Proxy']  # star-imports expose only the shared ``Proxy`` instance
def decrypt(input_str: str, key: 'str | None' = None) -> str:
    """Decode a base64-style string that uses a custom 64-symbol alphabet.

    Every character other than ``=`` is mapped to its index in ``key`` and
    written as 6 bits; the resulting bit stream is cut into 8-bit groups and
    each group becomes one character via ``chr``. Leftover bits that do not
    fill a whole byte are discarded, so both padded and unpadded input decode
    correctly.

    :param input_str: encoded text; ``=`` characters are treated as padding
        and ignored.
    :param key: alphabet whose positions give the 6-bit values. When omitted,
        ``jy_proxy['socks5']['decrypt']`` from the project config is used.
    :return: decoded text, one character per decoded byte (code points 0-255).
    :raises ValueError: if ``input_str`` contains a character that is not in
        ``key``.
    """
    if key is None:
        key = jy_proxy['socks5']['decrypt']

    # One 6-bit field per symbol, concatenated into a single bit string.
    bits = ''.join(format(key.index(ch), '06b') for ch in input_str if ch != '=')

    # Only whole bytes carry data; the 2 or 4 trailing bits of a short final
    # group are filler. Trimming by the real remainder (rather than by the
    # number of '=' seen) keeps unpadded input from losing its last group.
    usable = len(bits) - len(bits) % 8

    return ''.join(chr(int(bits[pos:pos + 8], 2)) for pos in range(0, usable, 8))
class Socks5Proxy:
    """Singleton pool of SOCKS5 proxies with round-robin switching.

    Proxies are fetched from the URL in ``jy_proxy['socks5']['url']``, each
    one is probed with a test request, and the survivors are kept in
    ``pool``. ``counter`` maps ``str(proxy_dict)`` to the ``EndTime`` value
    the service reported for that proxy.

    Attributes:
        url: proxy service endpoint; ``None`` until ``_init`` has run.
        pool: list of ``requests``-style proxy dicts that passed the probe.
        index: position in ``pool`` of the proxy currently in use.
        counter: ``str(proxy_dict) -> EndTime`` for every proxy in ``pool``.
        seconds: a proxy whose ``EndTime`` is fewer than this many seconds
            away is dropped from the pool when ``switch`` is called.
    """

    __instance = None

    def __new__(cls, *args, **kwargs):
        """Return the one shared instance, creating it on first use."""
        if cls.__instance is None:
            cls.__instance = super().__new__(cls)
        return cls.__instance

    def __init__(self):
        """Create empty pool state the first time the singleton is built."""
        # __init__ runs on every ``Socks5Proxy()`` call even though __new__
        # hands back the same object, so guard against wiping live state.
        if getattr(self, '_initialized', False):
            return
        self._initialized = True
        self.__proxies = {}
        self.url = None
        self.pool = []
        self.index = 0  # position of the current proxy inside ``pool``
        self.counter = {}
        self.seconds = 60

    def _init(self):
        """Read the service URL and block until a working proxy is selected.

        Does nothing beyond refreshing ``url`` when a proxy is already
        selected, so repeated ``Proxy(True)`` calls keep the existing pool.
        """
        self.url = jy_proxy['socks5']['url']
        # NOTE(review): this retries forever with no delay while the service
        # yields no usable proxy — consider a backoff or retry limit.
        while not self.__proxies:
            if self.pool:
                if self.index >= len(self.pool):
                    self.index = 0
                self.__proxies = copy.deepcopy(self.pool[self.index])
            else:
                self.generate_pool()

    @property
    def proxies(self):
        """The proxy dict currently in use (empty dict when none is selected)."""
        return self.__proxies

    def switch(self, reset=False):
        """Move on to the next proxy in the pool.

        :param reset: when ``True``, discard the whole pool and fetch a new
            one before selecting. Otherwise the current proxy is removed from
            the pool only if its ``EndTime`` is less than ``seconds`` away.
        """
        if reset is True:
            self.counter.clear()
            self.flush_pool()
        elif self.counter:
            current_key = str(self.__proxies)
            # .get(): the current proxy may have no entry (nothing selected
            # yet, or it was already dropped), which must not raise.
            end_time = self.counter.get(current_key)
            # assumes EndTime is a Unix timestamp in seconds — TODO confirm
            if end_time is not None and end_time - int(time.time()) < self.seconds:
                if self.__proxies in self.pool:
                    self.pool.remove(self.__proxies)
                logger.info(f"[代理]移除:{self.__proxies}")
                self.counter.pop(current_key, None)
                logger.info(f"[代理]剩余个数:{len(self.pool)}")

        self.__proxies = {}  # force a new selection below
        # NOTE(review): like _init, this loops without delay until the
        # service provides at least one usable proxy.
        while not self.__proxies:
            if self.pool:
                self.index += 1
                if self.index >= len(self.pool):
                    self.index = 0
                self.__proxies = copy.deepcopy(self.pool[self.index])
                logger.info(f"[代理]切换 - {self.index}")
            else:
                logger.info("[代理]无可用代理")
                self.flush_pool()

    def generate_pool(self):
        """Fetch proxies from the service, then drop the ones that fail the probe."""
        self.__socks5()
        self.__check_proxies()

    def flush_pool(self):
        """Throw away the current pool and build a fresh one."""
        logger.info("[代理]刷新代理池")
        self.pool.clear()
        # Expiry records belong to pool entries; clearing both keeps a
        # re-issued proxy from inheriting an outdated EndTime.
        self.counter.clear()
        self.generate_pool()

    def __socks5(self):
        """Request the proxy list from ``url`` and load it into the pool."""
        logger.info(f"[代理]请求服务:{self.url}")
        try:
            response = requests.get(self.url, timeout=10)
            self.__extract_ip(response)
        except requests.RequestException as exc:
            # Best effort: callers retry, but leave a trace of the failure.
            logger.warning(f"[代理]请求服务失败:{exc}")

    def __extract_ip(self, response):
        """Turn the service's JSON list into proxy dicts and record their EndTime.

        Each entry is read for ``host`` (encoded, passed through ``decrypt``),
        ``port`` and ``EndTime``.
        """
        for proxy in response.json():
            host = decrypt(proxy['host'])
            port = int(proxy['port'])
            end_time = proxy['EndTime']
            address = 'socks5://{}:{}'.format(host, port)
            items = {'http': address, 'https': address}
            key = str(items)
            if key in self.counter:
                # Same proxy listed twice: a second pool entry would outlive
                # its single counter record and break later removals.
                continue
            self.pool.append(items)
            self.counter[key] = end_time

    def __check_proxies(self):
        """Probe every pooled proxy and remove those whose request fails."""
        check_ip = 'https://myip.ipip.net'
        logger.info(f"[代理]通信检查:{check_ip}")
        # Iterate over a reversed copy so entries can be removed from the
        # live list during the loop.
        for proxies in self.pool[::-1]:
            try:
                requests.get(check_ip, headers=headers, proxies=proxies, timeout=10)
            except requests.RequestException:
                self.pool.remove(proxies)
                self.counter.pop(str(proxies), None)
        logger.info(f"[代理]可用个数:{len(self.pool)}")

    def __call__(self, enable_proxy: bool = False, *args, **kwargs):
        """Optionally initialise the pool and return the singleton itself.

        :param enable_proxy: when true, run ``_init`` so that ``proxies`` is
            populated before returning.
        :return: this instance.
        """
        if enable_proxy:
            self._init()
        return self
- Proxy = Socks5Proxy()  # module-level shared instance; call ``Proxy(True)`` to run ``_init`` and select a proxy
|