# -*- coding: utf-8 -*-
"""
Created on 2023-05-11
---------
@summary: Proxy pool
---------
@author: Dzr
"""
import random
import string
import threading
from concurrent.futures import ThreadPoolExecutor
from threading import Thread
from urllib.parse import urlparse

import requests

import setting as settings
from base_server import BaseServer, tools
from common.log import logger

DEFAULT_UA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36'

# Pause applied after an unexpected error in a worker loop so that a
# persistent failure does not turn the loop into a busy spin.
_ERROR_BACKOFF = 0.5


def decrypt(input_str: str) -> str:
    """
    Decode a base64-style string that uses the custom alphabet configured in
    ``settings.jy_proxy['socks5']['decrypt']``.

    Every non-``=`` character is mapped to its 6-bit index in the alphabet,
    the bits are concatenated and then re-split into 8-bit characters.

    :param input_str: encoded text, with or without ``=`` padding
    :return: decoded text (one character per decoded byte)
    :raises ValueError: if a character is not part of the alphabet
    """
    key = settings.jy_proxy['socks5']['decrypt']
    bits = ''.join(format(key.index(ch), '06b') for ch in input_str if ch != '=')
    # The trailing bits that do not fill a whole byte are encoding slack.
    # Trimming by ``len % 8`` matches the "2 bits per '='" rule for padded
    # input and also works when the padding has been stripped.
    usable = len(bits) - len(bits) % 8
    return ''.join(chr(int(bits[pos:pos + 8], 2)) for pos in range(0, usable, 8))


def get_base_url():
    """Return the ``base_url`` configured for the socks5 proxy service."""
    return settings.jy_proxy['socks5']['base_url']


def get_netloc(proxy, default=None):
    """
    Extract the ``host:port`` part of a proxy record's ``http`` proxy URL.

    :param proxy: proxy record; only a dict with a ``proxies`` entry (a dict,
        or a JSON string of one) yields a result
    :param default: value returned when no proxy URL can be found
    :return: the URL's netloc, or ``default``
    """
    if not isinstance(proxy, dict):
        return default

    proxies = proxy.get('proxies')
    if isinstance(proxies, str):
        proxies = tools.json_loads(proxies)
    if not isinstance(proxies, dict):
        return default

    url = proxies.get('http')
    if not url:
        # urlparse(None) would yield bytes fields; keep the caller's default.
        return default
    return urlparse(url).netloc


def get_random(length=4):
    """
    Return ``length`` distinct random characters drawn from ASCII letters
    and digits.

    Characters are sampled without replacement, so ``length`` may not exceed
    62 (``random.sample`` raises ``ValueError`` otherwise).
    """
    return ''.join(random.sample(string.ascii_letters + string.digits, length))


def get_host(proxies, scheme):
    """Strip the leading ``<scheme>://`` from ``proxies['http']``."""
    return str(proxies['http']).replace(scheme + '://', '', 1)


class BaseProxyPool(BaseServer):
    """Shared state for the proxy pool producer and consumer."""

    def __init__(self, server, redis_label, scheme):
        """
        @param str server: service name
        @param str redis_label: redis label prefix
        @param str scheme: proxy protocol; stored lower-cased
        """
        super(BaseProxyPool, self).__init__(server=server, label=redis_label)
        self.scheme = scheme.lower()
        # Redis keys shared by producer and consumer.
        self.tab_name = f'proxy:{self.scheme}:items'  # hash: host -> proxy JSON
        self.lst_name = f'proxy:{self.scheme}:pk_0'  # list: every host
        self.lst_name_1 = f'proxy:{self.scheme}:pk_1'  # list: hosts with pk == 1
        self.lst_name_2 = f'proxy:{self.scheme}:pk_2'  # list: hosts with pk == 2


