# -*- coding: utf-8 -*-
"""
Created on 2023-05-11
---------
@summary: Proxy pool
---------
@author: Dzr
"""
import ast
import multiprocessing
import random
import threading
from collections import deque
from concurrent.futures import ThreadPoolExecutor, as_completed
from operator import itemgetter
from urllib.parse import urlparse

import requests

import setting as settings
from base_server import BaseServer, tools
from common.log import logger
from common.redis_lock import OptimisticLock

DEFAULT_UA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36'


def decrypt(input_str: str) -> str:
    """
    Base64-style decoding driven by the custom alphabet configured in settings.

    :param input_str: encoded string
    :return: decoded string
    """
    # map every character that is not '=' to its index in the key, as a 6-bit binary string
    key = settings.jy_proxy['socks5']['decrypt']
    ascii_list = ['{:0>6}'.format(bin(key.index(i))[2:]) for i in input_str if i != '=']
    output_str = ''
    # number of '=' padding characters
    equal_num = input_str.count('=')
    while ascii_list:
        temp_list = ascii_list[:4]
        # join the group into one binary string
        temp_str = ''.join(temp_list)
        # drop the padding bits (2 bits per '=') when the group is not a multiple of 8 bits
        if len(temp_str) % 8 != 0:
            temp_str = temp_str[0:-1 * equal_num * 2]
        # four 6-bit chunks become three 8-bit chunks
        temp_str_list = [temp_str[x:x + 8] for x in [0, 8, 16]]
        # binary to decimal
        temp_str_list = [int(x, 2) for x in temp_str_list if x]
        # append the decoded characters
        output_str += ''.join([chr(x) for x in temp_str_list])
        ascii_list = ascii_list[4:]

    return output_str


def get_base_url():
    return settings.jy_proxy['socks5']['base_url']


def get_netloc(proxy, default=None):
    proxies = None
    if isinstance(proxy, dict):
        proxies = proxy.get('proxies')
        if isinstance(proxies, str):
            proxies = tools.json_loads(proxies)

    if proxies is not None:
        parser = urlparse(proxies.get('http'))
        default = parser.netloc

    return default
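
# Usage note (illustrative; the real alphabet comes from configuration, not from
# this comment): ``decrypt`` is a base64-style decoder whose alphabet is read from
# settings.jy_proxy['socks5']['decrypt']. If that alphabet happened to be the
# standard base64 one ('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/'),
# then decrypt('MTI3LjAuMC4x') would return '127.0.0.1'.
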
class BaseProxyPool(BaseServer):

    def __init__(self, name, redis_label, scheme):
        super(BaseProxyPool, self).__init__(server=name, label=redis_label)
        self.scheme = scheme.lower()
        self.proxy_name = self.scheme + self.server
        self.proxy_queue = f'{redis_label}_{self.scheme}'
        self.unique_key = ('ip', 'port')  # field names used to build the proxy fingerprint

    def get_redis_name(self, proxy):
        return f"{self.proxy_queue}_{proxy['fingerprint']}"

    def str_scan(self, pattern, count=1000):
        cursor = 0
        while True:
            cursor, keys = self.redis_db.scan(cursor, pattern, count)
            if len(keys) > 0:
                yield from keys

            if cursor == 0:
                break

    def get_redis_name_lst(self, pattern='*'):
        results = []
        pattern = self.proxy_queue + pattern
        for key in self.str_scan(pattern, count=5000):
            results.append(key)

        return results

    def get_proxy(self, name):
        items = self.redis_db.hgetall(name)
        if items is None or 'proxies' not in items:
            return None

        proxy = {
            'proxies': ast.literal_eval(items['proxies']),
            'fingerprint': items['fingerprint'],
            'start_time': int(items['start_time']),
            'end_time': int(items['end_time']),
            'last_time': int(items['last_time']),
            'usage': int(items['usage']),
            'pk': int(items.get('pk', 1))
        }
        return proxy

    def get(self, name, key):
        return self.redis_db.hget(name, key)

    def exists(self, proxy):
        return self.redis_db.exists(self.get_redis_name(proxy))

    def check(self, proxy):
        is_ok = False
        # url = 'https://myip.ipip.net'
        url = 'https://www.baidu.com/'
        netloc = get_netloc(proxy)
        try:
            requests_param = {
                "headers": {'User-Agent': DEFAULT_UA},
                "proxies": proxy['proxies'],
                "timeout": 5
            }
            requests.get(url, **requests_param)
            is_ok = True
        except requests.RequestException:
            pass

        msg = "正常" if is_ok else "失效"
        logger.debug(f"[{self.proxy_name}]检查代理Ip - {netloc} --通信{msg}")
        return proxy, is_ok

    def remove_proxy(self, proxy):
        netloc = get_netloc(proxy)
        logger.debug(f"[{self.proxy_name}]代理Ip - {netloc} --删除")
        if self.exists(proxy):
            self.redis_db.delete(self.get_redis_name(proxy))

    def add_proxy(self, proxy):
        netloc = get_netloc(proxy)
        logger.debug(f"[{self.proxy_name}]代理Ip - {netloc} --添加")
        if not self.exists(proxy):
            redis_name = self.get_redis_name(proxy)
            self.redis_db.hset(redis_name, None, None, mapping=proxy)
            expire_ts = proxy['end_time'] - tools.now_ts()
            self.redis_db.expire(redis_name, expire_ts)


class ProxyPoolServer(BaseProxyPool, threading.Thread):

    def __init__(self, name, redis_label, scheme):
        """
        Proxy pool producer/manager.

        @param str name: service name
        @param str redis_label: redis key prefix
        @param str scheme: protocol type
        """
        threading.Thread.__init__(self)
        super(ProxyPoolServer, self).__init__(name, redis_label, scheme)
        self.label = f'{self.proxy_name}_{self.name}'
        self.ports = ['8862', '8863'] if self.scheme == 'http' else ['8860', '8861']
        self.load_interval = 60  # polling interval for the vps proxy service, in seconds

    def remove_failure_proxy(self, proxy_lst):
        """Remove expired/broken proxy ips that are no longer reported by the vps service."""
        logger.info(f"[{self.label}]清除无效代理Ip")
        proxy_fingerprints = {proxy['fingerprint'] for proxy in proxy_lst}
        for redis_name in self.get_redis_name_lst():
            fingerprint = self.get(redis_name, 'fingerprint')
            if fingerprint not in proxy_fingerprints:
                self.redis_db.delete(redis_name)

    def request_proxy(self):
        logger.info(f"[{self.label}]请求vps服务")
        proxy_lst = []
        for idx, url in enumerate(settings.jy_proxy['socks5']['url']):
            try:
                response = requests.get(url, timeout=10)
                for item in response.json():
                    ports = list(filter(lambda p: p in self.ports, item['ports']))
                    if not ports:
                        continue

                    ip = decrypt(item['ip'])
                    port = int(random.choice(ports))
                    start_time = tools.now_ts()
                    end_time = item['lifetime']
                    if end_time - start_time > 0:
                        proxy = {
                            'proxies': {
                                'http': '{}://{}:{}'.format(self.scheme, ip, port),
                                'https': '{}://{}:{}'.format(self.scheme, ip, port)
                            },
                            'fingerprint': self.fingerprint(ip=ip, port=port),
                            'start_time': start_time,
                            'end_time': end_time,
                            'last_time': 0,
                            'usage': 0,
                            'pk': idx + 1
                        }
                        proxy_lst.append(proxy)
            except Exception as e:
                logger.error(f'[{self.label}]vps服务访问异常[{url}],原因:{e.args}')

        return proxy_lst

    def manage_proxy(self, proxy_lst: list, workers=1):
        self.remove_failure_proxy(proxy_lst)
        with ThreadPoolExecutor(max_workers=workers) as executor:
            fs = [executor.submit(self.check, proxy) for proxy in proxy_lst]
            for f in as_completed(fs):
                proxy, is_ok = f.result()
                if is_ok:
                    self.add_proxy(proxy)
                else:
                    self.remove_proxy(proxy)

    def run(self):
        logger.info(f'[{self.label}]开始生产代理Ip')
        while True:
            try:
                proxy_lst = self.request_proxy()
                if not proxy_lst:
                    tools.delay(2)
                    continue

                dynamic_workers = min(len(proxy_lst) // 2 or 1, 10)
                self.manage_proxy(proxy_lst, workers=dynamic_workers)  # thread pool capped at 10 workers
                tools.delay(self.load_interval)
            except Exception as e:
                logger.exception(e)
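
# Storage layout produced by ProxyPoolServer (as implemented above): every proxy is
# kept in a redis hash named f'{redis_label}_{scheme}_{fingerprint}' holding the
# fields proxies / fingerprint / start_time / end_time / last_time / usage / pk,
# and the key expires end_time - now() seconds after it is added. ProxyPoolClient
# below only reads and updates these hashes; it never contacts the vps service.
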
class ProxyPoolClient(BaseProxyPool):

    def __init__(self, name: str, redis_label: str, scheme: str):
        """Proxy pool consumer."""
        super(ProxyPoolClient, self).__init__(name, redis_label, scheme)
        current_process = multiprocessing.current_process()
        sub_name = f'{tools.get_localhost_ip()}:{current_process.pid}'
        self.lock_label = f'{redis_label}:{sub_name}'

    def proxy_total(self):
        return len(self.get_redis_name_lst())

    def get_all_proxy(self, pk=None):
        proxy_lst = deque([])
        for redis_name in self.get_redis_name_lst():
            proxy = self.get_proxy(redis_name)
            if isinstance(proxy, dict):
                proxy_lst.append(proxy)

        if len(proxy_lst) > 0:
            if pk is not None:
                # filter by proxy source (pk) first, then sort by usage count, ascending
                special = deque(filter(lambda x: x['pk'] == int(pk), proxy_lst))
                proxy_lst = deque(sorted(special, key=itemgetter('usage')))
            else:
                # sort by usage count, ascending (least used on the left)
                proxy_lst = deque(sorted(proxy_lst, key=itemgetter('usage')))

        return proxy_lst

    def get_proxy_pool(self, **kwargs):
        proxy_lst = []
        for proxy in self.get_all_proxy(**kwargs):
            last_time = proxy['last_time']
            end_time = proxy['end_time']
            expire = end_time - tools.now_ts()
            proxy_lst.append({
                'proxies': proxy['proxies'],
                'start_time': tools.ts2dt(proxy['start_time']),
                'end_time': tools.ts2dt(end_time),
                'last_time': tools.ts2dt(last_time) if last_time != 0 else '',
                'expire': expire,
                'usage': proxy['usage'],
                'pk': proxy.get('pk', 1),
            })

        # for display, order by remaining lifetime, longest first
        return sorted(proxy_lst, key=lambda x: x['expire'], reverse=True)

    def get_all_proxy_ip(self, protocol, **kwargs):
        return [
            proxy['proxies']['http'].replace(f'{protocol}://', '')
            for proxy in self.get_all_proxy(**kwargs)
        ]

    def proxies(self, **kwargs):
        with OptimisticLock(self.redis_db, self.lock_label):
            proxy = {}
            if self.proxy_total() > 0:
                proxy_lst = self.get_all_proxy(**kwargs)
                if proxy_lst:  # the pk filter may leave nothing behind
                    proxy = proxy_lst.popleft()
                    name = self.get_redis_name(proxy)
                    mapping = {
                        'usage': proxy['usage'] + 1,
                        'last_time': tools.now_ts()
                    }
                    self.redis_db.hset(name, None, None, mapping=mapping)

            return proxy.get('proxies')
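

# ---------------------------------------------------------------------------
# Minimal wiring sketch. The constructor arguments below ('proxy_pool',
# 'jy_proxy', 'socks5') are illustrative assumptions, not values taken from this
# project's configuration; pass your own service name, redis label and scheme.
# Note that socks5 proxies additionally require requests[socks] to be installed.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    # producer thread: polls the vps service and keeps the redis pool fresh
    server = ProxyPoolServer(name='proxy_pool', redis_label='jy_proxy', scheme='socks5')
    server.start()

    # consumer: pops the least-used proxy and bumps its usage counter
    client = ProxyPoolClient(name='proxy_pool', redis_label='jy_proxy', scheme='socks5')
    proxies = client.proxies()  # e.g. {'http': 'socks5://ip:port', 'https': 'socks5://ip:port'}
    if proxies:
        requests.get('https://www.baidu.com/', proxies=proxies, timeout=5)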