# -*- coding: utf-8 -*-
"""
Created on 2023-05-11
---------
@summary: Proxy pool
---------
@author: Dzr
"""
- import ast
- import multiprocessing
- import random
- import threading
- from collections import deque
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from operator import itemgetter
- from urllib.parse import urlparse
- import requests
- import setting as settings
- from base_server import BaseServer, tools
- from common.log import logger
- from common.redis_lock import OptimisticLock
# Browser-like User-Agent header value sent with the proxy connectivity check request.
DEFAULT_UA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36'
def decrypt(input_str: str) -> str:
    """
    Decode a base64-style string that uses the alphabet configured in settings.

    Every character other than ``=`` is looked up in the configured alphabet and
    contributes its index as 6 bits; the resulting bit stream is cut into 8-bit
    values, each of which is turned into one character with ``chr``.

    :param input_str: encoded text; ``=`` padding is optional
    :return: decoded text (one character per decoded byte value)
    :raises ValueError: propagated from ``key.index`` when a character is not
        part of the configured alphabet
    """
    key = settings.jy_proxy['socks5']['decrypt']
    # 6 bits per encoded character; padding characters carry no data
    bits = ''.join(format(key.index(ch), '06b') for ch in input_str if ch != '=')
    # Drop the filler bits of an incomplete trailing byte. For correctly padded
    # input this equals "2 bits per '='", and it also works when the padding was
    # stripped (the old slice collapsed to an empty string in that case).
    usable = len(bits) - len(bits) % 8
    return ''.join(chr(int(bits[pos:pos + 8], 2)) for pos in range(0, usable, 8))
def get_base_url():
    """Return the ``base_url`` entry of the socks5 section in the proxy settings."""
    socks5_conf = settings.jy_proxy['socks5']
    return socks5_conf['base_url']
def get_netloc(proxy, default=None):
    """
    Return the network location (``host:port``) of a proxy record's HTTP proxy URL.

    :param proxy: proxy record; only dicts are inspected. Its ``'proxies'`` entry
        may be a mapping or a string, in which case it is parsed with
        ``tools.json_loads`` first.
    :param default: value returned when no HTTP proxy URL is available
    :return: ``netloc`` of the ``'http'`` URL, otherwise ``default``
    """
    if not isinstance(proxy, dict):
        return default

    proxies = proxy.get('proxies')
    if isinstance(proxies, str):
        proxies = tools.json_loads(proxies)
    if proxies is None:
        return default

    http_url = proxies.get('http')
    if not http_url:
        # Without this guard urlparse() is fed None and the caller's default is
        # replaced by an empty, unusable netloc.
        return default
    return urlparse(http_url).netloc
class BaseProxyPool(BaseServer):
    """Shared Redis helpers for the proxy pool producer and its clients."""

    def __init__(self, name, redis_label, scheme):
        """
        :param name: service name, forwarded to the base server as ``server``
        :param redis_label: Redis label, forwarded to the base server as ``label``
        :param scheme: proxy protocol; stored lower-cased
        """
        super().__init__(server=name, label=redis_label)
        self.scheme = scheme.lower()
        self.proxy_name = self.scheme + self.server
        self.proxy_queue = f'{redis_label}_{self.scheme}'
        self.unique_key = ('ip', 'port')  # field names combined into the proxy fingerprint

    def get_redis_name(self, proxy):
        """Build the Redis key of a proxy record from the queue prefix and its fingerprint."""
        return f"{self.proxy_queue}_{proxy['fingerprint']}"

    def str_scan(self, pattern, count=1000):
        """Yield every key returned by repeated ``scan`` calls until the cursor comes back as 0."""
        cursor = '0'
        while True:
            cursor, batch = self.redis_db.scan(cursor, pattern, count)
            # yielding from an empty batch produces nothing, so no length check is needed
            yield from batch
            if cursor == 0:
                return

    def get_redis_name_lst(self, pattern='*'):
        """Return the list of keys matching the queue prefix followed by ``pattern``."""
        return list(self.str_scan(self.proxy_queue + pattern, count=5000))

    def get_proxy(self, name):
        """
        Load the hash stored under ``name`` and convert it into a proxy record.

        :return: dict with parsed ``proxies`` and integer counters/timestamps, or
            ``None`` when the hash is missing or has no ``proxies`` field
        """
        raw = self.redis_db.hgetall(name)
        if raw is None or 'proxies' not in raw:
            return None

        record = {
            'proxies': ast.literal_eval(raw['proxies']),
            'fingerprint': raw['fingerprint'],
        }
        for field in ('start_time', 'end_time', 'last_time', 'usage'):
            record[field] = int(raw[field])
        record['pk'] = int(raw.get('pk', 1))  # records without a pk field count as pk 1
        return record

    def get(self, name, key):
        """Return a single field of the hash stored under ``name``."""
        return self.redis_db.hget(name, key)

    def exists(self, proxy):
        """Return the result of the Redis ``exists`` call for the proxy's key."""
        redis_name = self.get_redis_name(proxy)
        return self.redis_db.exists(redis_name)

    def check(self, proxy):
        """
        Probe the proxy with one GET request and log the outcome.

        :return: tuple ``(proxy, is_ok)``; ``is_ok`` is False when the request
            raised ``requests.RequestException``
        """
        url = 'https://www.baidu.com/'
        netloc = get_netloc(proxy)
        try:
            requests.get(
                url,
                headers={'User-Agent': DEFAULT_UA},
                proxies=proxy['proxies'],
                timeout=5,
            )
        except requests.RequestException:
            is_ok = False
        else:
            is_ok = True

        if is_ok:
            msg = "正常"
        else:
            msg = "失效"
        logger.debug(f"[{self.proxy_name}]检查代理Ip - {netloc} --通信{msg}")
        return proxy, is_ok

    def remove_proxy(self, proxy):
        """Log the removal and delete the proxy's key when it is present."""
        netloc = get_netloc(proxy)
        logger.debug(f"[{self.proxy_name}]代理Ip - {netloc} --删除")
        if not self.exists(proxy):
            return
        self.redis_db.delete(self.get_redis_name(proxy))

    def add_proxy(self, proxy):
        """Log the addition and store the proxy as a hash that expires at its ``end_time``."""
        netloc = get_netloc(proxy)
        logger.debug(f"[{self.proxy_name}]代理Ip - {netloc} --添加")
        if self.exists(proxy):
            return
        redis_name = self.get_redis_name(proxy)
        self.redis_db.hset(redis_name, None, None, mapping=proxy)
        # time to live = distance between now and the proxy's end_time
        remaining = proxy['end_time'] - tools.now_ts()
        self.redis_db.expire(redis_name, remaining)
