123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
- # -*- coding: utf-8 -*-
- """
- Created on 2025-05-14
- ---------
- @summary:
- ---------
- @author: Dzr
- """
- from time import perf_counter
- from DrissionPage import Chromium, ChromiumOptions
- from DrissionPage.common import Settings
- from feapder.utils import tools
- from feapder.utils.log import log
- from feapder.utils.webdriver.webdirver import *
class SingletonMeta(type):
    """Metaclass that gives every class using it a single shared instance."""

    # Maps each class to the one instance created for it.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Return the cached instance of ``cls``, building it on first use.

        Constructor arguments are only used by the call that actually creates
        the instance; later calls return the cached object as-is.
        """
        if cls in cls._instances:
            return cls._instances[cls]
        instance = super().__call__(*args, **kwargs)
        cls._instances[cls] = instance
        return instance

    def clear_instance(cls):
        """Drop the cached instance of ``cls`` (no-op when none is cached)."""
        cls._instances.pop(cls, None)
class Browser(metaclass=SingletonMeta):
    """Singleton wrapper that owns one DrissionPage ``Chromium`` browser.

    The class is created through ``SingletonMeta``, so only the first
    construction configures and starts a browser. Later ``Browser(...)`` calls
    return that same object (their arguments are ignored) until ``quit()``
    releases it.
    """

    # Underlying DrissionPage browser; None before start-up and after quit().
    _browser: Chromium = None

    def __init__(
        self,
        load_images=True,
        user_agent=None,
        port=None,
        user_data_path=None,
        proxy=None,
        headless=False,
        singleton_tab=True,
        driver_type="Chromium",
        timeout=30,
        custom_argument=None,
        download_path=None,
        browser_path=None,
        **kwargs
    ):
        """Configure and start the Chromium browser (Chromium only).

        Args:
            load_images: Whether pages should load images.
            user_agent: User-agent string, or a zero-argument callable that
                returns one. Falls back to ``setting.DEFAULT_USERAGENT``.
            port: Browser debugging port. Falls back to
                ``setting.DRISSIONPAGE["port"]``. When no port is available an
                automatic port is picked from ``setting.DRISSIONPAGE["scope"]``
                instead, so only one of port / scope takes effect.
            user_data_path: User data directory. Falls back to
                ``setting.DRISSIONPAGE["user_data_path"]``. Only applied
                together with an explicit port.
            proxy: Proxy address (``xxx.xxx.xxx.xxx:xxxx``), or a zero-argument
                callable that returns one.
            headless: Whether to start the browser in headless mode.
            singleton_tab: Passed to ``Settings.set_singleton_tab_obj``;
                True = one object per tab, False = multiple objects per tab.
            driver_type: Stored only; this wrapper always starts Chromium.
            timeout: Base timeout in seconds; ``None`` keeps the library default.
            custom_argument: Iterable of extra browser start-up arguments.
            download_path: Directory where downloads are saved.
            browser_path: Path to the browser executable.
            **kwargs: Stored on the instance; not otherwise used here.
        """
        # Already initialised: leave the running browser untouched.
        if self._browser is not None:
            return

        self._singleton_tab = singleton_tab
        self._driver_type = driver_type
        self._headless = headless
        self._user_agent = user_agent or setting.DEFAULT_USERAGENT
        self._proxy = proxy
        self._timeout = timeout
        self._load_images = load_images
        self._download_path = download_path
        self._browser_path = browser_path
        self._custom_argument = custom_argument
        self._kwargs = kwargs

        # Make DrissionPage report its errors and hints in Chinese.
        Settings.set_language("zh_cn")
        Settings.set_singleton_tab_obj(self._singleton_tab)

        co = ChromiumOptions()
        if self._browser_path is not None:
            co.set_browser_path(self._browser_path)

        port = port or setting.DRISSIONPAGE.get("port")
        user_data_path = user_data_path or setting.DRISSIONPAGE.get("user_data_path")
        if port is not None:
            co.set_local_port(int(port))
            if user_data_path is not None:
                co.set_user_data_path(user_data_path)
        else:
            # No fixed port: let DrissionPage pick one from the configured range.
            co.auto_port(scope=setting.DRISSIONPAGE.get("scope"))

        # Default timeout used for element waits, alert waits, etc.
        if self._timeout is not None:
            co.set_timeouts(base=self._timeout)

        co.headless(on_off=self._headless)

        # Initial window size, given in settings as an iterable of numbers.
        window_size = setting.DRISSIONPAGE.get("window_size")
        if window_size is not None:
            co.set_argument("--window-size", ",".join(str(n) for n in window_size))

        # user_agent / proxy may be callables; call them so the browser gets
        # the actual string rather than the function object.
        co.set_user_agent(self._call_if_callable(self._user_agent))

        proxy_address = self._call_if_callable(self._proxy)
        if proxy_address is not None:
            co.set_argument("--proxy-server", value=proxy_address)

        co.no_imgs(on_off=not self._load_images)

        if self._download_path is not None:
            co.set_download_path(self._download_path)

        # Extra user-supplied start-up arguments.
        if self._custom_argument:
            for arg in self._custom_argument:
                co.set_argument(arg)

        self._browser = Chromium(addr_or_opts=co)

    @staticmethod
    def _call_if_callable(value):
        """Return ``value()`` when ``value`` is callable, otherwise ``value``."""
        return value() if callable(value) else value

    @property
    def is_running(self):
        """True when a browser exists and DrissionPage reports it alive."""
        return self._browser.states.is_alive if self._browser is not None else False

    def new_tab(self):
        """Open and return a new tab; returns None if the browser is not running."""
        if self.is_running:
            return self._browser.new_tab()
        return None

    def tabs_count(self):
        """Return the number of open tabs, or 0 if the browser is not running."""
        if self.is_running:
            return self._browser.tabs_count
        return 0

    def get_browser(self):
        """Return the underlying ``Chromium`` object (None after ``quit()``)."""
        return self._browser

    def quit(self):
        """Quit the browser and release the singleton so a new one can be built."""
        if self._browser is not None:
            # NOTE(review): del_data=True presumably also removes the browser's
            # user data directory - confirm against the DrissionPage docs.
            self._browser.quit(del_data=True)
            self._browser = None
        # Forget the cached instance so the next Browser(...) re-initialises.
        SingletonMeta.clear_instance(self.__class__)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Log any exception, quit the browser, and suppress the exception."""
        if exc_val:
            log.error(exc_val)
        self.quit()
        # Returning True tells Python not to re-raise the exception.
        return True
