proxy.py 10 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2023-05-11
  4. ---------
  5. @summary: 代理池
  6. ---------
  7. @author: Dzr
  8. """
  9. import random
  10. import string
  11. import threading
  12. from concurrent.futures import ThreadPoolExecutor
  13. from threading import Thread
  14. from urllib.parse import urlparse
  15. import requests
  16. import setting as settings
  17. from base_server import BaseServer, tools
  18. from common.log import logger
  19. from common.net import dial_timeout
  20. def decrypt(input_str: str) -> str:
  21. """
  22. 定义base64解密函数
  23. :param input_str:
  24. :return:
  25. """
  26. # 对前面不是“=”的字节取索引,然后转换为2进制
  27. key = settings.jy_proxy['socks5']['decrypt']
  28. ascii_list = ['{:0>6}'.format(str(bin(key.index(i))).replace('0b', '')) for i in input_str if i != '=']
  29. output_str = ''
  30. # 补齐“=”的个数
  31. equal_num = input_str.count('=')
  32. while ascii_list:
  33. temp_list = ascii_list[:4]
  34. # 转换成2进制字符串
  35. temp_str = ''.join(temp_list)
  36. # 对没有8位2进制的字符串补够8位2进制
  37. if len(temp_str) % 8 != 0:
  38. temp_str = temp_str[0:-1 * equal_num * 2]
  39. # 4个6字节的二进制 转换 为三个8字节的二进制
  40. temp_str_list = [temp_str[x:x + 8] for x in [0, 8, 16]]
  41. # 二进制转为10进制
  42. temp_str_list = [int(x, 2) for x in temp_str_list if x]
  43. # 连接成字符串
  44. output_str += ''.join([chr(x) for x in temp_str_list])
  45. ascii_list = ascii_list[4:]
  46. return output_str
  47. def get_base_url():
  48. return settings.jy_proxy['socks5']['base_url']
  49. def get_random(length=4):
  50. return ''.join(random.sample(string.ascii_letters + string.digits, length))
  51. def get_netloc(proxy, default=None):
  52. proxies = None
  53. if isinstance(proxy, dict):
  54. proxies = proxy.get('proxies')
  55. if isinstance(proxies, str):
  56. proxies = tools.json_loads(proxies)
  57. # proxies = proxy.get('proxies') if isinstance(proxy, dict) else None
  58. if proxies is not None:
  59. parser = urlparse(proxies.get('http'))
  60. default = parser.netloc
  61. return default
  62. def get_host(proxies, scheme):
  63. return str(proxies['http']).replace(scheme + '://', '', 1)
  64. def get_addr_and_port(host):
  65. """
  66. @param str host:
  67. @return:
  68. """
  69. addr, port = host.split(':', 1)
  70. return addr, int(port)
  71. class BaseProxyPool(BaseServer):
  72. def __init__(self, server, redis_label, scheme):
  73. super(BaseProxyPool, self).__init__(server=server, label=redis_label)
  74. self.scheme = scheme.lower()
  75. class ProxyPoolServer(BaseProxyPool, threading.Thread):
  76. def __init__(self, service_name, redis_label, scheme):
  77. """
  78. 代理池生产管理
  79. @param str service_name: 服务名称
  80. @param str redis_label: redis 标识前缀
  81. @param str scheme: 协议类型
  82. """
  83. threading.Thread.__init__(self)
  84. super(ProxyPoolServer, self).__init__(service_name, redis_label, scheme)
  85. self.ports = ['8862', '8863'] if self.scheme == 'http' else ['8860', '8861']
  86. self.load_interval = 60 # 访问vps服务的间隔时长
  87. self.tab_name = f'proxy:{self.scheme}:items'
  88. self.lst_name = f'proxy:{self.scheme}:pk_0'
  89. self.lst_name_1 = f'proxy:{self.scheme}:pk_1'
  90. self.lst_name_2 = f'proxy:{self.scheme}:pk_2'
  91. def fetch(self):
  92. proxy_lst = []
  93. for idx, url in enumerate(settings.jy_proxy['socks5']['url']):
  94. try:
  95. response = requests.get(url, timeout=10)
  96. except requests.RequestException as e:
  97. logger.error(f'vps服务|请求失败|{url}|原因:{e.args}')
  98. else:
  99. for item in response.json():
  100. ports = list(filter(lambda p: p in self.ports, item['ports']))
  101. if not ports:
  102. continue
  103. ip = decrypt(item['ip'])
  104. for port in ports:
  105. args = (self.scheme, ip, int(port))
  106. proxy = {
  107. 'proxies': {
  108. 'http': '{}://{}:{}'.format(*args),
  109. 'https': '{}://{}:{}'.format(*args)
  110. },
  111. 'expire': item['lifetime'],
  112. 'pk': idx + 1
  113. }
  114. proxy_lst.append(proxy)
  115. return proxy_lst
  116. def insert(self, proxy):
  117. """
  118. 添加代理IP到Redis
  119. """
  120. host = get_host(proxy['proxies'], self.scheme)
  121. if not self.redis_db.hexists(self.tab_name, host):
  122. self.redis_db.rpush(self.lst_name, host)
  123. self.redis_db.hset(self.tab_name, host, tools.json_dumps(proxy))
  124. pk = proxy['pk']
  125. if pk == 1:
  126. self.redis_db.rpush(self.lst_name_1, host)
  127. elif pk == 2:
  128. self.redis_db.rpush(self.lst_name_2, host)
  129. logger.info(f"添加代理|{host}")
  130. def drop(self, proxy):
  131. host = get_host(proxy['proxies'], self.scheme)
  132. n1 = 0
  133. pk = proxy['pk']
  134. if pk == 1:
  135. n1 = self.redis_db.lrem(self.lst_name_1, host, 0)
  136. elif pk == 2:
  137. n1 = self.redis_db.lrem(self.lst_name_2, host, 0)
  138. n0 = self.redis_db.lrem(self.lst_name, host, 0) # 移除所有匹配的元素
  139. self.redis_db.hdel(self.tab_name, host)
  140. logger.info(f"移除代理|{host}|{n1 + n0}")
  141. def update_proxy(self, proxy, success):
  142. """
  143. 更新代理IP的信息
  144. """
  145. if not success:
  146. self.drop(proxy)
  147. else:
  148. self.insert(proxy)
  149. def check(self, proxy):
  150. proxies = proxy['proxies']
  151. host = get_host(proxies, self.scheme)
  152. logger.debug(f"代理检查|{host}")
  153. addr, port = get_addr_and_port(host)
  154. ret = dial_timeout(addr, port, 10)
  155. return proxy, ret
  156. def bulk_write(self, proxy_lst, workers):
  157. """
  158. :param list proxy_lst: 代理ip列表
  159. :param int workers: 工作线程数
  160. """
  161. thread_config = dict(max_workers=workers, thread_name_prefix='check')
  162. with ThreadPoolExecutor(**thread_config) as executor:
  163. fs = executor.map(self.check, proxy_lst)
  164. for args in fs:
  165. self.update_proxy(*args)
  166. def _create_pool(self):
  167. logger.info('创建Ip池')
  168. while True:
  169. try:
  170. proxy_lst = self.fetch()
  171. if not proxy_lst:
  172. tools.delay(0.5)
  173. continue
  174. workers = min((int(len(proxy_lst) / 2) or 1), 4) # 最大工作线程数
  175. self.bulk_write(proxy_lst, workers=workers)
  176. tools.delay(self.load_interval)
  177. except Exception as e:
  178. logger.error('Ip池创建失败')
  179. logger.exception(e)
  180. def _watch(self):
  181. logger.info('Ip池代理监控')
  182. while True:
  183. try:
  184. proxy_dict = self.redis_db.hgetall(self.tab_name)
  185. if not proxy_dict:
  186. tools.delay(0.5)
  187. continue
  188. proxy_lst = [tools.json_loads(proxy) for proxy in proxy_dict.values()]
  189. proxy_pool = list(filter(lambda proxy: (int(proxy['expire']) - tools.now_ts()) < 30, proxy_lst))
  190. for proxy in proxy_pool:
  191. self.drop(proxy)
  192. tools.delay(2)
  193. except Exception as e:
  194. logger.exception(e)
  195. def run(self):
  196. _join_lst = []
  197. _thread_lst = [
  198. Thread(target=self._create_pool, daemon=True, name='Create'),
  199. Thread(target=self._watch, daemon=True, name='Watch')
  200. ]
  201. for thread in _thread_lst:
  202. thread.start()
  203. _join_lst.append(thread)
  204. for thread in _join_lst:
  205. thread.join()
  206. class ProxyPoolClient(BaseProxyPool):
  207. def __init__(self, service_name, redis_label, scheme):
  208. """
  209. @param str service_name: 服务名称
  210. @param str redis_label: 服务名称
  211. @param str scheme: 协议名称
  212. """
  213. super().__init__(service_name, redis_label, scheme)
  214. self.tab_name = f'proxy:{self.scheme}:items'
  215. self.lst_name = f'proxy:{self.scheme}:pk_0'
  216. self.lst_name_1 = f'proxy:{self.scheme}:pk_1'
  217. self.lst_name_2 = f'proxy:{self.scheme}:pk_2'
  218. def _get_proxy(self, src, dst, timeout=3.0):
  219. return self.redis_db.brpoplpush(src, dst, timeout=timeout)
  220. def get_proxy(self):
  221. """
  222. 从Redis中获取一个代理IP
  223. """
  224. return self._get_proxy(self.lst_name, self.lst_name, timeout=0.5)
  225. def get_pk1_proxy(self):
  226. return self._get_proxy(self.lst_name_1, self.lst_name_1, timeout=0.5)
  227. def get_pk2_proxy(self):
  228. return self._get_proxy(self.lst_name_2, self.lst_name_2, timeout=0.5)
  229. def proxies(self, pk=0):
  230. if pk == 1:
  231. host = self.get_pk1_proxy()
  232. elif pk == 2:
  233. host = self.get_pk2_proxy()
  234. else:
  235. host = self.get_proxy()
  236. if not host:
  237. return
  238. args = (self.scheme, host)
  239. r = {'http': '{}://{}'.format(*args), 'https': '{}://{}'.format(*args)}
  240. return r
  241. def _get_all_proxy(self, pk=None):
  242. results = []
  243. proxy_dict = self.redis_db.hgetall(self.tab_name)
  244. if not proxy_dict:
  245. return results
  246. proxy_lst = [tools.json_loads(proxy) for proxy in proxy_dict.values()]
  247. if pk is not None:
  248. proxy_lst = list(filter(lambda p: p['pk'] == pk, proxy_lst))
  249. return proxy_lst
  250. def get_proxy_pool(self, **kwargs):
  251. results = []
  252. proxy_lst = self._get_all_proxy(**kwargs)
  253. for proxy in proxy_lst:
  254. life_time = proxy['expire']
  255. expire = life_time - tools.now_ts()
  256. results.append({
  257. 'proxies': proxy['proxies'],
  258. 'expire_time': tools.ts2dt(life_time),
  259. 'expire': expire,
  260. 'pk': proxy.get('pk', 1),
  261. })
  262. # 展示时按照过期时间从大到小排列
  263. return list(sorted(results, key=lambda x: x['expire'], reverse=True))
  264. def get_all_proxy_ip(self, protocol, **kwargs):
  265. return [
  266. proxy['proxies']['http'].replace(f'{protocol}://', '', 1)
  267. for proxy in self._get_all_proxy(**kwargs)
  268. ]