# -*- coding: utf-8 -*-
"""
Created on 2023-05-11
---------
@summary: proxy pool
---------
@author: Dzr
"""
import ast
import multiprocessing
import random
import threading
from collections import deque
from concurrent.futures import ThreadPoolExecutor, as_completed
from operator import itemgetter
from urllib.parse import urlparse

import requests

import setting as settings
from base_server import BaseServer, tools
from common.log import logger
from common.redis_lock import acquire_lock_with_timeout, release_lock

DEFAULT_UA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36'

def decrypt(input_str: str) -> str:
    """
    Base64-style decoding against the custom alphabet configured in settings.
    :param input_str: encoded string
    :return: decoded string
    """
    # Map every non-'=' character to its index in the key as a 6-bit binary string
    key = settings.jy_proxy['socks5']['decrypt']
    ascii_list = ['{:0>6}'.format(bin(key.index(i)).replace('0b', '')) for i in input_str if i != '=']
    output_str = ''
    # Number of '=' padding characters
    equal_num = input_str.count('=')
    while ascii_list:
        temp_list = ascii_list[:4]
        # Join into one binary string
        temp_str = ''.join(temp_list)
        # Drop the padding bits when the string is not a multiple of 8 bits
        if len(temp_str) % 8 != 0:
            temp_str = temp_str[0:-1 * equal_num * 2]
        # Four 6-bit groups become three 8-bit groups
        temp_str_list = [temp_str[x:x + 8] for x in [0, 8, 16]]
        # Binary to decimal
        temp_str_list = [int(x, 2) for x in temp_str_list if x]
        # Append the decoded characters
        output_str += ''.join([chr(x) for x in temp_str_list])
        ascii_list = ascii_list[4:]
    return output_str

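# Illustrative check (not part of the original module; the key below is an
# assumption -- the real one lives in settings.jy_proxy['socks5']['decrypt']):
# with key = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/'
# the routine above is plain base64 decoding, e.g. decrypt('aGVsbG8=') == 'hello'.
# The production key is presumably a shuffled alphabet, so only input produced by
# the matching encoder decodes correctly.
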
def get_base_url():
    return settings.jy_proxy['socks5']['base_url']


def get_netloc(proxy, default=None):
    proxies = None
    if isinstance(proxy, dict):
        proxies = proxy.get('proxies')
        if isinstance(proxies, str):
            proxies = tools.json_loads(proxies)

    # proxies = proxy.get('proxies') if isinstance(proxy, dict) else None
    if proxies is not None:
        parser = urlparse(proxies.get('http'))
        default = parser.netloc

    return default

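# Illustrative call (made-up values): get_netloc() reads the 'http' entry of the
# requests-style mapping, so
#   get_netloc({'proxies': {'http': 'socks5://1.2.3.4:8862',
#                           'https': 'socks5://1.2.3.4:8862'}})
# returns '1.2.3.4:8862'; when `proxy` carries no proxies mapping at all, the
# `default` argument is returned instead.
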
class BaseProxyPool(BaseServer):
    def __init__(self, name, redis_label, scheme):
        super(BaseProxyPool, self).__init__(server=name, label=redis_label)
        self.scheme = scheme.lower()
        self.proxy_name = self.scheme + self.server
        self.proxy_queue = f'{redis_label}_{self.scheme}'
        self.unique_key = ('ip', 'port')  # field names used to build the proxy fingerprint

    def get_redis_name(self, proxy):
        return f"{self.proxy_queue}_{proxy['fingerprint']}"

    def get_redis_name_lst(self, pattern='*'):
        return self.redis_db.keys(self.proxy_queue + pattern)

    def get_proxy(self, name):
        items = self.redis_db.hgetall(name)
        if items is None or 'proxies' not in items:
            return None

        proxy = {
            'proxies': ast.literal_eval(items['proxies']),
            'fingerprint': items['fingerprint'],
            'start_time': int(items['start_time']),
            'end_time': int(items['end_time']),
            'last_time': int(items['last_time']),
            'usage': int(items['usage']),
            'pk': int(items.get('pk', 1))
        }
        return proxy

    def get(self, name, key):
        return self.redis_db.hget(name, key)

    def exists(self, proxy):
        return self.redis_db.exists(self.get_redis_name(proxy))

    def check(self, proxy):
        is_ok = False
        # url = 'https://myip.ipip.net'
        url = 'https://www.baidu.com/'
        netloc = get_netloc(proxy)
        try:
            requests_param = {
                "headers": {'User-Agent': DEFAULT_UA},
                "proxies": proxy['proxies'],
                "timeout": 5
            }
            requests.get(url, **requests_param)
            is_ok = True
        except requests.RequestException:
            pass

        msg = "ok" if is_ok else "dead"
        logger.debug(f"[{self.proxy_name}] check proxy ip - {netloc} -- connectivity {msg}")
        return proxy, is_ok

    def remove_proxy(self, proxy):
        netloc = get_netloc(proxy)
        logger.debug(f"[{self.proxy_name}] proxy ip - {netloc} -- remove")
        if self.exists(proxy):
            redis_name = self.get_redis_name(proxy)
            self.redis_db.delete(redis_name)

    def add_proxy(self, proxy):
        netloc = get_netloc(proxy)
        logger.debug(f"[{self.proxy_name}] proxy ip - {netloc} -- add")
        if not self.exists(proxy):
            redis_name = self.get_redis_name(proxy)
            self.redis_db.hset(redis_name, None, None, mapping=proxy)
            expire_ts = proxy['end_time'] - tools.now_ts()
            self.redis_db.expire(redis_name, expire_ts)

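# Storage layout (reconstructed from add_proxy/get_proxy above; the values are
# made up): every proxy lives in its own Redis hash named
# '<redis_label>_<scheme>_<fingerprint>', with fields along the lines of
#   {'proxies': "{'http': 'socks5://1.2.3.4:8860', 'https': 'socks5://1.2.3.4:8860'}",
#    'fingerprint': '...', 'start_time': 1683000000, 'end_time': 1683003600,
#    'last_time': 0, 'usage': 0, 'pk': 1}
# The nested proxies dict is stored via its str() representation and parsed back
# with ast.literal_eval(); the whole key expires when end_time is reached.
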
class ProxyPoolServer(BaseProxyPool, threading.Thread):
    def __init__(self, name, redis_label, scheme):
        """
        Proxy pool producer

        @param str name: service name
        @param str redis_label: redis key prefix
        @param str scheme: protocol type
        """
        threading.Thread.__init__(self)
        super(ProxyPoolServer, self).__init__(name, redis_label, scheme)
        self.label = f'{self.proxy_name}_{self.getName()}'
        self.ports = ['8862', '8863'] if self.scheme == 'http' else ['8860', '8861']
        self.load_interval = 60  # polling interval for the vps proxy service

    def remove_failure_proxy(self, proxy_lst):
        """Remove dead/faulty proxy ips"""
        logger.info(f"[{self.label}] clearing invalid proxy ips")
        proxy_fingerprints = {proxy['fingerprint'] for proxy in proxy_lst}
        for redis_name in self.get_redis_name_lst():
            fingerprint = self.get(redis_name, 'fingerprint')
            if fingerprint not in proxy_fingerprints:
                self.redis_db.delete(redis_name)

    def request_proxy(self):
        logger.info(f"[{self.label}] requesting the vps service")
        proxy_lst = []
        for idx, url in enumerate(settings.jy_proxy['socks5']['url']):
            try:
                response = requests.get(url, timeout=10)
                for item in response.json():
                    ports = list(filter(lambda p: p in self.ports, item['ports']))
                    if not ports:
                        continue

                    ip = decrypt(item['ip'])
                    port = int(ports[random.randint(0, len(ports) - 1)])
                    start_time = tools.now_ts()
                    end_time = item['lifetime']
                    if end_time - start_time > 0:
                        proxy = {
                            'proxies': {
                                'http': '{}://{}:{}'.format(self.scheme, ip, port),
                                'https': '{}://{}:{}'.format(self.scheme, ip, port)
                            },
                            'fingerprint': self.fingerprint(ip=ip, port=port),
                            'start_time': start_time,
                            'end_time': end_time,
                            'last_time': 0,
                            'usage': 0,
                            'pk': idx + 1
                        }
                        proxy_lst.append(proxy)
            except Exception as e:
                logger.error(f'[{self.label}] vps service request failed [{url}], reason: {e.args}')
        return proxy_lst

    def manage_proxy(self, proxy_lst: list, workers=1):
        self.remove_failure_proxy(proxy_lst)
        with ThreadPoolExecutor(max_workers=workers) as executor:
            fs = [executor.submit(self.check, proxy) for proxy in proxy_lst]
            for f in as_completed(fs):
                proxy, is_ok = f.result()
                if is_ok:
                    self.add_proxy(proxy)
                else:
                    self.remove_proxy(proxy)

    def run(self):
        logger.info(f'[{self.label}] start producing proxy ips')
        while True:
            try:
                proxy_lst = self.request_proxy()
                if not proxy_lst:
                    tools.delay(2)
                    continue

                dynamic_workers = min(len(proxy_lst) // 2 or 1, 10)
                self.manage_proxy(proxy_lst, workers=dynamic_workers)  # thread pool capped at 10
                tools.delay(self.load_interval)
            except Exception as e:
                logger.exception(e)

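# The endpoints in settings.jy_proxy['socks5']['url'] are expected to answer with
# JSON shaped roughly like the following (field names taken from request_proxy
# above; the values are made up):
#   [{"ip": "<string encoded with the shared alphabet>",
#     "ports": ["8860", "8861"], "lifetime": 1683003600}, ...]
# where 'lifetime' is the expiry timestamp that becomes the proxy's end_time.
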
class ProxyPoolClient(BaseProxyPool):
    def __init__(self, name: str, redis_label: str, scheme: str):
        """
        Proxy pool consumer
        """
        super(ProxyPoolClient, self).__init__(name, redis_label, scheme)
        current_process = multiprocessing.current_process()
        sub_name = f'{tools.get_localhost_ip()}:{current_process.pid}'
        self.lock_label = f'{redis_label}:{sub_name}'

    @property
    def proxy_total(self):
        return len(self.get_redis_name_lst())

    def get_all_proxy(self, pk=None):
        proxy_lst = deque([])
        for redis_name in self.get_redis_name_lst():
            proxy = self.get_proxy(redis_name)
            if isinstance(proxy, dict):
                proxy_lst.append(proxy)

        if len(proxy_lst) > 0:
            if pk is not None:
                # filter by proxy type (pk) first, then sort by usage count ascending
                special = deque(filter(lambda x: x['pk'] == int(pk), proxy_lst))
                proxy_lst = deque(sorted(special, key=itemgetter('usage')))
            else:
                # sort by usage count ascending (least used on the left)
                proxy_lst = deque(sorted(proxy_lst, key=itemgetter('usage')))

        return proxy_lst

    def get_proxy_pool(self, **kwargs):
        proxy_lst = []
        for proxy in self.get_all_proxy(**kwargs):
            last_time = proxy['last_time']
            end_time = proxy['end_time']
            expire = end_time - tools.now_ts()
            proxy_lst.append({
                'proxies': proxy['proxies'],
                'start_time': tools.ts2dt(proxy['start_time']),
                'end_time': tools.ts2dt(end_time),
                'last_time': tools.ts2dt(last_time) if last_time != 0 else '',
                'expire': expire,
                'usage': proxy['usage'],
                'pk': proxy.get('pk', 1),
            })

        # for display, sort by remaining lifetime, longest first
        return list(sorted(proxy_lst, key=lambda x: x['expire'], reverse=True))

    def get_all_proxy_ip(self, protocol, **kwargs):
        return [
            proxy['proxies']['http'].replace(f'{protocol}://', '')
            for proxy in self.get_all_proxy(**kwargs)
        ]

    def proxies(self, **kwargs):
        proxy = {}
        lock = acquire_lock_with_timeout(self.redis_db, self.lock_label)
        if lock:
            try:
                if self.proxy_total > 0:
                    proxy_lst = self.get_all_proxy(**kwargs)
                    if proxy_lst:
                        proxy = proxy_lst.popleft()
                        name = self.get_redis_name(proxy)
                        self.redis_db.hset(name, 'usage', proxy['usage'] + 1)
                        self.redis_db.hset(name, 'last_time', tools.now_ts())
            finally:
                release_lock(self.redis_db, self.lock_label, lock)

        return proxy.get('proxies')

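if __name__ == '__main__':
    # Illustrative wiring only; the constructor arguments are placeholders and the
    # real name/label/scheme come from the deployment that uses this module.
    pool_server = ProxyPoolServer('proxy_pool', 'jy_proxy', 'socks5')
    pool_server.start()  # producer thread keeps the Redis pool topped up

    pool_client = ProxyPoolClient('proxy_pool', 'jy_proxy', 'socks5')
    tools.delay(5)  # give the producer a moment to fill the pool
    print(pool_client.proxies())  # -> {'http': 'socks5://ip:port', ...} or None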