class ProxyPoolServer(BaseProxyPool, threading.Thread):
    """Producer: fetches proxies, validates them and maintains them in Redis."""

    def __init__(self, service_name, redis_label, scheme):
        """
        Proxy pool production management.

        @param str service_name: service name
        @param str redis_label: redis label prefix
        @param str scheme: protocol type
        """
        threading.Thread.__init__(self)
        super(ProxyPoolServer, self).__init__(service_name, redis_label, scheme)

        self.ports = ['8862', '8863'] if self.scheme == 'http' else ['8860', '8861']
        self.load_interval = 60  # interval between requests to the vps service

    def _pk_list(self, pk):
        """Return the per-source list key for ``pk``, or None when there is none."""
        if pk == 1:
            return self.lst_name_1
        if pk == 2:
            return self.lst_name_2
        return None

    def request_proxy(self):
        """
        Query every configured endpoint and build the candidate proxy list.

        An endpoint that cannot be reached or does not return JSON is logged
        and skipped, so the remaining endpoints still contribute.

        :return: list of proxy records with keys ``proxies``, ``expire``, ``pk``
        """
        proxy_lst = []
        for idx, url in enumerate(settings.jy_proxy['socks5']['url']):
            try:
                response = requests.get(url, timeout=10)
                items = response.json()
            except (requests.RequestException, ValueError) as e:
                logger.error(f'vps服务|请求失败|{url}|原因:{e.args}')
                continue

            for item in items:
                ports = [p for p in item['ports'] if p in self.ports]
                if not ports:
                    continue

                ip = decrypt(item['ip'])
                for port in ports:
                    address = '{}://{}:{}'.format(self.scheme, ip, int(port))
                    proxy_lst.append({
                        'proxies': {'http': address, 'https': address},
                        'expire': item['lifetime'],
                        'pk': idx + 1,  # 1-based position of the source url
                    })

        return proxy_lst

    def add_proxy(self, proxy):
        """
        Add a proxy to Redis unless its host is already registered.
        """
        host = get_host(proxy['proxies'], self.scheme)
        if self.redis_db.hexists(self.tab_name, host):
            return

        self.redis_db.rpush(self.lst_name, host)
        self.redis_db.hset(self.tab_name, host, tools.json_dumps(proxy))
        pk_list = self._pk_list(proxy['pk'])
        if pk_list is not None:
            self.redis_db.rpush(pk_list, host)
        logger.info(f"添加代理|{host}")

    def del_proxy(self, proxy):
        """
        Remove a proxy's host from its lists and from the item hash.
        """
        host = get_host(proxy['proxies'], self.scheme)
        # NOTE(review): (name, value, count) is the legacy redis-py 2.x
        # ``Redis.lrem`` argument order; confirm it matches the redis_db wrapper.
        n1 = 0
        pk_list = self._pk_list(proxy['pk'])
        if pk_list is not None:
            n1 = self.redis_db.lrem(pk_list, host, 0)

        n0 = self.redis_db.lrem(self.lst_name, host, 0)  # remove all matching elements
        self.redis_db.hdel(self.tab_name, host)
        logger.info(f"移除代理|{host}|{n1 + n0}")

    def update_proxy(self, proxy, success):
        """
        Apply a validation result: add the proxy on success, remove it otherwise.
        """
        if success:
            self.add_proxy(proxy)
        else:
            self.del_proxy(proxy)

    def validate_proxy(self, proxy):
        """
        Probe a proxy by fetching a test page through it.

        :return: ``(proxy, ok)`` where ``ok`` is True only for an HTTP 200 reply
        """
        # url = 'https://myip.ipip.net'
        url = 'https://www.baidu.com/'
        proxies = proxy['proxies']
        host = get_host(proxies, self.scheme)
        logger.debug(f"代理检查|{host}")
        try:
            r = requests.get(
                url,
                headers={"User-Agent": DEFAULT_UA},
                proxies=proxies,
                timeout=5,
            )
        except requests.RequestException:
            return proxy, False
        return proxy, r.status_code == requests.codes.ok

    def manage_proxy(self, proxy_lst, workers):
        """
        Validate proxies concurrently and apply each result to Redis.

        :param list proxy_lst: proxy records to check
        :param int workers: number of worker threads
        """
        with ThreadPoolExecutor(max_workers=workers, thread_name_prefix='validate') as executor:
            for proxy, success in executor.map(self.validate_proxy, proxy_lst):
                self.update_proxy(proxy, success)

    def _create_pool(self):
        """Loop forever: fetch candidate proxies, validate and store them."""
        logger.info('创建Ip池')
        while True:
            try:
                proxy_lst = self.request_proxy()
                if not proxy_lst:
                    tools.delay(0.5)
                    continue

                workers = min(len(proxy_lst) // 2 or 1, 4)  # at most 4 worker threads
                self.manage_proxy(proxy_lst, workers=workers)
                tools.delay(self.load_interval)
            except Exception as e:
                logger.error('Ip池创建失败')
                logger.exception(e)
                # Back off so a persistent failure does not spin the loop.
                tools.delay(_ERROR_BACKOFF)

    def _watch_pool(self):
        """Loop forever: drop proxies with less than 30 seconds left to live."""
        logger.info('Ip池代理监控')
        while True:
            try:
                proxy_dict = self.redis_db.hgetall(self.tab_name)
                if not proxy_dict:
                    tools.delay(0.5)
                    continue

                for raw in proxy_dict.values():
                    proxy = tools.json_loads(raw)
                    if int(proxy['expire']) - tools.now_ts() < 30:
                        self.del_proxy(proxy)

                tools.delay(2)
            except Exception as e:
                logger.exception(e)
                # Back off so a persistent failure does not spin the loop.
                tools.delay(_ERROR_BACKOFF)

    def run(self):
        """Start the producer and watcher threads and wait for them."""
        workers = [
            Thread(target=self._create_pool, daemon=True, name='CreatePool'),
            Thread(target=self._watch_pool, daemon=True, name='WatchPool'),
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()


class ProxyPoolClient(BaseProxyPool):
    """Consumer: hands out proxies stored in Redis by ``ProxyPoolServer``."""

    def __init__(self, service_name, redis_label, scheme):
        """
        @param str service_name: service name
        @param str redis_label: redis label prefix
        @param str scheme: protocol name
        """
        super().__init__(service_name, redis_label, scheme)

    def _get_proxy(self, src, dst, timeout=3.0):
        """Pop a host from ``src`` and push it onto ``dst`` via ``brpoplpush``."""
        return self.redis_db.brpoplpush(src, dst, timeout=timeout)

    def get_proxy(self):
        """
        Get one proxy host from Redis; the host is pushed back onto the same list.
        """
        return self._get_proxy(self.lst_name, self.lst_name, timeout=0.5)

    def get_pk1_proxy(self):
        """Same as ``get_proxy`` but restricted to the pk == 1 list."""
        return self._get_proxy(self.lst_name_1, self.lst_name_1, timeout=0.5)

    def get_pk2_proxy(self):
        """Same as ``get_proxy`` but restricted to the pk == 2 list."""
        return self._get_proxy(self.lst_name_2, self.lst_name_2, timeout=0.5)

    def proxies(self, pk=0):
        """
        Return a requests-style proxies dict for the next available host.

        :param pk: 1 or 2 selects that source's list; any other value uses
            the list holding every host
        :return: ``{'http': ..., 'https': ...}``, or None when no host is available
        """
        if pk == 1:
            host = self.get_pk1_proxy()
        elif pk == 2:
            host = self.get_pk2_proxy()
        else:
            host = self.get_proxy()

        if not host:
            return None

        address = '{}://{}'.format(self.scheme, host)
        return {'http': address, 'https': address}

    def _get_all_proxy(self, pk=None):
        """
        Load every stored proxy record, optionally filtered by ``pk``.

        Records without a ``pk`` entry are treated as pk 1, which is the
        default ``get_proxy_pool`` reports for them.
        """
        proxy_dict = self.redis_db.hgetall(self.tab_name)
        if not proxy_dict:
            return []

        proxy_lst = [tools.json_loads(raw) for raw in proxy_dict.values()]
        if pk is not None:
            proxy_lst = [p for p in proxy_lst if p.get('pk', 1) == pk]
        return proxy_lst

    def get_proxy_pool(self, **kwargs):
        """
        Describe the stored proxies for display.

        :param kwargs: forwarded to ``_get_all_proxy`` (e.g. ``pk``)
        :return: list of dicts with ``proxies``, ``expire_time``, ``expire``
            (expire timestamp minus ``tools.now_ts()``) and ``pk``, ordered by
            ``expire`` from largest to smallest
        """
        results = []
        for proxy in self._get_all_proxy(**kwargs):
            life_time = proxy['expire']
            results.append({
                'proxies': proxy['proxies'],
                'expire_time': tools.ts2dt(life_time),
                # int() mirrors the expiry check used by the pool watcher.
                'expire': int(life_time) - tools.now_ts(),
                'pk': proxy.get('pk', 1),
            })
        # For display, order by remaining lifetime, largest first.
        return sorted(results, key=lambda x: x['expire'], reverse=True)

    def get_all_proxy_ip(self, protocol, **kwargs):
        """
        Return the ``host:port`` of every stored proxy.

        :param protocol: scheme prefix to strip from each ``http`` proxy URL
        :param kwargs: forwarded to ``_get_all_proxy`` (e.g. ``pk``)
        """
        return [
            proxy['proxies']['http'].replace(f'{protocol}://', '', 1)
            for proxy in self._get_all_proxy(**kwargs)
        ]