class ProxyPoolServer(BaseProxyPool, threading.Thread):
    """Producer thread: polls the VPS proxy services and keeps the Redis pool in sync."""

    def __init__(self, name, redis_label, scheme):
        """
        Proxy pool production management.

        @param str name: service name
        @param str redis_label: Redis label prefix
        @param str scheme: protocol type
        """
        threading.Thread.__init__(self)
        super(ProxyPoolServer, self).__init__(name, redis_label, scheme)
        # Thread.getName() is deprecated; the ``name`` attribute is what it returns
        self.label = f'{self.proxy_name}_{self.name}'
        # ports accepted from the VPS response for the configured scheme
        self.ports = ['8862', '8863'] if self.scheme == 'http' else ['8860', '8861']
        self.load_interval = 60  # delay between two polls of the VPS proxy service

    def remove_failure_proxy(self, proxy_lst):
        """Delete every pooled record whose fingerprint is not in ``proxy_lst``."""
        logger.info(f"[{self.label}]清除无效代理Ip")
        proxy_fingerprints = {proxy['fingerprint'] for proxy in proxy_lst}
        for redis_name in self.get_redis_name_lst():
            fingerprint = self.get(redis_name, 'fingerprint')
            if fingerprint not in proxy_fingerprints:
                self.redis_db.delete(redis_name)

    def request_proxy(self):
        """
        Query every configured VPS url and build proxy records from the answers.

        A failure while handling one url is logged and the remaining urls are
        still processed.

        :return: list of proxy records (dicts) that have not reached their end time
        """
        logger.info(f"[{self.label}]请求vps服务")
        proxy_lst = []
        for idx, url in enumerate(settings.jy_proxy['socks5']['url']):
            try:
                response = requests.get(url, timeout=10)
                for item in response.json():
                    # keep only the ports that serve this pool's scheme
                    ports = [p for p in item['ports'] if p in self.ports]
                    if not ports:
                        continue

                    ip = decrypt(item['ip'])
                    port = int(random.choice(ports))
                    start_time = tools.now_ts()
                    end_time = item['lifetime']
                    if end_time - start_time <= 0:
                        continue  # already past its end time

                    address = '{}://{}:{}'.format(self.scheme, ip, port)
                    proxy_lst.append({
                        'proxies': {'http': address, 'https': address},
                        'fingerprint': self.fingerprint(ip=ip, port=port),
                        'start_time': start_time,
                        'end_time': end_time,
                        'last_time': 0,
                        'usage': 0,
                        'pk': idx + 1,  # 1-based position of the source url
                    })
            except Exception as e:
                # best effort: one failing service must not block the others
                logger.error(f'[{self.label}]vps服务访问异常[{url}],原因:{e.args}')

        return proxy_lst

    def manage_proxy(self, proxy_lst: list, workers=1):
        """
        Drop records missing from ``proxy_lst``, then check each proxy concurrently
        and add the working ones / remove the failing ones.

        :param proxy_lst: proxy records returned by ``request_proxy``
        :param workers: size of the thread pool running the checks
        """
        self.remove_failure_proxy(proxy_lst)
        with ThreadPoolExecutor(max_workers=workers) as executor:
            fs = [executor.submit(self.check, proxy) for proxy in proxy_lst]
            for f in as_completed(fs):
                proxy, is_ok = f.result()
                if is_ok:
                    self.add_proxy(proxy)
                else:
                    self.remove_proxy(proxy)

    def run(self):
        """Thread body: fetch, verify and store proxies in an endless loop."""
        logger.info(f'[{self.label}]开始生产代理Ip')
        while True:
            try:
                proxy_lst = self.request_proxy()
                if not proxy_lst:
                    tools.delay(2)
                    continue

                # half as many workers as proxies, at least 1, capped at 10
                dynamic_workers = min(len(proxy_lst) // 2 or 1, 10)
                self.manage_proxy(proxy_lst, workers=dynamic_workers)
                tools.delay(self.load_interval)
            except Exception as e:
                logger.exception(e)
                # back off before retrying so a persistent failure does not turn
                # into an immediate retry loop
                tools.delay(2)
class ProxyPoolClient(BaseProxyPool):
    """Consumer side of the proxy pool: lists pooled proxies and hands them out."""

    def __init__(self, name: str, redis_label: str, scheme: str):
        """
        Proxy pool caller.

        :param name: service name
        :param redis_label: Redis label prefix
        :param scheme: protocol type
        """
        super(ProxyPoolClient, self).__init__(name, redis_label, scheme)
        current_process = multiprocessing.current_process()
        sub_name = f'{tools.get_localhost_ip()}:{current_process.pid}'
        # lock name built from the label, the local ip and the current process id
        self.lock_label = f'{redis_label}:{sub_name}'

    def proxy_total(self):
        """Return the number of keys currently found for this pool."""
        return len(self.get_redis_name_lst())

    def get_all_proxy(self, pk=None):
        """
        Load all pooled proxy records ordered by ascending usage count.

        :param pk: optional proxy type; when given, only records with that
            ``pk`` are returned
        :return: deque of proxy records, least used first
        """
        loaded = (self.get_proxy(redis_name) for redis_name in self.get_redis_name_lst())
        # get_proxy returns None for keys that vanished or hold no proxy data
        proxy_lst = [proxy for proxy in loaded if isinstance(proxy, dict)]
        if proxy_lst and pk is not None:
            # filter by proxy type (pk) first, convert pk only once
            wanted = int(pk)
            proxy_lst = [proxy for proxy in proxy_lst if proxy['pk'] == wanted]
        # ascending usage: smallest on the left, largest on the right
        return deque(sorted(proxy_lst, key=itemgetter('usage')))

    def get_proxy_pool(self, **kwargs):
        """
        Return a display view of the pool.

        :param kwargs: forwarded to ``get_all_proxy``
        :return: list of dicts with formatted timestamps and the remaining
            lifetime (``expire``), longest remaining lifetime first
        """
        proxy_lst = []
        for proxy in self.get_all_proxy(**kwargs):
            last_time = proxy['last_time']
            end_time = proxy['end_time']
            expire = end_time - tools.now_ts()
            proxy_lst.append({
                'proxies': proxy['proxies'],
                'start_time': tools.ts2dt(proxy['start_time']),
                'end_time': tools.ts2dt(end_time),
                # last_time 0 means the proxy has not been handed out yet
                'last_time': tools.ts2dt(last_time) if last_time != 0 else '',
                'expire': expire,
                'usage': proxy['usage'],
                'pk': proxy.get('pk', 1),
            })
        # for display, order by remaining lifetime from largest to smallest
        return sorted(proxy_lst, key=itemgetter('expire'), reverse=True)

    def get_all_proxy_ip(self, protocol, **kwargs):
        """Return the ``'http'`` proxy URLs of all records with ``'<protocol>://'`` removed."""
        return [
            proxy['proxies']['http'].replace(f'{protocol}://', '')
            for proxy in self.get_all_proxy(**kwargs)
        ]

    def proxies(self, **kwargs):
        """
        Hand out the least-used proxy and record the usage.

        Runs under the client's lock. The chosen record's ``usage`` is increased
        by one and its ``last_time`` set to the current timestamp.

        :param kwargs: forwarded to ``get_all_proxy`` (e.g. ``pk``)
        :return: the record's ``proxies`` mapping, or ``None`` when no proxy is
            available
        """
        with OptimisticLock(self.redis_db, self.lock_label):
            proxy_lst = self.get_all_proxy(**kwargs)
            if not proxy_lst:
                # nothing pooled, or nothing left after the pk filter; popleft()
                # on the empty deque would raise IndexError
                return None

            proxy = proxy_lst.popleft()
            name = self.get_redis_name(proxy)
            mapping = {
                'usage': proxy['usage'] + 1,
                'last_time': tools.now_ts()
            }
            self.redis_db.hset(name, None, None, mapping)
            return proxy.get('proxies')
|