proxy.py 11 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2023-05-11
  4. ---------
  5. @summary: 代理池
  6. ---------
  7. @author: Dzr
  8. """
  9. import random
  10. import string
  11. import threading
  12. from concurrent.futures import ThreadPoolExecutor
  13. from threading import Thread
  14. from urllib.parse import urlparse
  15. import requests
  16. import setting as settings
  17. from base_server import BaseServer, tools
  18. from common.log import logger
# Browser User-Agent header value used for outgoing HTTP requests made through a proxy.
DEFAULT_UA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36'
  20. def decrypt(input_str: str) -> str:
  21. """
  22. 定义base64解密函数
  23. :param input_str:
  24. :return:
  25. """
  26. # 对前面不是“=”的字节取索引,然后转换为2进制
  27. key = settings.jy_proxy['socks5']['decrypt']
  28. ascii_list = ['{:0>6}'.format(str(bin(key.index(i))).replace('0b', '')) for i in input_str if i != '=']
  29. output_str = ''
  30. # 补齐“=”的个数
  31. equal_num = input_str.count('=')
  32. while ascii_list:
  33. temp_list = ascii_list[:4]
  34. # 转换成2进制字符串
  35. temp_str = ''.join(temp_list)
  36. # 对没有8位2进制的字符串补够8位2进制
  37. if len(temp_str) % 8 != 0:
  38. temp_str = temp_str[0:-1 * equal_num * 2]
  39. # 4个6字节的二进制 转换 为三个8字节的二进制
  40. temp_str_list = [temp_str[x:x + 8] for x in [0, 8, 16]]
  41. # 二进制转为10进制
  42. temp_str_list = [int(x, 2) for x in temp_str_list if x]
  43. # 连接成字符串
  44. output_str += ''.join([chr(x) for x in temp_str_list])
  45. ascii_list = ascii_list[4:]
  46. return output_str
  47. def get_base_url():
  48. return settings.jy_proxy['socks5']['base_url']
  49. def get_netloc(proxy, default=None):
  50. proxies = None
  51. if isinstance(proxy, dict):
  52. proxies = proxy.get('proxies')
  53. if isinstance(proxies, str):
  54. proxies = tools.json_loads(proxies)
  55. # proxies = proxy.get('proxies') if isinstance(proxy, dict) else None
  56. if proxies is not None:
  57. parser = urlparse(proxies.get('http'))
  58. default = parser.netloc
  59. return default
  60. def get_random(length=4):
  61. return ''.join(random.sample(string.ascii_letters + string.digits, length))
  62. def get_host(proxies, scheme):
  63. return str(proxies['http']).replace(scheme + '://', '', 1)
  64. class BaseProxyPool(BaseServer):
  65. def __init__(self, server, redis_label, scheme):
  66. super(BaseProxyPool, self).__init__(server=server, label=redis_label)
  67. self.scheme = scheme.lower()
  68. class ProxyPoolServer(BaseProxyPool, threading.Thread):
  69. def __init__(self, service_name, redis_label, scheme):
  70. """
  71. 代理池生产管理
  72. @param str service_name: 服务名称
  73. @param str redis_label: redis 标识前缀
  74. @param str scheme: 协议类型
  75. """
  76. threading.Thread.__init__(self)
  77. super(ProxyPoolServer, self).__init__(service_name, redis_label, scheme)
  78. self.ports = ['8862', '8863'] if self.scheme == 'http' else ['8860', '8861']
  79. self.load_interval = 60 # 访问vps服务的间隔时长
  80. self.tab_name = f'proxy:{self.scheme}:items'
  81. self.lst_name = f'proxy:{self.scheme}:pk_0'
  82. self.lst_name_1 = f'proxy:{self.scheme}:pk_1'
  83. self.lst_name_2 = f'proxy:{self.scheme}:pk_2'
  84. def request_proxy(self):
  85. proxy_lst = []
  86. for idx, url in enumerate(settings.jy_proxy['socks5']['url']):
  87. try:
  88. response = requests.get(url, timeout=10)
  89. except requests.RequestException as e:
  90. logger.error(f'vps服务|请求失败|{url}|原因:{e.args}')
  91. else:
  92. for item in response.json():
  93. ports = list(filter(lambda p: p in self.ports, item['ports']))
  94. if not ports:
  95. continue
  96. ip = decrypt(item['ip'])
  97. for port in ports:
  98. args = (self.scheme, ip, int(port))
  99. proxy = {
  100. 'proxies': {
  101. 'http': '{}://{}:{}'.format(*args),
  102. 'https': '{}://{}:{}'.format(*args)
  103. },
  104. 'expire': item['lifetime'],
  105. 'pk': idx + 1
  106. }
  107. proxy_lst.append(proxy)
  108. return proxy_lst
  109. def add_proxy(self, proxy):
  110. """
  111. 添加代理IP到Redis
  112. """
  113. host = get_host(proxy['proxies'], self.scheme)
  114. if not self.redis_db.hexists(self.tab_name, host):
  115. self.redis_db.rpush(self.lst_name, host)
  116. self.redis_db.hset(self.tab_name, host, tools.json_dumps(proxy))
  117. pk = proxy['pk']
  118. if pk == 1:
  119. self.redis_db.rpush(self.lst_name_1, host)
  120. elif pk == 2:
  121. self.redis_db.rpush(self.lst_name_2, host)
  122. logger.debug(f"添加代理|{host}")
  123. def del_proxy(self, proxy):
  124. host = get_host(proxy['proxies'], self.scheme)
  125. n1 = 0
  126. pk = proxy['pk']
  127. if pk == 1:
  128. n1 = self.redis_db.lrem(self.lst_name_1, 0, host)
  129. elif pk == 2:
  130. n1 = self.redis_db.lrem(self.lst_name_2, 0, host)
  131. n0 = self.redis_db.lrem(self.lst_name, 0, host) # 移除所有匹配的元素
  132. self.redis_db.hdel(self.tab_name, host)
  133. logger.debug(f"移除代理|{host}|{n1 + n0}")
  134. def update_proxy(self, proxy, success):
  135. """
  136. 更新代理IP的信息
  137. """
  138. if not success:
  139. self.del_proxy(proxy)
  140. else:
  141. self.add_proxy(proxy)
  142. def validate_proxy(self, proxy):
  143. # url = 'https://myip.ipip.net'
  144. url = 'https://www.baidu.com/'
  145. proxies = proxy['proxies']
  146. host = str(proxies['http']).replace(self.scheme + '://', '', 1)
  147. logger.debug(f"代理检查|{host}")
  148. try:
  149. request_params = {
  150. "headers": {"User-Agent": DEFAULT_UA},
  151. "proxies": proxies,
  152. "timeout": 5
  153. }
  154. r = requests.get(url, **request_params)
  155. return proxy, r.status_code == requests.codes.ok
  156. except requests.RequestException:
  157. return proxy, False
  158. def manage_proxy(self, proxy_lst, workers):
  159. """
  160. :param list proxy_lst: 代理ip列表
  161. :param int workers: 工作线程数
  162. """
  163. with ThreadPoolExecutor(max_workers=workers, thread_name_prefix='validate') as executor:
  164. fs = executor.map(self.validate_proxy, proxy_lst)
  165. for args in fs:
  166. self.update_proxy(*args)
  167. def _create_pool(self):
  168. logger.info('创建Ip池')
  169. while True:
  170. try:
  171. proxy_lst = self.request_proxy()
  172. if not proxy_lst:
  173. tools.delay(0.5)
  174. continue
  175. workers = min((int(len(proxy_lst) / 2) or 1), 4) # 最大工作线程数
  176. self.manage_proxy(proxy_lst, workers=workers)
  177. tools.delay(self.load_interval)
  178. except Exception as e:
  179. logger.error('Ip池创建失败')
  180. logger.exception(e)
  181. def _watch_pool(self):
  182. logger.info('Ip池代理监控')
  183. while True:
  184. try:
  185. proxy_dict = self.redis_db.hgetall(self.tab_name)
  186. if not proxy_dict:
  187. tools.delay(0.5)
  188. continue
  189. proxy_lst = [tools.json_loads(proxy) for proxy in proxy_dict.values()]
  190. invalid_proxy_lst = list(filter(lambda proxy: (int(proxy['expire']) - tools.now_ts()) < 30, proxy_lst))
  191. for proxy in invalid_proxy_lst:
  192. self.del_proxy(proxy)
  193. tools.delay(2)
  194. except Exception as e:
  195. logger.exception(e)
  196. def run(self):
  197. _join_lst = []
  198. _thread_lst = [
  199. Thread(target=self._create_pool, daemon=True, name='CreatePool'),
  200. Thread(target=self._watch_pool, daemon=True, name='WatchPool')
  201. ]
  202. for thread in _thread_lst:
  203. thread.start()
  204. _join_lst.append(thread)
  205. for thread in _join_lst:
  206. thread.join()
  207. class ProxyPoolClient(BaseProxyPool):
  208. def __init__(self, service_name, redis_label, scheme):
  209. """
  210. @param str service_name: 服务名称
  211. @param str redis_label: 服务名称
  212. @param str scheme: 协议名称
  213. """
  214. super().__init__(service_name, redis_label, scheme)
  215. self.tab_name = f'proxy:{self.scheme}:items'
  216. self.lst_name = f'proxy:{self.scheme}:pk_0'
  217. self.lst_name_1 = f'proxy:{self.scheme}:pk_1'
  218. self.lst_name_2 = f'proxy:{self.scheme}:pk_2'
  219. def _get_proxy(self, src, dst, timeout=3.0):
  220. return self.redis_db.brpoplpush(src, dst, timeout=timeout)
  221. def get_proxy(self):
  222. """
  223. 从Redis中获取一个代理IP
  224. """
  225. return self._get_proxy(self.lst_name, self.lst_name, timeout=0.5)
  226. def get_pk1_proxy(self):
  227. return self._get_proxy(self.lst_name_1, self.lst_name_1, timeout=0.5)
  228. def get_pk2_proxy(self):
  229. return self._get_proxy(self.lst_name_2, self.lst_name_2, timeout=0.5)
  230. def proxies(self, pk=0):
  231. if pk == 1:
  232. host = self.get_pk1_proxy()
  233. elif pk == 2:
  234. host = self.get_pk2_proxy()
  235. else:
  236. host = self.get_proxy()
  237. if not host:
  238. return
  239. args = (self.scheme, host)
  240. r = {'http': '{}://{}'.format(*args), 'https': '{}://{}'.format(*args)}
  241. return r
  242. def _get_all_proxy(self, pk=None):
  243. results = []
  244. proxy_dict = self.redis_db.hgetall(self.tab_name)
  245. if not proxy_dict:
  246. return results
  247. proxy_lst = [tools.json_loads(proxy) for proxy in proxy_dict.values()]
  248. if pk is not None:
  249. proxy_lst = list(filter(lambda p: p['pk'] == pk, proxy_lst))
  250. return proxy_lst
  251. def get_proxy_pool(self, **kwargs):
  252. results = []
  253. proxy_lst = self._get_all_proxy(**kwargs)
  254. for proxy in proxy_lst:
  255. life_time = proxy['expire']
  256. expire = life_time - tools.now_ts()
  257. results.append({
  258. 'proxies': proxy['proxies'],
  259. 'expire_time': tools.ts2dt(life_time),
  260. 'expire': expire,
  261. 'pk': proxy.get('pk', 1),
  262. })
  263. # 展示时按照过期时间从大到小排列
  264. return list(sorted(results, key=lambda x: x['expire'], reverse=True))
  265. def get_all_proxy_ip(self, protocol, **kwargs):
  266. return [
  267. proxy['proxies']['http'].replace(f'{protocol}://', '', 1)
  268. for proxy in self._get_all_proxy(**kwargs)
  269. ]