123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327 |
- # -*- coding: utf-8 -*-
- """
- Created on 2023-05-11
- ---------
- @summary: 代理池
- ---------
- @author: Dzr
- """
- import random
- import string
- import threading
- from concurrent.futures import ThreadPoolExecutor
- from threading import Thread
- from urllib.parse import urlparse
- import requests
- import setting as settings
- from base_server import BaseServer, tools
- from common.log import logger
# Desktop Chrome 93 User-Agent sent as the header of proxy validation requests.
DEFAULT_UA = (
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
    'AppleWebKit/537.36 (KHTML, like Gecko) '
    'Chrome/93.0.4577.82 Safari/537.36'
)
def decrypt(input_str: str, key: 'str | None' = None) -> str:
    """
    Decode a base64-style string that uses a custom alphabet.

    Each non-'=' character is mapped to its index in ``key`` and written as
    a 6-bit group. The concatenated bit stream is cut into 8-bit groups, and
    each group becomes one character via ``chr``. Every decoded byte value is
    therefore mapped to the code point of the same value (Latin-1 semantics).

    :param input_str: encoded text; '=' padding is optional and ignored.
    :param key: the encoding alphabet; defaults to
        ``settings.jy_proxy['socks5']['decrypt']``.
    :return: the decoded string.
    :raises ValueError: if ``input_str`` contains a character not in ``key``.
    """
    if key is None:
        key = settings.jy_proxy['socks5']['decrypt']
    # 6 bits per encoded character; '=' is padding and carries no data.
    bits = ''.join(format(key.index(ch), '06b') for ch in input_str if ch != '=')
    # Drop trailing filler bits that do not make up a whole byte. Deriving the
    # amount from the bit count (not the '=' count) keeps unpadded input
    # working; the old `[0:-0]` slice discarded the whole final group.
    bits = bits[:len(bits) - len(bits) % 8]
    return ''.join(chr(int(bits[i:i + 8], 2)) for i in range(0, len(bits), 8))
def get_base_url():
    """Return the ``base_url`` entry of ``settings.jy_proxy['socks5']``."""
    socks5_conf = settings.jy_proxy['socks5']
    return socks5_conf['base_url']
def get_netloc(proxy, default=None):
    """
    Extract the network location (``host:port``) of a proxy record's http URL.

    :param proxy: a proxy record. Only a dict is inspected; its ``'proxies'``
        entry may be a mapping or a JSON string (decoded with ``tools.json_loads``).
    :param default: value returned when no ``'proxies'`` mapping is available.
    :return: ``urlparse(proxies.get('http')).netloc`` or ``default``.
    """
    if not isinstance(proxy, dict):
        return default
    proxies = proxy.get('proxies')
    if isinstance(proxies, str):
        proxies = tools.json_loads(proxies)
    if proxies is None:
        return default
    return urlparse(proxies.get('http')).netloc
def get_random(length=4):
    """
    Return ``length`` distinct random characters from ASCII letters and digits.

    Characters are sampled without replacement, so none repeats and ``length``
    may not exceed 62 (``random.sample`` raises ValueError otherwise).
    Uses the non-cryptographic ``random`` module.
    """
    alphabet = string.ascii_letters + string.digits
    picked = random.sample(alphabet, length)
    return ''.join(picked)
def get_host(proxies, scheme):
    """
    Return ``proxies['http']`` with its first ``"<scheme>://"`` occurrence removed.

    The result (normally ``host:port``) is the member stored in the pool's
    Redis hash and lists.
    """
    http_url = str(proxies['http'])
    prefix = scheme + '://'
    return http_url.replace(prefix, '', 1)
class BaseProxyPool(BaseServer):
    """
    Common base of the proxy-pool producer and client.

    Forwards ``server`` and ``redis_label`` to ``BaseServer`` (as ``server``
    and ``label``) and keeps the protocol scheme in lower case.
    """

    def __init__(self, server, redis_label, scheme):
        super(BaseProxyPool, self).__init__(label=redis_label, server=server)
        # Lower-cased because it is embedded in Redis key names and URL prefixes.
        self.scheme = scheme.lower()
