drissionpage_driver.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2025-05-14
  4. ---------
  5. @summary:
  6. ---------
  7. @author: Dzr
  8. """
  9. from time import perf_counter
  10. from DrissionPage import Chromium, ChromiumOptions
  11. from DrissionPage.common import Settings
  12. from feapder.utils import tools
  13. from feapder.utils.log import log
  14. from feapder.utils.webdriver.webdirver import *
  15. class SingletonMeta(type):
  16. """单例元类"""
  17. _instances = {}
  18. def __call__(cls, *args, **kwargs):
  19. if cls not in cls._instances:
  20. cls._instances[cls] = super().__call__(*args, **kwargs)
  21. return cls._instances[cls]
  22. def clear_instance(cls):
  23. """清除元类中保存的实例引用"""
  24. if cls in cls._instances:
  25. del cls._instances[cls]
  26. class Browser(metaclass=SingletonMeta):
  27. _browser: Chromium = None
  28. def __init__(
  29. self,
  30. load_images=True,
  31. user_agent=None,
  32. port=None,
  33. user_data_path=None,
  34. proxy=None,
  35. headless=False,
  36. singleton_tab=True,
  37. driver_type="Chromium",
  38. timeout=30,
  39. custom_argument=None,
  40. download_path=None,
  41. browser_path=None,
  42. **kwargs
  43. ):
  44. """
  45. webdriver 封装,仅支持Chromium
  46. Args:
  47. load_images: 是否加载图片
  48. port: 浏览器端口
  49. user_data_path: 用户数据目录
  50. scope: 自动端口范围,与port 同时只能生效一个
  51. user_agent: 字符串 或 无参函数,返回值为user_agent
  52. proxy: xxx.xxx.xxx.xxx:xxxx 或 无参函数,返回值为代理地址
  53. headless: 是否启用无头模式
  54. driver_type: Chromium
  55. singleton_tab: 标签页是否开启多例支持,True=单例 False=多例
  56. timeout: 请求超时时间
  57. custom_argument: 自定义参数,浏览器启动配置参数
  58. download_path: 文件下载保存路径;
  59. browser_path: 浏览器可执行文件路径;
  60. **kwargs:
  61. """
  62. # 如果实例已存在,则不再重新初始化
  63. if self._browser is not None:
  64. return
  65. self._singleton_tab = singleton_tab
  66. self._driver_type = driver_type
  67. self._headless = headless
  68. self._user_agent = user_agent or setting.DEFAULT_USERAGENT
  69. self._proxy = proxy
  70. self._timeout = timeout
  71. self._load_images = load_images
  72. self._download_path = download_path
  73. self._browser_path = browser_path
  74. self._custom_argument = custom_argument
  75. self._kwargs = kwargs
  76. Settings.set_language("zh_cn") # DrissionPage 的报错信息及提示设置为中文
  77. Settings.set_singleton_tab_obj(self._singleton_tab)
  78. co = ChromiumOptions()
  79. if self._browser_path is not None:
  80. co.set_browser_path(self._browser_path)
  81. port = port or setting.DRISSIONPAGE.get("port")
  82. user_data_path = user_data_path or setting.DRISSIONPAGE.get("user_data_path")
  83. if port is not None:
  84. co.set_local_port(int(port))
  85. if user_data_path is not None:
  86. co.set_user_data_path(user_data_path)
  87. else:
  88. # 设置自动端口范围
  89. co.auto_port(scope=setting.DRISSIONPAGE.get("scope"))
  90. # 设置默认超时时间,用于元素等待、alert 等待、WebPage的 s 模式连接等等
  91. if self._timeout is not None:
  92. co.set_timeouts(base=self._timeout)
  93. # 设置是否以无界面模式启动浏览器
  94. co.headless(on_off=self._headless)
  95. # 设置初始窗口大小
  96. window_size = setting.DRISSIONPAGE.get("window_size")
  97. if window_size is not None:
  98. window_size = ",".join((str(n) for n in window_size))
  99. co.set_argument("--window-size", window_size)
  100. # 设置 useragent
  101. co.set_user_agent(self._user_agent)
  102. # 设置浏览器代理
  103. if self._proxy is not None:
  104. co.set_argument("--proxy-server", value=self._proxy)
  105. # 设置是否禁止加载图片
  106. co.no_imgs(on_off=not self._load_images)
  107. # 设置下载路径
  108. if self._download_path is not None:
  109. co.set_download_path(self._download_path)
  110. # 添加自定义的配置参数
  111. if self._custom_argument:
  112. for arg in self._custom_argument:
  113. co.set_argument(arg)
  114. self._browser = Chromium(addr_or_opts=co)
  115. @property
  116. def is_running(self):
  117. return self._browser.states.is_alive if self._browser is not None else False
  118. def new_tab(self):
  119. if self.is_running:
  120. return self._browser.new_tab()
  121. def tabs_count(self):
  122. if self.is_running:
  123. return self._browser.tabs_count
  124. else:
  125. return 0
  126. def get_browser(self):
  127. return self._browser
  128. def quit(self):
  129. if self._browser is not None:
  130. self._browser.quit(del_data=True)
  131. self._browser = None
  132. SingletonMeta.clear_instance(self.__class__) # 释放资源
  133. def __enter__(self):
  134. return self
  135. def __exit__(self, exc_type, exc_val, exc_tb):
  136. if exc_val:
  137. log.error(exc_val)
  138. self.quit()
  139. return True
  140. class DrissionPageDriver(WebDriver):
  141. def __init__(self, **kwargs):
  142. super(DrissionPageDriver, self).__init__(**kwargs)
  143. # 创建全局浏览器实例(单例)
  144. self._browser = Browser(**kwargs)
  145. # 创建新标签页
  146. tab = self._browser.new_tab()
  147. # 设置自动确认弹窗
  148. tab.set.auto_handle_alert()
  149. # 设置网页加载策略
  150. tab.set.load_mode(setting.DRISSIONPAGE.get("load_mode", "normal"))
  151. # 设置浏览器标识
  152. ua = kwargs.get("user_agent")
  153. if ua is not None:
  154. tab.set.user_agent(ua)
  155. self.url = None
  156. self.tab = tab
  157. @property
  158. def browser(self):
  159. return self._browser.get_browser()
  160. def get_tab(self):
  161. """获取当前标签页,启用多例-可支持多个实例控制同一个标签页"""
  162. return self.browser.get_tab(id_or_num=self.tab.tab_id)
  163. def get_dom_hash(self):
  164. """获取当前DOM的哈希值"""
  165. return tools.get_md5(self.tab.html)
  166. def wait_for_dom_stable(self, duration=None):
  167. """
  168. 计算指定时间内页面的DOM变化次数,可忽略特定元素
  169. :param duration: 监听时长(秒)
  170. :return: DOM变化次数
  171. """
  172. duration = self.tab.timeouts.base if duration is None else duration
  173. script = f'''
  174. return new Promise((resolve) => {{
  175. let mutationCount = 0;
  176. // 创建MutationObserver监听DOM变化
  177. const observer = new MutationObserver((mutations) => {{
  178. if (mutations.length > 0) {{
  179. mutationCount += mutations.length;
  180. }}
  181. }});
  182. // 开始监听所有DOM变化
  183. observer.observe(document, {{
  184. childList: true,
  185. attributes: true,
  186. subtree: true,
  187. characterData: true
  188. }});
  189. // 指定时间后停止监听并返回变化次数
  190. setTimeout(() => {{
  191. observer.disconnect();
  192. resolve(mutationCount);
  193. }}, {duration * 1000});
  194. }});
  195. '''
  196. count = self.tab.run_js(script)
  197. return True if count == 0 else False
  198. def reload(self):
  199. # 尝试触发重绘(通过重绘修复图片加载完成后可能导致布局错乱问题)
  200. self.tab.run_js('document.body.style.display="none";document.body.style.display="block";')
  201. def wait_for_dom_load(self, timeout=None, raise_err=False):
  202. """等待页面加载"""
  203. assert "ERR_CONNECTION_CLOSED" not in self.tab.raw_data
  204. assert self.tab.url_available is True
  205. timeout = self.tab.timeouts.page_load if timeout is None else timeout
  206. end_time = perf_counter() + timeout
  207. while perf_counter() < end_time:
  208. init_hash = self.get_dom_hash()
  209. render_time = (end_time - perf_counter()) / 10
  210. timeout = render_time if render_time > 0 else 1
  211. self.reload()
  212. self.tab.wait(timeout)
  213. current_hash = self.get_dom_hash()
  214. if current_hash == init_hash:
  215. return True
  216. if raise_err is True:
  217. raise TimeoutError("等待页面加载超时")
  218. return False
  219. @property
  220. def domain(self):
  221. return tools.get_domain(self.url or self.tab.url)
  222. def quit(self):
  223. if self.tab:
  224. self.tab.close()
  225. if self._browser.is_running and self._browser.tabs_count() <= 1:
  226. self._browser.quit()
  227. def __enter__(self):
  228. return self
  229. def __exit__(self, exc_type, exc_val, exc_tb):
  230. if exc_val:
  231. log.error(exc_val)
  232. self.quit()
  233. return True