# -*- coding: utf-8 -*-
"""
Created on 2025-05-14
---------
@summary:
---------
@author: Dzr
"""
from time import perf_counter

from DrissionPage import Chromium, ChromiumOptions
from DrissionPage.common import Settings

from feapder.utils import tools
from feapder.utils.log import log
# NOTE(review): `setting` and `WebDriver` used below are not imported
# explicitly; presumably they come from this star import - confirm.
from feapder.utils.webdriver.webdirver import *


def _call_if_callable(value):
    """Return ``value()`` when *value* is callable, otherwise *value* itself."""
    return value() if callable(value) else value


class SingletonMeta(type):
    """Metaclass that keeps at most one instance per class."""

    # Maps each class using this metaclass to its single live instance.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Create the instance on first call; return the cached one afterwards.

        Arguments passed on later calls are ignored because ``__init__`` is
        not run again for an already cached instance.
        """
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]

    def clear_instance(cls):
        """Drop the cached instance of *cls*, if there is one."""
        cls._instances.pop(cls, None)


class Browser(metaclass=SingletonMeta):
    """Process-wide singleton wrapper around a DrissionPage ``Chromium``."""

    _browser: Chromium = None

    def __init__(
        self,
        load_images=True,
        user_agent=None,
        port=None,
        user_data_path=None,
        proxy=None,
        headless=False,
        singleton_tab=True,
        driver_type="Chromium",
        timeout=30,
        custom_argument=None,
        download_path=None,
        browser_path=None,
        **kwargs
    ):
        """
        Configure and start the browser. Only Chromium is supported.

        Args:
            load_images: whether images are loaded
            user_agent: a string, or a zero-argument callable returning the
                user agent; falls back to ``setting.DEFAULT_USERAGENT``
            port: browser debugging port; falls back to
                ``setting.DRISSIONPAGE["port"]``. When neither is set, an
                automatic port is picked from ``setting.DRISSIONPAGE["scope"]``
            user_data_path: user data directory; falls back to
                ``setting.DRISSIONPAGE["user_data_path"]``
            proxy: ``xxx.xxx.xxx.xxx:xxxx``, or a zero-argument callable
                returning the proxy address
            headless: whether to start in headless mode
            singleton_tab: True = one object per tab, False = several objects
                may control the same tab
            driver_type: Chromium
            timeout: base timeout in seconds
            custom_argument: iterable of extra browser start-up arguments
            download_path: directory where downloads are saved
            browser_path: path of the browser executable
            **kwargs: stored on the instance, not otherwise used here
        """
        # Already initialised: do not start a second browser.
        if self._browser is not None:
            return

        self._singleton_tab = singleton_tab
        self._driver_type = driver_type
        self._headless = headless
        # The documented contract allows callables for user_agent / proxy, so
        # resolve them before handing the values to ChromiumOptions.
        self._user_agent = (
            _call_if_callable(user_agent) or setting.DEFAULT_USERAGENT
        )
        self._proxy = _call_if_callable(proxy)
        self._timeout = timeout
        self._load_images = load_images
        self._download_path = download_path
        self._browser_path = browser_path
        self._custom_argument = custom_argument
        self._kwargs = kwargs

        # Make DrissionPage report errors and hints in Chinese.
        Settings.set_language("zh_cn")
        Settings.set_singleton_tab_obj(self._singleton_tab)

        co = ChromiumOptions()
        if self._browser_path is not None:
            co.set_browser_path(self._browser_path)

        port = port or setting.DRISSIONPAGE.get("port")
        user_data_path = user_data_path or setting.DRISSIONPAGE.get(
            "user_data_path"
        )
        # NOTE(review): the nesting below was reconstructed from a
        # whitespace-collapsed source. The `else` is paired with the port
        # check because port and automatic port scope are described as
        # mutually exclusive - confirm against the original file.
        if port is not None:
            co.set_local_port(int(port))
            if user_data_path is not None:
                co.set_user_data_path(user_data_path)
        else:
            # No fixed port: pick one automatically from the configured scope.
            co.auto_port(scope=setting.DRISSIONPAGE.get("scope"))

        # Base timeout (element waits, alert waits, ...).
        if self._timeout is not None:
            co.set_timeouts(base=self._timeout)

        # Headless (no UI) mode on/off.
        co.headless(on_off=self._headless)

        # Initial window size, passed as "width,height".
        window_size = setting.DRISSIONPAGE.get("window_size")
        if window_size is not None:
            co.set_argument(
                "--window-size", ",".join(str(n) for n in window_size)
            )

        # User agent.
        co.set_user_agent(self._user_agent)

        # Browser proxy.
        if self._proxy is not None:
            co.set_argument("--proxy-server", value=self._proxy)

        # Image loading is disabled when load_images is False.
        co.no_imgs(on_off=not self._load_images)

        # Download directory.
        if self._download_path is not None:
            co.set_download_path(self._download_path)

        # Caller-supplied start-up arguments.
        for arg in self._custom_argument or ():
            co.set_argument(arg)

        self._browser = Chromium(addr_or_opts=co)

    @property
    def is_running(self):
        """Whether a browser object exists and reports itself alive."""
        if self._browser is None:
            return False
        return self._browser.states.is_alive

    def new_tab(self):
        """Open and return a new tab, or return None if the browser is down."""
        if self.is_running:
            return self._browser.new_tab()
        return None

    def tabs_count(self):
        """Return the number of tabs, or 0 if the browser is down."""
        if self.is_running:
            return self._browser.tabs_count
        return 0

    def get_browser(self):
        """Return the underlying ``Chromium`` object (None after ``quit``)."""
        return self._browser

    def quit(self):
        """Quit the browser and release the singleton; no-op if already quit."""
        if self._browser is not None:
            self._browser.quit(del_data=True)
            self._browser = None
            # Forget the cached instance so the next Browser() starts afresh.
            SingletonMeta.clear_instance(self.__class__)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Log any exception, quit the browser, and suppress the exception."""
        if exc_val:
            log.error(exc_val)
        self.quit()
        # Returning True suppresses the exception raised inside the `with`.
        return True


