proxy_pool.py 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759
  1. # -*- coding: utf-8 -*-
"""
Proxy pool.
"""
  5. import datetime
  6. import json
  7. import os
  8. import random
  9. import socket
  10. import time
  11. from urllib import parse
  12. import redis
  13. import requests
  14. from feapder import setting
  15. from feapder.utils import tools
  16. from feapder.utils.log import log
  17. def decrypt(input_str: str) -> str:
  18. """
  19. 改写:新增
  20. 定义base64解密函数
  21. :param input_str:
  22. :return:
  23. """
  24. key = "ABNOPqrceQRSTklmUDEFGXYZabnopfghHVWdijstuvwCIJKLMxyz0123456789+/"
  25. ascii_list = ['{:0>6}'.format(str(bin(key.index(i))).replace('0b', '')) for i in input_str if i != '=']
  26. output_str = ''
  27. # 对前面不是“=”的字节取索引,然后转换为2进制
  28. # 补齐“=”的个数
  29. equal_num = input_str.count('=')
  30. while ascii_list:
  31. temp_list = ascii_list[:4]
  32. # 转换成2进制字符串
  33. temp_str = ''.join(temp_list)
  34. # 对没有8位2进制的字符串补够8位2进制
  35. if len(temp_str) % 8 != 0:
  36. temp_str = temp_str[0:-1 * equal_num * 2]
  37. # 4个6字节的二进制 转换 为三个8字节的二进制
  38. temp_str_list = [temp_str[x:x + 8] for x in [0, 8, 16]]
  39. # 二进制转为10进制
  40. temp_str_list = [int(x, 2) for x in temp_str_list if x]
  41. # 连接成字符串
  42. output_str += ''.join([chr(x) for x in temp_str_list])
  43. ascii_list = ascii_list[4:]
  44. return output_str
  45. # 建立本地缓存代理文件夹
  46. proxy_path = os.path.join(os.path.dirname(__file__), "proxy_file")
  47. if not os.path.exists(proxy_path):
  48. os.mkdir(proxy_path)
# def get_proxies_by_host(host, port):
#     proxy_id = "{}:{}".format(host, port)
#     return get_proxies_by_id(proxy_id)


