playwright_driver.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2022/9/7 4:11 PM
  4. ---------
  5. @summary:
  6. ---------
  7. @author: Boris
  8. @email: boris_liu@foxmail.com
  9. """
  10. import json
  11. import os
  12. import re
  13. from collections import defaultdict
  14. from typing import Union, List
  15. try:
  16. from typing import Literal # python >= 3.8
  17. except ImportError: # python <3.8
  18. from typing_extensions import Literal
  19. from playwright.sync_api import Page, BrowserContext, ViewportSize, ProxySettings
  20. from playwright.sync_api import Playwright, Browser
  21. from playwright.sync_api import Response
  22. from playwright.sync_api import sync_playwright
  23. from feapder.utils import tools
  24. from feapder.utils.log import log
  25. from feapder.utils.webdriver.webdirver import *
  26. class PlaywrightDriver(WebDriver):
  27. def __init__(
  28. self,
  29. *,
  30. page_on_event_callback: dict = None,
  31. storage_state_path: str = None,
  32. driver_type: Literal["chromium", "firefox", "webkit"] = "chromium",
  33. url_regexes: list = None,
  34. save_all: bool = False,
  35. **kwargs
  36. ):
  37. """
  38. Args:
  39. page_on_event_callback: page.on() 事件的回调 如 page_on_event_callback={"dialog": lambda dialog: dialog.accept()}
  40. storage_state_path: 保存浏览器状态的路径
  41. driver_type: 浏览器类型 chromium, firefox, webkit
  42. url_regexes: 拦截接口,支持正则,数组类型
  43. save_all: 是否保存所有拦截的接口, 默认只保存最后一个
  44. **kwargs:
  45. """
  46. super(PlaywrightDriver, self).__init__(**kwargs)
  47. self.driver: Playwright = None
  48. self.browser: Browser = None
  49. self.context: BrowserContext = None
  50. self.page: Page = None
  51. self.url = None
  52. self.storage_state_path = storage_state_path
  53. self._driver_type = driver_type or "chromium"
  54. self._page_on_event_callback = page_on_event_callback
  55. self._url_regexes = url_regexes
  56. self._save_all = save_all
  57. if self._save_all and self._url_regexes:
  58. log.warning(
  59. "获取完拦截的数据后, 请主动调用PlaywrightDriver的clear_cache()方法清空拦截的数据,否则数据会一直累加,导致内存溢出"
  60. )
  61. self._cache_data = defaultdict(list)
  62. else:
  63. self._cache_data = {}
  64. self._setup()
  65. def _setup(self):
  66. # 处理参数
  67. if self._proxy:
  68. proxy = self._proxy() if callable(self._proxy) else self._proxy
  69. proxy = self.format_context_proxy(proxy)
  70. else:
  71. proxy = None
  72. user_agent = (
  73. self._user_agent() if callable(self._user_agent) else self._user_agent
  74. )
  75. view_size = ViewportSize(
  76. width=self._window_size[0], height=self._window_size[1]
  77. )
  78. # 初始化浏览器对象
  79. self.driver = sync_playwright().start()
  80. self.browser = getattr(self.driver, self._driver_type).launch(
  81. headless=self._headless,
  82. args=["--no-sandbox"],
  83. proxy=proxy,
  84. executable_path=self._executable_path,
  85. downloads_path=self._download_path,
  86. )
  87. if self.storage_state_path and os.path.exists(self.storage_state_path):
  88. self.context = self.browser.new_context(
  89. user_agent=user_agent,
  90. screen=view_size,
  91. viewport=view_size,
  92. proxy=proxy,
  93. storage_state=self.storage_state_path,
  94. )
  95. else:
  96. self.context = self.browser.new_context(
  97. user_agent=user_agent,
  98. screen=view_size,
  99. viewport=view_size,
  100. proxy=proxy,
  101. )
  102. if self._use_stealth_js:
  103. path = os.path.join(os.path.dirname(__file__), "../js/stealth.min.js")
  104. self.context.add_init_script(path=path)
  105. self.page = self.context.new_page()
  106. self.page.set_default_timeout(self._timeout * 1000)
  107. if self._page_on_event_callback:
  108. for event, callback in self._page_on_event_callback.items():
  109. self.page.on(event, callback)
  110. if self._url_regexes:
  111. self.page.on("response", self.on_response)
  112. def __enter__(self):
  113. return self
  114. def __exit__(self, exc_type, exc_val, exc_tb):
  115. if exc_val:
  116. log.error(exc_val)
  117. self.quit()
  118. return True
  119. def format_context_proxy(self, proxy) -> ProxySettings:
  120. """
  121. Args:
  122. proxy: username:password@ip:port / ip:port
  123. Returns:
  124. {
  125. "server": "ip:port"
  126. "username": username,
  127. "password": password,
  128. }
  129. server: http://ip:port or socks5://ip:port. Short form ip:port is considered an HTTP proxy.
  130. """
  131. if "@" in proxy:
  132. certification, _proxy = proxy.split("@")
  133. username, password = certification.split(":")
  134. context_proxy = ProxySettings(
  135. server=_proxy,
  136. username=username,
  137. password=password,
  138. )
  139. else:
  140. context_proxy = ProxySettings(server=proxy)
  141. return context_proxy
  142. def save_storage_stage(self):
  143. if self.storage_state_path:
  144. os.makedirs(os.path.dirname(self.storage_state_path), exist_ok=True)
  145. self.context.storage_state(path=self.storage_state_path)
  146. def quit(self):
  147. self.page.close()
  148. self.context.close()
  149. self.browser.close()
  150. self.driver.stop()
  151. @property
  152. def domain(self):
  153. return tools.get_domain(self.url or self.page.url)
  154. @property
  155. def cookies(self):
  156. cookies_json = {}
  157. for cookie in self.page.context.cookies():
  158. cookies_json[cookie["name"]] = cookie["value"]
  159. return cookies_json
  160. @cookies.setter
  161. def cookies(self, val: Union[dict, List[dict]]):
  162. """
  163. 设置cookie
  164. Args:
  165. val: List[{name: str, value: str, url: Union[str, NoneType], domain: Union[str, NoneType], path: Union[str, NoneType], expires: Union[float, NoneType], httpOnly: Union[bool, NoneType], secure: Union[bool, NoneType], sameSite: Union["Lax", "None", "Strict", NoneType]}]
  166. Returns:
  167. """
  168. if isinstance(val, list):
  169. self.page.context.add_cookies(val)
  170. else:
  171. cookies = []
  172. for key, value in val.items():
  173. cookies.append(
  174. {"name": key, "value": value, "url": self.url or self.page.url}
  175. )
  176. self.page.context.add_cookies(cookies)
  177. @property
  178. def user_agent(self):
  179. return self.page.evaluate("() => navigator.userAgent")
  180. def on_response(self, response: Response):
  181. for regex in self._url_regexes:
  182. if re.search(regex, response.request.url):
  183. intercept_request = InterceptRequest(
  184. url=response.request.url,
  185. headers=response.request.headers,
  186. data=response.request.post_data,
  187. )
  188. intercept_response = InterceptResponse(
  189. request=intercept_request,
  190. url=response.url,
  191. headers=response.headers,
  192. content=response.body(),
  193. status_code=response.status,
  194. )
  195. if self._save_all:
  196. self._cache_data[regex].append(intercept_response)
  197. else:
  198. self._cache_data[regex] = intercept_response
  199. def get_response(self, url_regex) -> InterceptResponse:
  200. if self._save_all:
  201. response_list = self._cache_data.get(url_regex)
  202. if response_list:
  203. return response_list[-1]
  204. return self._cache_data.get(url_regex)
  205. def get_all_response(self, url_regex) -> List[InterceptResponse]:
  206. """
  207. 获取所有匹配的响应, 仅在save_all=True时有效
  208. Args:
  209. url_regex:
  210. Returns:
  211. """
  212. response_list = self._cache_data.get(url_regex, [])
  213. if not isinstance(response_list, list):
  214. return [response_list]
  215. return response_list
  216. def get_text(self, url_regex):
  217. return (
  218. self.get_response(url_regex).content.decode()
  219. if self.get_response(url_regex)
  220. else None
  221. )
  222. def get_all_text(self, url_regex):
  223. """
  224. 获取所有匹配的响应文本, 仅在save_all=True时有效
  225. Args:
  226. url_regex:
  227. Returns:
  228. """
  229. return [
  230. response.content.decode() for response in self.get_all_response(url_regex)
  231. ]
  232. def get_json(self, url_regex):
  233. return (
  234. json.loads(self.get_text(url_regex))
  235. if self.get_response(url_regex)
  236. else None
  237. )
  238. def get_all_json(self, url_regex):
  239. """
  240. 获取所有匹配的响应json, 仅在save_all=True时有效
  241. Args:
  242. url_regex:
  243. Returns:
  244. """
  245. return [json.loads(text) for text in self.get_all_text(url_regex)]
  246. def clear_cache(self):
  247. self._cache_data = defaultdict(list)