class DrissionPageDriver(WebDriver):
    """Driver bound to one tab of the shared singleton :class:`Browser`."""

    def __init__(self, **kwargs):
        """
        Open a new tab in the shared browser and configure it.

        Args:
            **kwargs: forwarded to both ``WebDriver`` and ``Browser``;
                ``user_agent`` (string or zero-argument callable), when
                given, is also applied to the new tab.

        Raises:
            RuntimeError: if the shared browser is not running, so no tab
                can be opened.
        """
        super(DrissionPageDriver, self).__init__(**kwargs)

        # Shared (singleton) browser instance.
        self._browser = Browser(**kwargs)

        # New tab for this driver.
        tab = self._browser.new_tab()
        if tab is None:
            # Browser.new_tab() returns None when the browser is not alive;
            # fail with a clear message instead of an AttributeError below.
            raise RuntimeError(
                "Cannot open a new tab: the browser is not running"
            )

        # Automatically handle alert dialogs.
        tab.set.auto_handle_alert()
        # Page load strategy.
        tab.set.load_mode(setting.DRISSIONPAGE.get("load_mode", "normal"))
        # Per-tab user agent; callables are resolved to their value first.
        ua = _call_if_callable(kwargs.get("user_agent"))
        if ua is not None:
            tab.set.user_agent(ua)

        self.url = None
        self.tab = tab

    @property
    def browser(self):
        """The underlying ``Chromium`` object of the shared browser."""
        return self._browser.get_browser()

    def get_tab(self):
        """Look up this driver's tab on the browser by its tab id."""
        return self.browser.get_tab(id_or_num=self.tab.tab_id)

    def get_dom_hash(self):
        """Return the MD5 of the tab's current HTML."""
        return tools.get_md5(self.tab.html)

    def wait_for_dom_stable(self, duration=None):
        """
        Observe DOM mutations for *duration* seconds.

        :param duration: observation window in seconds; defaults to the
            tab's base timeout
        :return: True if the script reported zero mutations, else False
        """
        duration = self.tab.timeouts.base if duration is None else duration
        script = f'''
            return new Promise((resolve) => {{
                let mutationCount = 0;

                // 创建MutationObserver监听DOM变化
                const observer = new MutationObserver((mutations) => {{
                    if (mutations.length > 0) {{
                        mutationCount += mutations.length;
                    }}
                }});

                // 开始监听所有DOM变化
                observer.observe(document, {{
                    childList: true,
                    attributes: true,
                    subtree: true,
                    characterData: true
                }});

                // 指定时间后停止监听并返回变化次数
                setTimeout(() => {{
                    observer.disconnect();
                    resolve(mutationCount);
                }}, {duration * 1000});
            }});
        '''
        count = self.tab.run_js(script)
        return count == 0

    def reload(self):
        """Toggle ``body`` display to trigger a repaint (not a page reload).

        Intended to repair layout glitches that can appear once images have
        finished loading.
        """
        self.tab.run_js(
            'document.body.style.display="none";document.body.style.display="block";'
        )

    def wait_for_dom_load(self, timeout=None, raise_err=False):
        """
        Wait until the DOM hash is unchanged across one wait interval.

        :param timeout: overall budget in seconds; defaults to the tab's
            page-load timeout
        :param raise_err: when True, raise instead of returning False
        :return: True once two consecutive hashes match, False on timeout
        :raises AssertionError: if the response contains
            ``ERR_CONNECTION_CLOSED`` or the URL is not available
        :raises TimeoutError: on timeout when *raise_err* is True
        """
        # Explicit raises instead of `assert`, so the checks still run
        # under `python -O`; the exception type is unchanged.
        if "ERR_CONNECTION_CLOSED" in self.tab.raw_data:
            raise AssertionError("ERR_CONNECTION_CLOSED found in response")
        if self.tab.url_available is not True:
            raise AssertionError("current URL is not available")

        timeout = self.tab.timeouts.page_load if timeout is None else timeout
        end_time = perf_counter() + timeout
        while perf_counter() < end_time:
            init_hash = self.get_dom_hash()
            # Wait a tenth of the remaining budget, or 1s if it just ran out.
            render_time = (end_time - perf_counter()) / 10
            wait_time = render_time if render_time > 0 else 1
            self.reload()
            self.tab.wait(wait_time)
            if self.get_dom_hash() == init_hash:
                return True

        if raise_err is True:
            raise TimeoutError("等待页面加载超时")
        return False

    @property
    def domain(self):
        """Domain of ``self.url``, or of the tab's URL when that is unset."""
        return tools.get_domain(self.url or self.tab.url)

    def quit(self):
        """Close this driver's tab; quit the browser if at most one tab is left."""
        if self.tab:
            self.tab.close()
        if self._browser.is_running and self._browser.tabs_count() <= 1:
            self._browser.quit()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Log any exception, quit the driver, and suppress the exception."""
        if exc_val:
            log.error(exc_val)
        self.quit()
        # Returning True suppresses the exception raised inside the `with`.
        return True