# def get_proxies_by_id(proxy_id):
#     proxies = {
#         "http": "http://{}".format(proxy_id),
#         "https": "https://{}".format(proxy_id),
#     }
#     return proxies
  58. def get_proxy_from_url(**kwargs):
  59. """
  60. 获取指定url的代理
  61. :param kwargs:
  62. :return:
  63. """
  64. proxy_source_url = kwargs.get("proxy_source_url", [])
  65. # proxy_source_url = "http://socks.spdata.jianyu360.com/socks/getips?limit=100"
  66. if not isinstance(proxy_source_url, list):
  67. proxy_source_url = [proxy_source_url]
  68. proxy_source_url = [x for x in proxy_source_url if x]
  69. if not proxy_source_url:
  70. raise ValueError("no specify proxy_source_url: {}".format(proxy_source_url))
  71. kwargs = kwargs.copy()
  72. kwargs.pop("proxy_source_url")
  73. proxies_list = []
  74. for url in proxy_source_url:
  75. if url.startswith("http"):
  76. proxies_list.extend(get_proxy_from_http(url, **kwargs))
  77. elif url.startswith("redis"):
  78. proxies_list.extend(get_proxy_from_redis(url, **kwargs))
  79. if proxies_list:
  80. # 顺序打乱
  81. random.shuffle(proxies_list)
  82. return proxies_list
  83. def get_proxy_from_http(proxy_source_url, **kwargs):
  84. """
  85. 从指定 http 地址获取代理
  86. :param proxy_source_url:
  87. :param kwargs:
  88. :return:
  89. """
  90. filename = tools.get_md5(proxy_source_url) + ".txt"
  91. abs_filename = os.path.join(proxy_path, filename)
  92. update_interval = kwargs.get("local_proxy_file_cache_timeout", 30)
  93. update_flag = 0
  94. if not update_interval:
  95. # 强制更新
  96. update_flag = 1
  97. elif not os.path.exists(abs_filename):
  98. # 文件不存在则更新
  99. update_flag = 1
  100. elif time.time() - os.stat(abs_filename).st_mtime > update_interval:
  101. # 超过更新间隔
  102. update_flag = 1
  103. if update_flag:
  104. pool = []
  105. response = requests.get(proxy_source_url, timeout=20)
  106. # 改写:获取scocks代理的response处理
  107. for proxy in response.json():
  108. host = decrypt(proxy['ip'])
  109. port = proxy['ports'][0]
  110. endTime = proxy['lifetime']
  111. pool.append(f"{host}:{port}&&{endTime}")
  112. with open(os.path.join(proxy_path, filename), "w") as f:
  113. f.write('\n'.join(pool))
  114. return get_proxy_from_file(filename)
  115. def get_proxy_from_file(filename, **kwargs):
  116. """
  117. 从指定本地文件获取代理
  118. 文件格式
  119. ip:port:https
  120. ip:port:http
  121. ip:port
  122. :param filename:
  123. :param kwargs:
  124. :return:
  125. """
  126. proxies_list = []
  127. with open(os.path.join(proxy_path, filename), "r") as f:
  128. lines = f.readlines()
  129. for line in lines:
  130. line = line.strip()
  131. if not line:
  132. continue
  133. # 解析
  134. auth = ""
  135. if "@" in line:
  136. auth, line = line.split("@")
  137. # 改写,解析代理有效期结束时间
  138. line, end = line.split("&&")
  139. items = line.split(":")
  140. if len(items) < 2:
  141. continue
  142. ip, port, *protocol = items
  143. if not all([port, ip]):
  144. continue
  145. if auth:
  146. ip = "{}@{}".format(auth, ip)
  147. if not protocol:
  148. # 改写:判断代理是否在有效期内,并将代理格式重http格式改成socks格式
  149. if time.time() < int(end):
  150. proxies = {
  151. "https": "socks5://%s:%s" % (ip, port),
  152. "http": "socks5://%s:%s" % (ip, port),
  153. # "end":end
  154. }
  155. else:
  156. continue
  157. else:
  158. proxies = {protocol[0]: "%s://%s:%s" % (protocol[0], ip, port)}
  159. proxies_list.append(proxies)
  160. return proxies_list
  161. def get_proxy_from_redis(proxy_source_url, **kwargs):
  162. """
  163. 从指定 redis 地址获取代理
  164. @param proxy_source_url: redis://:passwd@host:ip/db
  165. redis 存储结构 zset
  166. ip:port ts
  167. @param kwargs:
  168. {"redis_proxies_key": "xxx"}
  169. @return: [{'http':'http://xxx.xxx.xxx:xxx', 'https':'https://xxx.xxx.xxx.xxx:xxx'}]
  170. """
  171. redis_conn = redis.StrictRedis.from_url(proxy_source_url)
  172. key = kwargs.get("redis_proxies_key")
  173. assert key, "从redis中获取代理 需要指定 redis_proxies_key"
  174. proxies = redis_conn.zrange(key, 0, -1)
  175. proxies_list = []
  176. for proxy in proxies:
  177. proxy = proxy.decode()
  178. proxies_list.append(
  179. {"https": "https://%s" % proxy, "http": "http://%s" % proxy}
  180. )
  181. return proxies_list
  182. def check_proxy(
  183. ip="",
  184. port="",
  185. proxies=None,
  186. type=0,
  187. timeout=5,
  188. logger=None,
  189. show_error_log=True,
  190. **kwargs,
  191. ):
  192. """
  193. 代理有效性检查
  194. :param ip:
  195. :param port:
  196. :param type: 0:socket 1:requests
  197. :param timeout:
  198. :param logger:
  199. :return:
  200. """
  201. if not logger:
  202. logger = log
  203. ok = 0
  204. if type == 0 and ip and port:
  205. # socket检测成功 不代表代理一定可用 Connection closed by foreign host. 这种情况就不行
  206. with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sk:
  207. sk.settimeout(timeout)
  208. try:
  209. # 必须检测 否则代理永远不刷新
  210. sk.connect((ip, int(port)))
  211. ok = 1
  212. except Exception as e:
  213. if show_error_log:
  214. logger.debug("check proxy failed: {} {}:{}".format(e, ip, port))
  215. sk.close()
  216. else:
  217. if not proxies:
  218. proxies = {
  219. "http": "socks5://{}:{}".format(ip, port),
  220. "https": "socks5//{}:{}".format(ip, port),
  221. }
  222. try:
  223. # 改写:代理检测的url
  224. r = requests.get(
  225. "https://myip.ipip.net", proxies=proxies, timeout=timeout, stream=True
  226. )
  227. ok = 1
  228. r.close()
  229. except Exception as e:
  230. if show_error_log:
  231. logger.debug(
  232. "check proxy failed: {} {}:{} {}".format(e, ip, port, proxies)
  233. )
  234. return ok
  235. class ProxyItem(object):
  236. """单个代理对象"""
  237. # 代理标记
  238. proxy_tag_list = (-1, 0, 1)
  239. def __init__(
  240. self,
  241. proxies=None,
  242. valid_timeout=20,
  243. check_interval=180,
  244. max_proxy_use_num=10000,
  245. delay=30,
  246. use_interval=None,
  247. **kwargs,
  248. ):
  249. """
  250. :param proxies:
  251. :param valid_timeout: 代理检测超时时间 默认-1 20181008 默认不再监测有效性
  252. :param check_interval:
  253. :param max_proxy_use_num:
  254. :param delay:
  255. :param use_interval: 使用间隔 单位秒 默认不限制
  256. :param logger: 日志处理器 默认 log.get_logger()
  257. :param kwargs:
  258. """
  259. # {"http": ..., "https": ...}
  260. self.proxies = proxies
  261. # 检测超时时间 秒
  262. self.valid_timeout = valid_timeout
  263. # 检测间隔 秒
  264. self.check_interval = check_interval
  265. # 标记 0:正常 -1:丢弃 1: 待会再用 ...
  266. self.flag = 0
  267. # 上次状态变化时间
  268. self.flag_ts = 0
  269. # 上次更新时间 有效时间
  270. self.update_ts = 0
  271. # 最大被使用次数
  272. self.max_proxy_use_num = max_proxy_use_num
  273. # 被使用次数记录
  274. self.use_num = 0
  275. # 延迟使用时间
  276. self.delay = delay
  277. # 使用间隔 单位秒
  278. self.use_interval = use_interval
  279. # 使用时间
  280. self.use_ts = 0
  281. self.proxy_args = self.parse_proxies(self.proxies)
  282. self.proxy_ip = self.proxy_args["ip"]
  283. self.proxy_port = self.proxy_args["port"]
  284. self.proxy_ip_port = "{}:{}".format(self.proxy_ip, self.proxy_port)
  285. if self.proxy_args["user"]:
  286. self.proxy_id = "{user}:{password}@{ip}:{port}".format(**self.proxy_args)
  287. else:
  288. self.proxy_id = self.proxy_ip_port
  289. # 日志处理器
  290. self.logger = log
  291. def get_proxies(self):
  292. self.use_num += 1
  293. return self.proxies
  294. def is_delay(self):
  295. return self.flag == 1
  296. def is_valid(self, force=0, type=0):
  297. """
  298. 检测代理是否有效
  299. 1 有效
  300. 2 延时使用
  301. 0 无效 直接在代理池删除
  302. :param force:
  303. :param type:
  304. :return:
  305. """
  306. if self.use_num > self.max_proxy_use_num > 0:
  307. self.logger.debug("代理达到最大使用次数: {} {}".format(self.use_num, self.proxies))
  308. return 0
  309. if self.flag == -1:
  310. self.logger.debug("代理被标记 -1 丢弃 %s" % self.proxies)
  311. return 0
  312. if self.delay > 0 and self.flag == 1:
  313. if time.time() - self.flag_ts < self.delay:
  314. self.logger.debug("代理被标记 1 延迟 %s" % self.proxies)
  315. return 2
  316. else:
  317. self.flag = 0
  318. self.logger.debug("延迟代理释放: {}".format(self.proxies))
  319. if self.use_interval:
  320. if time.time() - self.use_ts < self.use_interval:
  321. return 2
  322. if not force:
  323. if time.time() - self.update_ts < self.check_interval:
  324. return 1
  325. if self.valid_timeout > 0:
  326. ok = check_proxy(
  327. proxies=self.proxies,
  328. type=type,
  329. timeout=self.valid_timeout,
  330. logger=self.logger,
  331. )
  332. else:
  333. ok = 1
  334. self.update_ts = time.time()
  335. return ok
  336. @classmethod
  337. def parse_proxies(self, proxies):
  338. """
  339. 分解代理组成部分
  340. :param proxies:
  341. :return:
  342. """
  343. if not proxies:
  344. return {}
  345. if isinstance(proxies, (str, bytes)):
  346. proxies = json.loads(proxies)
  347. protocol = list(proxies.keys())
  348. if not protocol:
  349. return {}
  350. _url = proxies.get(protocol[0])
  351. # 改写:注释http代理url的拼接,以正常生成代理池
  352. # if not _url.startswith("http"):
  353. # _url = "http://" + _url
  354. _url_parse = parse.urlparse(_url)
  355. netloc = _url_parse.netloc
  356. if "@" in netloc:
  357. netloc_auth, netloc_host = netloc.split("@")
  358. else:
  359. netloc_auth, netloc_host = "", netloc
  360. ip, *port = netloc_host.split(":")
  361. port = port[0] if port else "80"
  362. user, *password = netloc_auth.split(":")
  363. password = password[0] if password else ""
  364. return {
  365. "protocol": protocol,
  366. "ip": ip,
  367. "port": port,
  368. "user": user,
  369. "password": password,
  370. "ip_port": "{}:{}".format(ip, port),
  371. }
  372. class ProxyPoolBase(object):
  373. def __init__(self, *args, **kwargs):
  374. pass
  375. def get(self, *args, **kwargs):
  376. raise NotImplementedError
  377. class ProxyPool(ProxyPoolBase):
  378. """代理池"""
  379. def __init__(self, **kwargs):
  380. """
  381. :param size: 代理池大小 -1 为不限制
  382. :param proxy_source_url: 代理文件地址 支持列表
  383. :param proxy_instance: 提供代理的实例
  384. :param reset_interval: 代理池重置间隔 最小间隔
  385. :param reset_interval_max: 代理池重置间隔 最大间隔 默认2分钟
  386. :param check_valid: 是否在获取代理时进行检测有效性
  387. :param local_proxy_file_cache_timeout: 本地缓存的代理文件超时时间
  388. :param logger: 日志处理器 默认 log.get_logger()
  389. :param kwargs: 其他的参数
  390. """
  391. kwargs.setdefault("size", -1)
  392. kwargs.setdefault("proxy_source_url", setting.PROXY_EXTRACT_API)
  393. super(ProxyPool, self).__init__(**kwargs)
  394. # 队列最大长度
  395. self.max_queue_size = kwargs.get("size", -1)
  396. # 实际代理数量
  397. self.real_max_proxy_count = 1000
  398. # 代理可用最大次数
  399. # 代理获取地址 http://localhost/proxy.txt
  400. self.proxy_source_url = kwargs.get("proxy_source_url", [])
  401. if not isinstance(self.proxy_source_url, list):
  402. self.proxy_source_url = [self.proxy_source_url]
  403. self.proxy_source_url = [x for x in self.proxy_source_url if x]
  404. self.proxy_source_url = list(set(self.proxy_source_url))
  405. kwargs.update({"proxy_source_url": self.proxy_source_url})
  406. # 处理日志
  407. self.logger = kwargs.get("logger") or log
  408. kwargs["logger"] = self.logger
  409. if not self.proxy_source_url:
  410. self.logger.warn("need set proxy_source_url or proxy_instance")
  411. # 代理池重置间隔
  412. self.reset_interval = kwargs.get("reset_interval", 5)
  413. # 强制重置一下代理 添加新的代理进来 防止一直使用旧的被封的代理
  414. self.reset_interval_max = kwargs.get("reset_interval_max", 180)
  415. # 是否监测代理有效性
  416. self.check_valid = kwargs.get("check_valid", True)
  417. # 代理队列
  418. self.proxy_queue = None
  419. # {代理id: ProxyItem, ...}
  420. self.proxy_dict = {}
  421. # 失效代理队列
  422. self.invalid_proxy_dict = {}
  423. self.kwargs = kwargs
  424. # 重置代理池锁
  425. self.reset_lock = None
  426. # 重置时间
  427. self.last_reset_time = 0
  428. # 重置的太快了 计数
  429. self.reset_fast_count = 0
  430. # 计数 获取代理重试3次仍然失败 次数
  431. self.no_valid_proxy_times = 0
  432. # 上次获取代理时间
  433. self.last_get_ts = time.time()
  434. # 记录ProxyItem的update_ts 防止由于重置太快导致重复检测有效性
  435. self.proxy_item_update_ts_dict = {}
  436. # 警告
  437. self.warn_flag = False
  438. def warn(self):
  439. if not self.warn_flag:
  440. for url in self.proxy_source_url:
  441. if "zhima" in url:
  442. continue
  443. self.warn_flag = True
  444. return
  445. @property
  446. def queue_size(self):
  447. """
  448. 当前代理池中代理数量
  449. :return:
  450. """
  451. return self.proxy_queue.qsize() if self.proxy_queue is not None else 0
  452. def clear(self):
  453. """
  454. 清空自己
  455. :return:
  456. """
  457. self.proxy_queue = None
  458. # {代理ip: ProxyItem, ...}
  459. self.proxy_dict = {}
  460. # 清理失效代理集合
  461. _limit = datetime.datetime.now() - datetime.timedelta(minutes=10)
  462. self.invalid_proxy_dict = {
  463. k: v for k, v in self.invalid_proxy_dict.items() if v > _limit
  464. }
  465. # 清理超时的update_ts记录
  466. _limit = time.time() - 600
  467. self.proxy_item_update_ts_dict = {
  468. k: v for k, v in self.proxy_item_update_ts_dict.items() if v > _limit
  469. }
  470. return
  471. def get(self, retry: int = 0) -> dict:
  472. """
  473. 从代理池中获取代理
  474. :param retry:
  475. :return:
  476. """
  477. retry += 1
  478. if retry > 3:
  479. self.no_valid_proxy_times += 1
  480. return None
  481. # if time.time() - self.last_get_ts > 3 * 60:
  482. # # 3分钟没有获取过 重置一下
  483. # try:
  484. # self.reset_proxy_pool()
  485. # except Exception as e:
  486. # self.logger.exception(e)
  487. # 记录获取时间
  488. self.last_get_ts = time.time()
  489. #
  490. self.warn()
  491. proxy_item = self.get_random_proxy()
  492. if proxy_item:
  493. # 不检测
  494. if not self.check_valid: #
  495. # 塞回去
  496. proxies = proxy_item.get_proxies()
  497. self.put_proxy_item(proxy_item)
  498. return proxies
  499. else:
  500. is_valid = proxy_item.is_valid()
  501. if is_valid:
  502. # 记录update_ts
  503. self.proxy_item_update_ts_dict[
  504. proxy_item.proxy_id
  505. ] = proxy_item.update_ts
  506. # 塞回去
  507. proxies = proxy_item.get_proxies()
  508. self.put_proxy_item(proxy_item)
  509. if is_valid == 1:
  510. if proxy_item.use_interval:
  511. proxy_item.use_ts = time.time()
  512. return proxies
  513. else:
  514. # 处理失效代理
  515. self.proxy_dict.pop(proxy_item.proxy_id, "")
  516. self.invalid_proxy_dict[
  517. proxy_item.proxy_id
  518. ] = datetime.datetime.now()
  519. else:
  520. try:
  521. time.sleep(3)
  522. self.reset_proxy_pool()
  523. except Exception as e:
  524. self.logger.exception(e)
  525. if self.no_valid_proxy_times >= 5:
  526. # 解决bug: 当爬虫仅剩一个任务时 由于只有一个线程检测代理 而不可用代理又刚好很多(时间越长越多) 可能出现一直获取不到代理的情况
  527. # 导致爬虫烂尾
  528. try:
  529. time.sleep(3)
  530. self.reset_proxy_pool()
  531. except Exception as e:
  532. self.logger.exception(e)
  533. return self.get(retry)
  534. get_proxy = get
  535. def get_random_proxy(self) -> ProxyItem:
  536. """
  537. 随机获取代理
  538. :return:
  539. """
  540. if self.proxy_queue is not None:
  541. if random.random() < 0.5:
  542. # 一半概率检查 这是个高频操作 优化一下
  543. if time.time() - self.last_reset_time > self.reset_interval_max:
  544. time.sleep(3)
  545. self.reset_proxy_pool(force=True)
  546. else:
  547. min_q_size = (
  548. min(self.max_queue_size / 2, self.real_max_proxy_count / 2)
  549. if self.max_queue_size > 0
  550. else self.real_max_proxy_count / 2
  551. )
  552. if self.proxy_queue.qsize() < min_q_size:
  553. time.sleep(3)
  554. self.reset_proxy_pool()
  555. try:
  556. return self.proxy_queue.get_nowait()
  557. except Exception:
  558. pass
  559. return None
  560. def append_proxies(self, proxies_list: list) -> int:
  561. """
  562. 添加代理到代理池
  563. :param proxies_list:
  564. :return:
  565. """
  566. count = 0
  567. if not isinstance(proxies_list, list):
  568. proxies_list = [proxies_list]
  569. for proxies in proxies_list:
  570. if proxies:
  571. proxy_item = ProxyItem(proxies=proxies, **self.kwargs)
  572. # 增加失效判断 2018/12/18
  573. if proxy_item.proxy_id in self.invalid_proxy_dict:
  574. continue
  575. if proxy_item.proxy_id not in self.proxy_dict:
  576. # 补充update_ts
  577. if not proxy_item.update_ts:
  578. proxy_item.update_ts = self.proxy_item_update_ts_dict.get(
  579. proxy_item.proxy_id, 0
  580. )
  581. self.put_proxy_item(proxy_item)
  582. self.proxy_dict[proxy_item.proxy_id] = proxy_item
  583. count += 1
  584. return count
  585. def put_proxy_item(self, proxy_item: ProxyItem):
  586. """
  587. 添加 ProxyItem 到代理池
  588. :param proxy_item:
  589. :return:
  590. """
  591. return self.proxy_queue.put_nowait(proxy_item)
  592. def reset_proxy_pool(self, force: bool = False):
  593. """
  594. 重置代理池
  595. :param force: 是否强制重置代理池
  596. :return:
  597. """
  598. if not self.reset_lock:
  599. # 必须用时调用 否则 可能存在 gevent patch前 threading就已经被导入 导致的Rlock patch失效
  600. import threading
  601. self.reset_lock = threading.RLock()
  602. with self.reset_lock:
  603. if (
  604. force
  605. or self.proxy_queue is None
  606. or (
  607. self.max_queue_size > 0
  608. and self.proxy_queue.qsize() < self.max_queue_size / 2
  609. )
  610. or (
  611. self.max_queue_size < 0
  612. and self.proxy_queue.qsize() < self.real_max_proxy_count / 2
  613. )
  614. or self.no_valid_proxy_times >= 5
  615. ):
  616. if time.time() - self.last_reset_time < self.reset_interval:
  617. self.reset_fast_count += 1
  618. if self.reset_fast_count % 10 == 0:
  619. self.logger.debug(
  620. "代理池重置的太快了:) {}".format(self.reset_fast_count)
  621. )
  622. time.sleep(1)
  623. else:
  624. self.clear()
  625. if self.proxy_queue is None:
  626. import queue
  627. self.proxy_queue = queue.Queue()
  628. # TODO 这里获取到的可能重复
  629. proxies_list = get_proxy_from_url(**self.kwargs)
  630. self.real_max_proxy_count = len(proxies_list)
  631. if 0 < self.max_queue_size < self.real_max_proxy_count:
  632. proxies_list = random.sample(proxies_list, self.max_queue_size)
  633. _valid_count = self.append_proxies(proxies_list)
  634. self.last_reset_time = time.time()
  635. self.no_valid_proxy_times = 0
  636. self.logger.debug(
  637. "重置代理池成功: 获取{}, 成功添加{}, 失效{}, 当前代理数{},".format(
  638. len(proxies_list),
  639. _valid_count,
  640. len(self.invalid_proxy_dict),
  641. len(self.proxy_dict),
  642. )
  643. )
  644. return
  645. def tag_proxy(self, proxies_list: list, flag: int, *, delay=30) -> bool:
  646. """
  647. 对代理进行标记
  648. :param proxies_list:
  649. :param flag:
  650. -1 废弃
  651. 1 延迟使用
  652. :param delay: 延迟时间
  653. :return:
  654. """
  655. if int(flag) not in ProxyItem.proxy_tag_list or not proxies_list:
  656. return False
  657. if not isinstance(proxies_list, list):
  658. proxies_list = [proxies_list]
  659. for proxies in proxies_list:
  660. if not proxies:
  661. continue
  662. proxy_id = ProxyItem(proxies).proxy_id
  663. if proxy_id not in self.proxy_dict:
  664. continue
  665. self.proxy_dict[proxy_id].flag = flag
  666. self.proxy_dict[proxy_id].flag_ts = time.time()
  667. self.proxy_dict[proxy_id].delay = delay
  668. return True
  669. def get_proxy_item(self, proxy_id="", proxies=None):
  670. """
  671. 获取代理对象
  672. :param proxy_id:
  673. :param proxies:
  674. :return:
  675. """
  676. if proxy_id:
  677. return self.proxy_dict.get(proxy_id)
  678. if proxies:
  679. proxy_id = ProxyItem(proxies).proxy_id
  680. return self.proxy_dict.get(proxy_id)
  681. return
  682. def copy(self):
  683. return ProxyPool(**self.kwargs)
  684. def all(self) -> list:
  685. """
  686. 获取当前代理池中的全部代理
  687. :return:
  688. """
  689. return get_proxy_from_url(**self.kwargs)