proxy_pool.py 23 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. 代理池
  4. """
  5. import datetime
  6. import json
  7. import os
  8. import random
  9. import socket
  10. import time
  11. from urllib import parse
  12. import redis
  13. import requests
  14. from feapder import setting
  15. from feapder.utils import tools
  16. from feapder.utils.log import log as logger
  17. # 建立本地缓存代理文件夹
  18. proxy_path = os.path.join(os.path.dirname(__file__), "proxy_file")
  19. if not os.path.exists(proxy_path):
  20. os.mkdir(proxy_path)
  21. def get_proxy_from_jyapi(timeout=5, default=None, show_error_log=False):
  22. """
  23. 剑鱼代理
  24. @param timeout: 访问超时时间
  25. @param default: 默认返回值
  26. @param show_error_log: 展示错误堆栈信息日志
  27. """
  28. request_params = dict(
  29. headers=dict(Authorization=setting.JY_PROXY_AUTHOR),
  30. timeout=timeout
  31. )
  32. try:
  33. response = requests.get(setting.JY_PROXY_URL, **request_params)
  34. except requests.exceptions.RequestException as why:
  35. if show_error_log:
  36. logger.exception(why)
  37. return default
  38. try:
  39. proxies = response.json()["data"]
  40. return proxies
  41. except KeyError:
  42. pass
  43. return default
  44. def get_proxy_from_url(**kwargs):
  45. """
  46. 获取指定url的代理
  47. :param kwargs:
  48. :return:
  49. """
  50. proxy_source_url = kwargs.get("proxy_source_url", [])
  51. # proxy_source_url = "http://socks.spdata.jianyu360.com/socks/getips?limit=100"
  52. if not isinstance(proxy_source_url, list):
  53. proxy_source_url = [proxy_source_url]
  54. proxy_source_url = [x for x in proxy_source_url if x]
  55. if not proxy_source_url:
  56. raise ValueError("no specify proxy_source_url: {}".format(proxy_source_url))
  57. kwargs = kwargs.copy()
  58. kwargs.pop("proxy_source_url")
  59. proxies_list = []
  60. for url in proxy_source_url:
  61. if url.startswith("http"):
  62. proxies_list.extend(get_proxy_from_http(url, **kwargs))
  63. elif url.startswith("redis"):
  64. proxies_list.extend(get_proxy_from_redis(url, **kwargs))
  65. if proxies_list:
  66. # 顺序打乱
  67. random.shuffle(proxies_list)
  68. return proxies_list
  69. def get_proxy_from_http(proxy_source_url, **kwargs):
  70. """
  71. 从指定 http 地址获取代理
  72. :param proxy_source_url:
  73. :param kwargs:
  74. :return:
  75. """
  76. filename = tools.get_md5(proxy_source_url) + ".txt"
  77. abs_filename = os.path.join(proxy_path, filename)
  78. update_interval = kwargs.get("local_proxy_file_cache_timeout", 30)
  79. update_flag = 0
  80. if not update_interval:
  81. # 强制更新
  82. update_flag = 1
  83. elif not os.path.exists(abs_filename):
  84. # 文件不存在则更新
  85. update_flag = 1
  86. elif time.time() - os.stat(abs_filename).st_mtime > update_interval:
  87. # 超过更新间隔
  88. update_flag = 1
  89. if update_flag:
  90. pool = []
  91. response = requests.get(proxy_source_url, timeout=20)
  92. # 改写:获取scocks代理的response处理
  93. for proxy in response.json():
  94. host = tools.decrypt(proxy["ip"])
  95. port = proxy["ports"][0]
  96. endTime = proxy["lifetime"]
  97. pool.append(f"{host}:{port}&&{endTime}")
  98. with open(os.path.join(proxy_path, filename), "w") as f:
  99. f.write("\n".join(pool))
  100. return get_proxy_from_file(filename)
  101. def get_proxy_from_file(filename):
  102. """
  103. 从指定本地文件获取代理
  104. 文件格式
  105. ip:port:https
  106. ip:port:http
  107. ip:port
  108. :param filename:
  109. :return:
  110. """
  111. proxies_list = []
  112. with open(os.path.join(proxy_path, filename), "r") as f:
  113. lines = f.readlines()
  114. for line in lines:
  115. line = line.strip()
  116. if not line:
  117. continue
  118. # 解析
  119. auth = ""
  120. if "@" in line:
  121. auth, line = line.split("@")
  122. # 改写,解析代理有效期结束时间
  123. line, end = line.split("&&")
  124. items = line.split(":")
  125. if len(items) < 2:
  126. continue
  127. ip, port, *protocol = items
  128. if not all([port, ip]):
  129. continue
  130. if auth:
  131. ip = "{}@{}".format(auth, ip)
  132. if not protocol:
  133. # 改写:判断代理是否在有效期内,并将代理格式重http格式改成socks格式
  134. if time.time() < int(end):
  135. proxies = {
  136. "https": "socks5://%s:%s" % (ip, port),
  137. "http": "socks5://%s:%s" % (ip, port),
  138. # "end":end
  139. }
  140. else:
  141. continue
  142. else:
  143. proxies = {protocol[0]: "%s://%s:%s" % (protocol[0], ip, port)}
  144. proxies_list.append(proxies)
  145. return proxies_list
  146. def get_proxy_from_redis(proxy_source_url, **kwargs):
  147. """
  148. 从指定 redis 地址获取代理
  149. @param proxy_source_url: redis://:passwd@host:ip/db
  150. redis 存储结构 zset
  151. ip:port ts
  152. @param kwargs:
  153. {"redis_proxies_key": "xxx"}
  154. @return: [{'http':'http://xxx.xxx.xxx:xxx', 'https':'https://xxx.xxx.xxx.xxx:xxx'}]
  155. """
  156. redis_conn = redis.StrictRedis.from_url(proxy_source_url)
  157. key = kwargs.get("redis_proxies_key")
  158. assert key, "从redis中获取代理 需要指定 redis_proxies_key"
  159. proxies = redis_conn.zrange(key, 0, -1)
  160. proxies_list = []
  161. for proxy in proxies:
  162. proxy = proxy.decode()
  163. proxies_list.append(
  164. {"https": "https://%s" % proxy, "http": "http://%s" % proxy}
  165. )
  166. return proxies_list
  167. def check_proxy(ip="", port="", proxies=None, type=0, timeout=5, show_error_log=True):
  168. """
  169. 代理有效性检查
  170. :param ip:
  171. :param port:
  172. :param proxies:
  173. :param type: 0:socket 1:requests
  174. :param timeout:
  175. :param show_error_log:
  176. :return:
  177. """
  178. ok = 0
  179. if type == 0 and ip and port:
  180. # socket检测成功 不代表代理一定可用 Connection closed by foreign host. 这种情况就不行
  181. with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sk:
  182. sk.settimeout(timeout)
  183. try:
  184. # 必须检测 否则代理永远不刷新
  185. sk.connect((ip, int(port)))
  186. ok = 1
  187. except Exception as e:
  188. if show_error_log:
  189. logger.debug("check proxy failed: {} {}:{}".format(e, ip, port))
  190. sk.close()
  191. else:
  192. if not proxies:
  193. proxies = {
  194. "http": "socks5://{}:{}".format(ip, port),
  195. "https": "socks5//{}:{}".format(ip, port),
  196. }
  197. try:
  198. r = requests.get("https://myip.ipip.net",
  199. proxies=proxies,
  200. timeout=timeout,
  201. stream=True)
  202. ok = 1
  203. r.close()
  204. except Exception as e:
  205. if show_error_log:
  206. args = (e, ip, port, proxies)
  207. logger.debug("check proxy failed: {} {}:{} {}".format(*args))
  208. return ok
  209. class ProxyItem(object):
  210. """单个代理对象"""
  211. # 代理标记
  212. proxy_tag_list = (-1, 0, 1)
  213. def __init__(
  214. self,
  215. proxies=None,
  216. valid_timeout=20,
  217. check_interval=180,
  218. max_proxy_use_num=10000,
  219. delay=30,
  220. use_interval=None,
  221. **kwargs,
  222. ):
  223. """
  224. :param proxies:
  225. :param valid_timeout: 代理检测超时时间 默认-1 20181008 默认不再监测有效性
  226. :param check_interval:
  227. :param max_proxy_use_num:
  228. :param delay:
  229. :param use_interval: 使用间隔 单位秒 默认不限制
  230. :param kwargs:
  231. """
  232. # {"http": ..., "https": ...}
  233. self.proxies = proxies
  234. # 检测超时时间 秒
  235. self.valid_timeout = valid_timeout
  236. # 检测间隔 秒
  237. self.check_interval = check_interval
  238. # 标记 0:正常 -1:丢弃 1: 待会再用 ...
  239. self.flag = 0
  240. # 上次状态变化时间
  241. self.flag_ts = 0
  242. # 上次更新时间 有效时间
  243. self.update_ts = 0
  244. # 最大被使用次数
  245. self.max_proxy_use_num = max_proxy_use_num
  246. # 被使用次数记录
  247. self.use_num = 0
  248. # 延迟使用时间
  249. self.delay = delay
  250. # 使用间隔 单位秒
  251. self.use_interval = use_interval
  252. # 使用时间
  253. self.use_ts = 0
  254. self.proxy_args = self.parse_proxies(self.proxies)
  255. self.proxy_ip = self.proxy_args["ip"]
  256. self.proxy_port = self.proxy_args["port"]
  257. self.proxy_ip_port = "{}:{}".format(self.proxy_ip, self.proxy_port)
  258. if self.proxy_args["user"]:
  259. self.proxy_id = "{user}:{password}@{ip}:{port}".format(**self.proxy_args)
  260. else:
  261. self.proxy_id = self.proxy_ip_port
  262. def get_proxies(self):
  263. self.use_num += 1
  264. return self.proxies
  265. def is_delay(self):
  266. return self.flag == 1
  267. def is_valid(self, force=0, type=0):
  268. """
  269. 检测代理是否有效
  270. 1 有效
  271. 2 延时使用
  272. 0 无效 直接在代理池删除
  273. :param force:
  274. :param type:
  275. :return:
  276. """
  277. if self.use_num > self.max_proxy_use_num > 0:
  278. logger.debug("代理达到最大使用次数: {} {}".format(self.use_num, self.proxies))
  279. return 0
  280. if self.flag == -1:
  281. logger.debug("代理被标记 -1 丢弃 %s" % self.proxies)
  282. return 0
  283. if self.delay > 0 and self.flag == 1:
  284. if time.time() - self.flag_ts < self.delay:
  285. logger.debug("代理被标记 1 延迟 %s" % self.proxies)
  286. return 2
  287. else:
  288. self.flag = 0
  289. logger.debug("延迟代理释放: {}".format(self.proxies))
  290. if self.use_interval:
  291. if time.time() - self.use_ts < self.use_interval:
  292. return 2
  293. if not force:
  294. if time.time() - self.update_ts < self.check_interval:
  295. return 1
  296. if self.valid_timeout > 0:
  297. ok = check_proxy(
  298. proxies=self.proxies,
  299. type=type,
  300. timeout=self.valid_timeout,
  301. )
  302. else:
  303. ok = 1
  304. self.update_ts = time.time()
  305. return ok
  306. @staticmethod
  307. def parse_proxies(proxies):
  308. """
  309. 分解代理组成部分
  310. :param proxies:
  311. :return:
  312. """
  313. if not proxies:
  314. return {}
  315. if isinstance(proxies, (str, bytes)):
  316. proxies = json.loads(proxies)
  317. protocol = list(proxies.keys())
  318. if not protocol:
  319. return {}
  320. _url = proxies.get(protocol[0])
  321. # 改写:注释http代理url的拼接,以正常生成代理池
  322. # if not _url.startswith("http"):
  323. # _url = "http://" + _url
  324. _url_parse = parse.urlparse(_url)
  325. netloc = _url_parse.netloc
  326. if "@" in netloc:
  327. netloc_auth, netloc_host = netloc.split("@")
  328. else:
  329. netloc_auth, netloc_host = "", netloc
  330. ip, *port = netloc_host.split(":")
  331. port = port[0] if port else "80"
  332. user, *password = netloc_auth.split(":")
  333. password = password[0] if password else ""
  334. return {
  335. "protocol": protocol,
  336. "ip": ip,
  337. "port": port,
  338. "user": user,
  339. "password": password,
  340. "ip_port": "{}:{}".format(ip, port),
  341. }
  342. class ProxyPoolBase(object):
  343. def __init__(self, *args, **kwargs):
  344. pass
  345. def get(self, *args, **kwargs):
  346. raise NotImplementedError
  347. class ProxyPool(ProxyPoolBase):
  348. """代理池"""
  349. def __init__(self, **kwargs):
  350. """
  351. :param size: 代理池大小 -1 为不限制
  352. :param proxy_source_url: 代理文件地址 支持列表
  353. :param proxy_instance: 提供代理的实例
  354. :param reset_interval: 代理池重置间隔 最小间隔
  355. :param reset_interval_max: 代理池重置间隔 最大间隔 默认2分钟
  356. :param check_valid: 是否在获取代理时进行检测有效性
  357. :param local_proxy_file_cache_timeout: 本地缓存的代理文件超时时间
  358. :param kwargs: 其他的参数
  359. """
  360. kwargs.setdefault("size", -1)
  361. kwargs.setdefault("proxy_source_url", setting.PROXY_EXTRACT_API)
  362. super(ProxyPool, self).__init__(**kwargs)
  363. # 队列最大长度
  364. self.max_queue_size = kwargs.get("size", -1)
  365. # 实际代理数量
  366. self.real_max_proxy_count = 1000
  367. # 代理可用最大次数
  368. # 代理获取地址 http://localhost/proxy.txt
  369. self.proxy_source_url = kwargs.get("proxy_source_url", [])
  370. if not isinstance(self.proxy_source_url, list):
  371. self.proxy_source_url = [self.proxy_source_url]
  372. self.proxy_source_url = [x for x in self.proxy_source_url if x]
  373. self.proxy_source_url = list(set(self.proxy_source_url))
  374. kwargs.update({"proxy_source_url": self.proxy_source_url})
  375. if not self.proxy_source_url:
  376. logger.warn("need set proxy_source_url or proxy_instance")
  377. # 代理池重置间隔
  378. self.reset_interval = kwargs.get("reset_interval", 5)
  379. # 强制重置一下代理 添加新的代理进来 防止一直使用旧的被封的代理
  380. self.reset_interval_max = kwargs.get("reset_interval_max", 180)
  381. # 是否监测代理有效性
  382. self.check_valid = kwargs.get("check_valid", True)
  383. # 代理队列
  384. self.proxy_queue = None
  385. # {代理id: ProxyItem, ...}
  386. self.proxy_dict = {}
  387. # 失效代理队列
  388. self.invalid_proxy_dict = {}
  389. self.kwargs = kwargs
  390. # 重置代理池锁
  391. self.reset_lock = None
  392. # 重置时间
  393. self.last_reset_time = 0
  394. # 重置的太快了 计数
  395. self.reset_fast_count = 0
  396. # 计数 获取代理重试3次仍然失败 次数
  397. self.no_valid_proxy_times = 0
  398. # 上次获取代理时间
  399. self.last_get_ts = time.time()
  400. # 记录ProxyItem的update_ts 防止由于重置太快导致重复检测有效性
  401. self.proxy_item_update_ts_dict = {}
  402. # 警告
  403. self.warn_flag = False
  404. def warn(self):
  405. if not self.warn_flag:
  406. for url in self.proxy_source_url:
  407. if "zhima" in url:
  408. continue
  409. self.warn_flag = True
  410. return
  411. @property
  412. def queue_size(self):
  413. """
  414. 当前代理池中代理数量
  415. :return:
  416. """
  417. return self.proxy_queue.qsize() if self.proxy_queue is not None else 0
  418. def clear(self):
  419. """
  420. 清空自己
  421. :return:
  422. """
  423. self.proxy_queue = None
  424. # {代理ip: ProxyItem, ...}
  425. self.proxy_dict = {}
  426. # 清理失效代理集合
  427. _limit = datetime.datetime.now() - datetime.timedelta(minutes=10)
  428. self.invalid_proxy_dict = {
  429. k: v for k, v in self.invalid_proxy_dict.items() if v > _limit
  430. }
  431. # 清理超时的update_ts记录
  432. _limit = time.time() - 600
  433. self.proxy_item_update_ts_dict = {
  434. k: v for k, v in self.proxy_item_update_ts_dict.items() if v > _limit
  435. }
  436. return
  437. def get(self, retry: int = 0) -> dict:
  438. """
  439. 从代理池中获取代理
  440. :param retry:
  441. :return:
  442. """
  443. retry += 1
  444. if retry > 3:
  445. self.no_valid_proxy_times += 1
  446. return None
  447. # if time.time() - self.last_get_ts > 3 * 60:
  448. # # 3分钟没有获取过 重置一下
  449. # try:
  450. # self.reset_proxy_pool()
  451. # except Exception as e:
  452. # logger.exception(e)
  453. # 记录获取时间
  454. self.last_get_ts = time.time()
  455. #
  456. self.warn()
  457. proxy_item = self.get_random_proxy()
  458. if proxy_item:
  459. # 不检测
  460. if not self.check_valid: #
  461. # 塞回去
  462. proxies = proxy_item.get_proxies()
  463. self.put_proxy_item(proxy_item)
  464. return proxies
  465. else:
  466. is_valid = proxy_item.is_valid()
  467. if is_valid:
  468. # 记录update_ts
  469. self.proxy_item_update_ts_dict[
  470. proxy_item.proxy_id
  471. ] = proxy_item.update_ts
  472. # 塞回去
  473. proxies = proxy_item.get_proxies()
  474. self.put_proxy_item(proxy_item)
  475. if is_valid == 1:
  476. if proxy_item.use_interval:
  477. proxy_item.use_ts = time.time()
  478. return proxies
  479. else:
  480. # 处理失效代理
  481. self.proxy_dict.pop(proxy_item.proxy_id, "")
  482. self.invalid_proxy_dict[
  483. proxy_item.proxy_id
  484. ] = datetime.datetime.now()
  485. else:
  486. try:
  487. time.sleep(3)
  488. self.reset_proxy_pool()
  489. except Exception as e:
  490. logger.exception(e)
  491. if self.no_valid_proxy_times >= 5:
  492. # 解决bug: 当爬虫仅剩一个任务时 由于只有一个线程检测代理 而不可用代理又刚好很多(时间越长越多) 可能出现一直获取不到代理的情况
  493. # 导致爬虫烂尾
  494. try:
  495. time.sleep(3)
  496. self.reset_proxy_pool()
  497. except Exception as e:
  498. logger.exception(e)
  499. return self.get(retry)
  500. get_proxy = get
  501. def get_random_proxy(self) -> ProxyItem:
  502. """
  503. 随机获取代理
  504. :return:
  505. """
  506. if self.proxy_queue is not None:
  507. if random.random() < 0.5:
  508. # 一半概率检查 这是个高频操作 优化一下
  509. if time.time() - self.last_reset_time > self.reset_interval_max:
  510. time.sleep(3)
  511. self.reset_proxy_pool(force=True)
  512. else:
  513. min_q_size = (
  514. min(self.max_queue_size / 2, self.real_max_proxy_count / 2)
  515. if self.max_queue_size > 0
  516. else self.real_max_proxy_count / 2
  517. )
  518. if self.proxy_queue.qsize() < min_q_size:
  519. time.sleep(3)
  520. self.reset_proxy_pool()
  521. try:
  522. return self.proxy_queue.get_nowait()
  523. except Exception:
  524. pass
  525. return None
  526. def append_proxies(self, proxies_list: list) -> int:
  527. """
  528. 添加代理到代理池
  529. :param proxies_list:
  530. :return:
  531. """
  532. count = 0
  533. if not isinstance(proxies_list, list):
  534. proxies_list = [proxies_list]
  535. for proxies in proxies_list:
  536. if proxies:
  537. proxy_item = ProxyItem(proxies=proxies, **self.kwargs)
  538. # 增加失效判断 2018/12/18
  539. if proxy_item.proxy_id in self.invalid_proxy_dict:
  540. continue
  541. if proxy_item.proxy_id not in self.proxy_dict:
  542. # 补充update_ts
  543. if not proxy_item.update_ts:
  544. proxy_item.update_ts = self.proxy_item_update_ts_dict.get(
  545. proxy_item.proxy_id, 0
  546. )
  547. self.put_proxy_item(proxy_item)
  548. self.proxy_dict[proxy_item.proxy_id] = proxy_item
  549. count += 1
  550. return count
  551. def put_proxy_item(self, proxy_item: ProxyItem):
  552. """
  553. 添加 ProxyItem 到代理池
  554. :param proxy_item:
  555. :return:
  556. """
  557. return self.proxy_queue.put_nowait(proxy_item)
  558. def reset_proxy_pool(self, force: bool = False):
  559. """
  560. 重置代理池
  561. :param force: 是否强制重置代理池
  562. :return:
  563. """
  564. if not self.reset_lock:
  565. # 必须用时调用 否则 可能存在 gevent patch前 threading就已经被导入 导致的Rlock patch失效
  566. import threading
  567. self.reset_lock = threading.RLock()
  568. with self.reset_lock:
  569. if (
  570. force
  571. or self.proxy_queue is None
  572. or (
  573. self.max_queue_size > 0
  574. and self.proxy_queue.qsize() < self.max_queue_size / 2
  575. )
  576. or (
  577. self.max_queue_size < 0
  578. and self.proxy_queue.qsize() < self.real_max_proxy_count / 2
  579. )
  580. or self.no_valid_proxy_times >= 5
  581. ):
  582. if time.time() - self.last_reset_time < self.reset_interval:
  583. self.reset_fast_count += 1
  584. if self.reset_fast_count % 10 == 0:
  585. logger.debug(
  586. "代理池重置的太快了:) {}".format(self.reset_fast_count)
  587. )
  588. time.sleep(1)
  589. else:
  590. self.clear()
  591. if self.proxy_queue is None:
  592. import queue
  593. self.proxy_queue = queue.Queue()
  594. # TODO 这里获取到的可能重复
  595. proxies_list = get_proxy_from_url(**self.kwargs)
  596. self.real_max_proxy_count = len(proxies_list)
  597. if 0 < self.max_queue_size < self.real_max_proxy_count:
  598. proxies_list = random.sample(proxies_list, self.max_queue_size)
  599. _valid_count = self.append_proxies(proxies_list)
  600. self.last_reset_time = time.time()
  601. self.no_valid_proxy_times = 0
  602. logger.debug(
  603. "重置代理池成功: 获取{}, 成功添加{}, 失效{}, 当前代理数{},".format(
  604. len(proxies_list),
  605. _valid_count,
  606. len(self.invalid_proxy_dict),
  607. len(self.proxy_dict),
  608. )
  609. )
  610. return
  611. def tag_proxy(self, proxies_list: list, flag: int, *, delay=30) -> bool:
  612. """
  613. 对代理进行标记
  614. :param proxies_list:
  615. :param flag:
  616. -1 废弃
  617. 1 延迟使用
  618. :param delay: 延迟时间
  619. :return:
  620. """
  621. if int(flag) not in ProxyItem.proxy_tag_list or not proxies_list:
  622. return False
  623. if not isinstance(proxies_list, list):
  624. proxies_list = [proxies_list]
  625. for proxies in proxies_list:
  626. if not proxies:
  627. continue
  628. proxy_id = ProxyItem(proxies).proxy_id
  629. if proxy_id not in self.proxy_dict:
  630. continue
  631. self.proxy_dict[proxy_id].flag = flag
  632. self.proxy_dict[proxy_id].flag_ts = time.time()
  633. self.proxy_dict[proxy_id].delay = delay
  634. return True
  635. def get_proxy_item(self, proxy_id="", proxies=None):
  636. """
  637. 获取代理对象
  638. :param proxy_id:
  639. :param proxies:
  640. :return:
  641. """
  642. if proxy_id:
  643. return self.proxy_dict.get(proxy_id)
  644. if proxies:
  645. proxy_id = ProxyItem(proxies).proxy_id
  646. return self.proxy_dict.get(proxy_id)
  647. return
  648. def copy(self):
  649. return ProxyPool(**self.kwargs)
  650. def all(self) -> list:
  651. """
  652. 获取当前代理池中的全部代理
  653. :return:
  654. """
  655. return get_proxy_from_url(**self.kwargs)