class ProxyPoolServer(BaseProxyPool, threading.Thread):
    """
    Proxy pool producer.

    Polls the configured VPS services for proxy endpoints, validates them and
    keeps the Redis pool up to date, while a second thread evicts entries that
    are about to expire.

    Redis layout (every key is prefixed with ``proxy:<scheme>:``):

    * ``items`` -- hash mapping ``host:port`` to the JSON-encoded proxy record;
    * ``pk_0`` -- list of every pooled ``host:port``;
    * ``pk_1`` / ``pk_2`` -- lists of the hosts that came from the first /
      second configured service URL.
    """

    def __init__(self, service_name, redis_label, scheme):
        """
        @param str service_name: service name (passed on as BaseServer's ``server``)
        @param str redis_label: Redis label prefix
        @param str scheme: proxy protocol; 'http' selects ports 8862/8863,
            anything else 8860/8861
        """
        threading.Thread.__init__(self)
        super(ProxyPoolServer, self).__init__(service_name, redis_label, scheme)
        self.ports = ['8862', '8863'] if self.scheme == 'http' else ['8860', '8861']
        self.load_interval = 60  # pause between polls of the VPS services
        self.error_interval = 3  # back-off after an unexpected loop error
        self.tab_name = f'proxy:{self.scheme}:items'
        self.lst_name = f'proxy:{self.scheme}:pk_0'
        self.lst_name_1 = f'proxy:{self.scheme}:pk_1'
        self.lst_name_2 = f'proxy:{self.scheme}:pk_2'

    def request_proxy(self):
        """
        Fetch candidate proxies from every configured VPS service URL.

        A service that fails (network error, non-2xx status or an unparsable
        JSON body) is logged and skipped, so the remaining services are still
        queried.

        :return: list of records
            ``{'proxies': {'http': url, 'https': url}, 'expire': lifetime, 'pk': n}``
            where ``n`` is the 1-based index of the service URL.
        """
        proxy_lst = []
        for idx, url in enumerate(settings.jy_proxy['socks5']['url']):
            try:
                response = requests.get(url, timeout=10)
                response.raise_for_status()
                items = response.json()
            except (requests.RequestException, ValueError) as e:
                # ValueError covers JSON decode errors on requests versions
                # where they are not RequestException subclasses.
                logger.error(f'vps服务|请求失败|{url}|原因:{e.args}')
                continue
            for item in items:
                ports = [p for p in item['ports'] if p in self.ports]
                if not ports:
                    continue
                ip = decrypt(item['ip'])
                for port in ports:
                    proxy_url = '{}://{}:{}'.format(self.scheme, ip, int(port))
                    proxy_lst.append({
                        'proxies': {'http': proxy_url, 'https': proxy_url},
                        'expire': item['lifetime'],
                        'pk': idx + 1,
                    })
        return proxy_lst

    def add_proxy(self, proxy):
        """
        Store a validated proxy in Redis.

        The hash record is always rewritten so its ``expire`` stays current.
        The host is appended to ``pk_0`` and to its ``pk_1``/``pk_2`` list only
        the first time it is seen.
        """
        host = get_host(proxy['proxies'], self.scheme)
        is_new = not self.redis_db.hexists(self.tab_name, host)
        if is_new:
            self.redis_db.rpush(self.lst_name, host)
            # Guarded like pk_0: pushing on every refresh piled up duplicates,
            # so rotating clients received the same host disproportionately.
            pk = proxy['pk']
            if pk == 1:
                self.redis_db.rpush(self.lst_name_1, host)
            elif pk == 2:
                self.redis_db.rpush(self.lst_name_2, host)
        self.redis_db.hset(self.tab_name, host, tools.json_dumps(proxy))
        logger.debug(f"添加代理|{host}")

    def del_proxy(self, proxy):
        """Remove a proxy from its pk list, the pk_0 list and the items hash."""
        host = get_host(proxy['proxies'], self.scheme)
        n1 = 0
        pk = proxy['pk']
        if pk == 1:
            n1 = self.redis_db.lrem(self.lst_name_1, 0, host)
        elif pk == 2:
            n1 = self.redis_db.lrem(self.lst_name_2, 0, host)
        n0 = self.redis_db.lrem(self.lst_name, 0, host)  # count 0: remove every match
        self.redis_db.hdel(self.tab_name, host)
        logger.debug(f"移除代理|{host}|{n1 + n0}")

    def update_proxy(self, proxy, success):
        """Add ``proxy`` to the pool when ``success`` is true, otherwise remove it."""
        if not success:
            self.del_proxy(proxy)
        else:
            self.add_proxy(proxy)

    def validate_proxy(self, proxy):
        """
        Check whether ``proxy`` can fetch a known page.

        :return: ``(proxy, ok)``; ``ok`` is True only for an HTTP 200 response
            within the 5 s timeout, and False on any request error.
        """
        url = 'https://www.baidu.com/'
        proxies = proxy['proxies']
        host = get_host(proxies, self.scheme)
        logger.debug(f"代理检查|{host}")
        try:
            r = requests.get(
                url,
                headers={"User-Agent": DEFAULT_UA},
                proxies=proxies,
                timeout=5,
            )
        except requests.RequestException:
            return proxy, False
        return proxy, r.status_code == requests.codes.ok

    def manage_proxy(self, proxy_lst, workers):
        """
        Validate proxies concurrently, then add or remove each one in Redis.

        :param list proxy_lst: proxy records from ``request_proxy``
        :param int workers: number of validation threads
        """
        with ThreadPoolExecutor(max_workers=workers, thread_name_prefix='validate') as executor:
            # Results come back in input order; Redis writes stay on this thread.
            for proxy, ok in executor.map(self.validate_proxy, proxy_lst):
                self.update_proxy(proxy, ok)

    def _create_pool(self):
        """Producer loop: fetch, validate and store proxies forever."""
        logger.info('创建Ip池')
        while True:
            try:
                proxy_lst = self.request_proxy()
                if not proxy_lst:
                    tools.delay(0.5)
                    continue
                # One worker per two proxies, clamped to [1, 4].
                workers = max(1, min(len(proxy_lst) // 2, 4))
                self.manage_proxy(proxy_lst, workers=workers)
                tools.delay(self.load_interval)
            except Exception as e:
                logger.error('Ip池创建失败')
                logger.exception(e)
                # Back off so a persistent failure (e.g. Redis unreachable)
                # does not become a busy loop flooding the log.
                tools.delay(self.error_interval)

    def _watch_pool(self):
        """
        Eviction loop: remove pooled proxies whose ``expire`` is less than 30
        ahead of ``tools.now_ts()`` (presumably seconds), re-checking every 2.
        """
        logger.info('Ip池代理监控')
        while True:
            try:
                proxy_dict = self.redis_db.hgetall(self.tab_name)
                if not proxy_dict:
                    tools.delay(0.5)
                    continue
                proxy_lst = [tools.json_loads(proxy) for proxy in proxy_dict.values()]
                now = tools.now_ts()
                expiring = [p for p in proxy_lst if int(p['expire']) - now < 30]
                for proxy in expiring:
                    self.del_proxy(proxy)
                tools.delay(2)
            except Exception as e:
                logger.exception(e)
                tools.delay(self.error_interval)

    def run(self):
        """Thread entry point: run the producer and eviction loops and wait on both."""
        workers = [
            Thread(target=self._create_pool, daemon=True, name='CreatePool'),
            Thread(target=self._watch_pool, daemon=True, name='WatchPool'),
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
class ProxyPoolClient(BaseProxyPool):
    """
    Read-side access to the proxy pool kept in Redis.

    Hands out proxies by rotating the lists ``proxy:<scheme>:pk_0`` (all
    proxies) and ``pk_1`` / ``pk_2`` (per source service), and reports on the
    records stored in the ``proxy:<scheme>:items`` hash.
    """

    def __init__(self, service_name, redis_label, scheme):
        """
        @param str service_name: service name
        @param str redis_label: Redis label prefix
        @param str scheme: protocol name
        """
        super().__init__(service_name, redis_label, scheme)
        self.tab_name = f'proxy:{self.scheme}:items'
        self.lst_name = f'proxy:{self.scheme}:pk_0'
        self.lst_name_1 = f'proxy:{self.scheme}:pk_1'
        self.lst_name_2 = f'proxy:{self.scheme}:pk_2'

    def _get_proxy(self, src, dst, timeout=3.0):
        """
        Move one element from ``src`` to ``dst`` via BRPOPLPUSH and return it.

        Assuming ``redis_db`` is a redis-py client, this pops the tail of
        ``src`` and pushes it onto the head of ``dst``. With ``src == dst`` the
        list rotates, giving round-robin access. It waits up to ``timeout`` and
        returns None if nothing is available.
        """
        return self.redis_db.brpoplpush(src, dst, timeout=timeout)

    def get_proxy(self):
        """Return the next ``host:port`` from the combined pk_0 list, or None."""
        return self._get_proxy(self.lst_name, self.lst_name, timeout=0.5)

    def get_pk1_proxy(self):
        """Return the next ``host:port`` from the pk_1 list, or None."""
        return self._get_proxy(self.lst_name_1, self.lst_name_1, timeout=0.5)

    def get_pk2_proxy(self):
        """Return the next ``host:port`` from the pk_2 list, or None."""
        return self._get_proxy(self.lst_name_2, self.lst_name_2, timeout=0.5)

    def proxies(self, pk=0):
        """
        Return a requests-style ``{'http': url, 'https': url}`` mapping for the
        next proxy, or None when none is available.

        :param pk: 1 or 2 to draw from that service's list; any other value
            draws from the combined list.
        """
        if pk == 1:
            host = self.get_pk1_proxy()
        elif pk == 2:
            host = self.get_pk2_proxy()
        else:
            host = self.get_proxy()
        if not host:
            return None
        # NOTE(review): if redis_db does not decode responses, host is bytes
        # here and the URL becomes "scheme://b'...'" -- confirm client config.
        url = '{}://{}'.format(self.scheme, host)
        return {'http': url, 'https': url}

    def _get_all_proxy(self, pk=None):
        """
        Return every decoded record from the items hash.

        :param pk: when given, keep only records with that ``pk``. A record
            without ``pk`` counts as 1, the same default ``get_proxy_pool``
            reports.
        """
        proxy_dict = self.redis_db.hgetall(self.tab_name)
        if not proxy_dict:
            return []
        proxy_lst = [tools.json_loads(proxy) for proxy in proxy_dict.values()]
        if pk is not None:
            proxy_lst = [p for p in proxy_lst if p.get('pk', 1) == pk]
        return proxy_lst

    def get_proxy_pool(self, **kwargs):
        """
        Summarize pooled proxies for display.

        :param kwargs: forwarded to ``_get_all_proxy`` (e.g. ``pk=1``).
        :return: list of ``{'proxies', 'expire_time', 'expire', 'pk'}`` dicts,
            longest remaining lifetime first.
        """
        results = []
        for proxy in self._get_all_proxy(**kwargs):
            # The lifetime is stored exactly as the VPS service returned it;
            # coerce with int() in case it was stored as a string.
            life_time = int(proxy['expire'])
            results.append({
                'proxies': proxy['proxies'],
                'expire_time': tools.ts2dt(life_time),
                'expire': life_time - tools.now_ts(),
                'pk': proxy.get('pk', 1),
            })
        results.sort(key=lambda x: x['expire'], reverse=True)
        return results

    def get_all_proxy_ip(self, protocol, **kwargs):
        """
        Return the http URL of every pooled proxy with its first
        ``"<protocol>://"`` removed (normally leaving ``host:port``).

        :param kwargs: forwarded to ``_get_all_proxy`` (e.g. ``pk=1``).
        """
        prefix = f'{protocol}://'
        return [
            proxy['proxies']['http'].replace(prefix, '', 1)
            for proxy in self._get_all_proxy(**kwargs)
        ]
|