class DrissionPageDriver(WebDriver):
    """``WebDriver`` implementation that drives one tab of the shared ``Browser``."""

    def __init__(self, **kwargs):
        """Open a new tab in the shared browser and configure it.

        Args:
            **kwargs: Forwarded unchanged to ``WebDriver.__init__`` and to
                ``Browser``. ``user_agent`` (a string, or a zero-argument
                callable returning one) is additionally applied to the new tab.

        Raises:
            RuntimeError: If the shared browser is not running and therefore
                cannot provide a tab.
        """
        super(DrissionPageDriver, self).__init__(**kwargs)
        # Shared (singleton) browser instance.
        self._browser = Browser(**kwargs)

        tab = self._browser.new_tab()
        if tab is None:
            # Browser.new_tab() returns None when the browser is not alive;
            # fail with a clear message instead of an AttributeError below.
            raise RuntimeError("browser is not running, unable to open a new tab")

        # Automatically handle alert dialogs.
        tab.set.auto_handle_alert()
        # Page load strategy, "normal" unless overridden in settings.
        tab.set.load_mode(setting.DRISSIONPAGE.get("load_mode", "normal"))

        # Per-tab user agent; call it first if a callable was supplied.
        ua = kwargs.get("user_agent")
        if callable(ua):
            ua = ua()
        if ua is not None:
            tab.set.user_agent(ua)

        self.url = None
        self.tab = tab

    @property
    def browser(self):
        """The underlying ``Chromium`` object owned by the shared ``Browser``."""
        return self._browser.get_browser()

    def get_tab(self):
        """Look up this driver's tab on the browser by its tab id.

        With multi-instance tab objects enabled this allows several objects to
        control the same tab.
        """
        return self.browser.get_tab(id_or_num=self.tab.tab_id)

    def get_dom_hash(self):
        """Return the MD5 of the tab's current HTML."""
        return tools.get_md5(self.tab.html)

    def wait_for_dom_stable(self, duration=None):
        """Observe DOM mutations for ``duration`` seconds and report stability.

        :param duration: observation time in seconds; defaults to the tab's
            base timeout
        :return: True if no DOM mutation was observed during that time,
            False otherwise
        """
        duration = self.tab.timeouts.base if duration is None else duration
        # NOTE(review): the script resolves only after `duration` seconds; make
        # sure the tab's JS timeout is longer than that - confirm in DrissionPage.
        script = f'''
        return new Promise((resolve) => {{
            let mutationCount = 0;
            // 创建MutationObserver监听DOM变化
            const observer = new MutationObserver((mutations) => {{
                if (mutations.length > 0) {{
                    mutationCount += mutations.length;
                }}
            }});
            // 开始监听所有DOM变化
            observer.observe(document, {{
                childList: true,
                attributes: true,
                subtree: true,
                characterData: true
            }});
            // 指定时间后停止监听并返回变化次数
            setTimeout(() => {{
                observer.disconnect();
                resolve(mutationCount);
            }}, {duration * 1000});
        }});
        '''
        count = self.tab.run_js(script)
        return count == 0

    def reload(self):
        """Try to force a repaint by toggling ``document.body`` display.

        Intended to repair layout glitches that can appear once images finish
        loading. Despite the name, the page is not reloaded.
        """
        # NOTE(review): this leaves an inline display="block" on <body>.
        self.tab.run_js('document.body.style.display="none";document.body.style.display="block";')

    def wait_for_dom_load(self, timeout=None, raise_err=False):
        """Wait until the page HTML stops changing.

        Repeatedly hashes the HTML, forces a repaint, waits a fraction of the
        remaining time, and hashes again; succeeds on the first pass where both
        hashes match.

        :param timeout: overall time budget in seconds; defaults to the tab's
            page-load timeout
        :param raise_err: raise instead of returning False on timeout
        :return: True once the HTML is stable, False on timeout
        :raises AssertionError: if the page data contains
            "ERR_CONNECTION_CLOSED" or the tab reports the URL as unavailable
        :raises TimeoutError: on timeout when ``raise_err`` is True
        """
        # Explicit raises (not `assert`) so the checks survive `python -O`;
        # the exception type is kept for existing callers.
        if "ERR_CONNECTION_CLOSED" in self.tab.raw_data:
            raise AssertionError("ERR_CONNECTION_CLOSED found in page data")
        if self.tab.url_available is not True:
            raise AssertionError("tab url is not available")

        timeout = self.tab.timeouts.page_load if timeout is None else timeout
        end_time = perf_counter() + timeout
        while perf_counter() < end_time:
            init_hash = self.get_dom_hash()
            # Wait one tenth of the remaining budget between the two hashes.
            render_time = (end_time - perf_counter()) / 10
            pause = render_time if render_time > 0 else 1
            self.reload()
            self.tab.wait(pause)
            if self.get_dom_hash() == init_hash:
                return True

        if raise_err is True:
            raise TimeoutError("等待页面加载超时")
        return False

    @property
    def domain(self):
        """Domain of ``self.url`` or, when that is unset, of the tab's URL."""
        return tools.get_domain(self.url or self.tab.url)

    def quit(self):
        """Close this driver's tab and quit the browser if it was the last user."""
        if self.tab:
            self.tab.close()
        # At most one tab left: shut the shared browser down as well.
        if self._browser.is_running and self._browser.tabs_count() <= 1:
            self._browser.quit()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Log any exception, quit, and suppress the exception."""
        if exc_val:
            log.error(exc_val)
        self.quit()
        # Returning True tells Python not to re-raise the exception.
        return True
|