# -*- coding: utf-8 -*- """ Created on 2025-05-26 --------- @summary: 浏览器 Cookie 池 --------- @author: Dzr """ from feapder.network import user_agent as user_agent_pool from feapder.network.proxy_pool import SpringBoardProxyPool from feapder.utils.log import log from feapder.utils.webdriver import DrissionPageDriver from untils.cookie_pool import PageCookiePool DRISSIONPAGE = dict( singleton_tab=True, # 一个标签页是否支持多例操作,True=单例;False=多例 headless=False, # 是否为无头浏览器 load_images=False, # 是否加载图片 user_agent=None, # 字符串 proxy=None, # xxx.xxx.xxx.xxx:xxxx timeout=15, # 请求超时时间,用于元素等待、alert 等待、WebPage的 s 模式连接等等 retry=1, # 连接失败重试次数 interval=0.5, # 连接失败重试间隔(秒) page_load=30, # 页面加载超时时间(秒) window_size=(1024, 800), # 窗口大小 driver_type="chromium", load_mode="normal", # 网页加载策略, 可选值:"normal", "eager", "none" browser_path=None, # 浏览器可执行文件路径 download_path=None, # 下载文件的路径 custom_argument=[ "--no-sandbox", "--ignore-certificate-errors" ] ) class BrowserCookiePool(PageCookiePool): def __init__(self, redis_key, page_url, cookie_key, **kwargs): proxy_api = kwargs.pop("proxy_api", None) self._retry = kwargs.pop("retry", 3) self._interval = kwargs.pop("interval", 1.5) self._render_time = kwargs.pop("render_time", 3) self._proxies = kwargs.pop("proxies") # 仅支持字符串 self._enable_proxy = kwargs.pop("enable_proxy", False) self._proxy = None if self._proxies is None and self._enable_proxy: self._proxy = SpringBoardProxyPool(proxy_api=proxy_api) DRISSIONPAGE["proxy"] = self._proxy.get_proxy()["http"] else: DRISSIONPAGE["proxy"] = self._proxies DRISSIONPAGE["user_agent"] = user_agent_pool.get("chrome") DRISSIONPAGE["load_images"] = kwargs.pop("load_images", False) super(BrowserCookiePool, self).__init__(redis_key, **kwargs) self.page_url = page_url self.cookie_key = cookie_key def create_cookie(self): nums = 0 with DrissionPageDriver(**DRISSIONPAGE) as driver: while True: try: driver.tab.get(self.page_url, retry=DRISSIONPAGE["retry"], timeout=DRISSIONPAGE["timeout"], interval=DRISSIONPAGE["interval"]) driver.wait_for_dom_load(timeout=self._render_time, raise_err=True) cookies = driver.tab.cookies().as_dict() if self.cookie_key in cookies.keys(): return cookies driver.tab.wait(self._interval) nums += 1 if nums >= self._retry: return except Exception as e: log.error(f"获取cookie失败,{e}") driver.tab.clear_cache() if self._enable_proxy: self._proxy.change_proxy(force=True) WebCookiePool = BrowserCookiePool