selenium_driver.py 18 KB


  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2021/3/18 4:59 下午
  4. ---------
  5. @summary:
  6. ---------
  7. @author: Boris
  8. @email: boris_liu@foxmail.com
  9. """
  10. import json
  11. import logging
  12. import os
  13. from typing import Optional, Union, List
  14. from selenium import webdriver
  15. from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
  16. from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver
  17. from webdriver_manager.chrome import ChromeDriverManager
  18. from webdriver_manager.firefox import GeckoDriverManager
  19. from feapder.utils import tools
  20. from feapder.utils.log import log, OTHERS_LOG_LEVAL
  21. from feapder.utils.webdriver.webdirver import *
  # Mute webdriver_manager's log output: its "WDM" logger is set to the
  # project's OTHERS_LOG_LEVAL level.
  logging.getLogger("WDM").setLevel(level=OTHERS_LOG_LEVAL)
  class SeleniumDriver(WebDriver, RemoteWebDriver):
      """Wrapper that builds and owns a selenium browser driver.

      Depending on ``self._driver_type`` a Chrome, Edge, PhantomJS or Firefox
      driver is created and stored in ``self.driver``. The builder methods read
      their configuration from instance attributes such as ``self._proxy``,
      ``self._user_agent``, ``self._headless`` or ``self._kwargs``; this class
      never assigns them itself, so they are presumably populated by the parent
      initializer (TODO confirm against the base ``WebDriver`` class).

      Attribute lookups that fail on the wrapper are forwarded to the wrapped
      driver (see ``__getattr__``).
      """

      CHROME = "CHROME"
      EDGE = "EDGE"
      PHANTOMJS = "PHANTOMJS"
      FIREFOX = "FIREFOX"

      # Names of the extra keyword arguments that ``filter_kwargs`` lets through
      # to the corresponding selenium driver constructor.
      __CHROME_ATTRS__ = {
          "executable_path",
          "port",
          "options",
          "service_args",
          "desired_capabilities",
          "service_log_path",
          "chrome_options",
          "keep_alive",
      }
      __EDGE_ATTRS__ = __CHROME_ATTRS__
      __FIREFOX_ATTRS__ = {
          "firefox_profile",
          "firefox_binary",
          "timeout",
          "capabilities",
          "proxy",
          "executable_path",
          "options",
          "service_log_path",
          "firefox_options",
          "service_args",
          "desired_capabilities",
          "log_path",
          "keep_alive",
      }
      __PHANTOMJS_ATTRS__ = {
          "executable_path",
          "port",
          "desired_capabilities",
          "service_args",
          "service_log_path",
      }

      def __init__(self, xhr_url_regexes: list = None, **kwargs):
          """Create the underlying browser driver.

          Args:
              xhr_url_regexes: list of regular expressions; XHR requests whose
                  URL matches are intercepted. Only supported with CHROME.
              **kwargs: forwarded unchanged to the parent initializer.

          Raises:
              Exception: if ``xhr_url_regexes`` is given for a non-Chrome driver.
              TypeError: if the driver type is not one of the supported constants.
          """
          super(SeleniumDriver, self).__init__(**kwargs)
          self._xhr_url_regexes = xhr_url_regexes
          self._driver_type = self._driver_type or SeleniumDriver.CHROME

          if self._xhr_url_regexes and self._driver_type != SeleniumDriver.CHROME:
              raise Exception(
                  "xhr_url_regexes only support by chrome now! eg: driver_type=SeleniumDriver.CHROME"
              )

          if self._driver_type == SeleniumDriver.CHROME:
              self.driver = self.chrome_driver()
          elif self._driver_type == SeleniumDriver.EDGE:
              self.driver = self.edge_driver()
          elif self._driver_type == SeleniumDriver.PHANTOMJS:
              self.driver = self.phantomjs_driver()
          elif self._driver_type == SeleniumDriver.FIREFOX:
              self.driver = self.firefox_driver()
          else:
              # Report the offending value itself (the type alone is rarely
              # helpful) and list every supported constant, EDGE included.
              raise TypeError(
                  "driver_type must be one of CHROME, EDGE, PHANTOMJS or FIREFOX, "
                  "but received {!r}".format(self._driver_type)
              )

          # driver.get(url) can hang forever without raising; a page-load
          # timeout prevents the program from getting stuck.
          self.driver.set_page_load_timeout(self._timeout)
          # Apply the same timeout to asynchronous script execution.
          self.driver.set_script_timeout(self._timeout)

          self.url = None

      def __enter__(self):
          """Return the wrapper itself for use in a ``with`` statement."""
          return self

      def __exit__(self, exc_type, exc_val, exc_tb):
          """Log any pending exception, quit the driver and suppress the error.

          Returning True means an exception raised inside the ``with`` body is
          logged here and NOT propagated to the caller.
          """
          if exc_val:
              log.error(exc_val)

          self.quit()
          return True

      def filter_kwargs(self, kwargs: dict, driver_attrs: set):
          """Return the entries of ``kwargs`` whose key is in ``driver_attrs``.

          Args:
              kwargs: candidate keyword arguments; may be empty or None.
              driver_attrs: names accepted by the target driver constructor.

          Returns:
              A new dict; empty when ``kwargs`` is empty or None.
          """
          if not kwargs:
              return {}
          return {key: value for key, value in kwargs.items() if key in driver_attrs}

      def get_driver(self):
          """Return the wrapped selenium driver."""
          return self.driver

      # ------------------------------------------------------------------
      # private helpers shared by the driver builders
      # ------------------------------------------------------------------
      @staticmethod
      def _call_or_value(setting):
          """Return ``setting()`` if it is callable, otherwise ``setting``."""
          return setting() if callable(setting) else setting

      @staticmethod
      def _selenium_is_v4_or_newer():
          """Tell whether the installed selenium has major version >= 4."""
          version = str(webdriver.__version__)
          try:
              # Compare numerically: as plain strings "10.0.0" sorts before "4.0.0".
              return int(version.split(".", 1)[0]) >= 4
          except ValueError:
              # Unparseable version string: keep the plain string comparison.
              return version >= "4.0.0"

      @staticmethod
      def _accepts_service(driver_cls):
          """Tell whether ``driver_cls`` can be constructed with ``service=``.

          The ``Service`` classes being importable does not by itself mean the
          driver constructor takes a ``service`` keyword, so the constructor
          signature is inspected. When it cannot be inspected, or it takes
          ``**kwargs``, the answer is True, which keeps the "Service imported,
          so use it" behaviour.
          """
          import inspect

          try:
              parameters = inspect.signature(driver_cls.__init__).parameters
          except (TypeError, ValueError):
              return True
          if "service" in parameters:
              return True
          return any(
              parameter.kind is parameter.VAR_KEYWORD
              for parameter in parameters.values()
          )

      def _driver_binary_kwargs(self, kwargs, driver_cls, service_cls, install_driver):
          """Add the driver-binary location to ``kwargs`` and return it.

          An explicit ``self._executable_path`` wins; otherwise, when
          ``self._auto_install_driver`` is set, ``install_driver()`` supplies the
          path. The path is passed as ``service=service_cls(path)`` when the
          constructor supports it and as ``executable_path=path`` otherwise.
          """
          path = self._executable_path
          if not path and self._auto_install_driver:
              path = install_driver()
          if not path:
              return kwargs

          if service_cls is not None and self._accepts_service(driver_cls):
              kwargs.update(service=service_cls(path))
          else:
              kwargs.update(executable_path=path)
          return kwargs

      @staticmethod
      def _read_bundled_js(relative_path):
          """Read a JavaScript file located relative to this module's directory."""
          js_path = os.path.join(os.path.dirname(__file__), relative_path)
          # Explicit encoding so the read does not depend on the platform locale.
          with open(js_path, encoding="utf-8") as f:
              return f.read()

      def _configure_chromium_options(self, options):
          """Apply the shared Chrome/Edge settings to ``options`` and return it."""
          # Important: run in "developer mode" so that sites are less likely to
          # detect that selenium is driving the browser.
          options.add_experimental_option("excludeSwitches", ["enable-automation"])
          options.add_experimental_option("useAutomationExtension", False)
          # Needed when running inside docker.
          options.add_argument("--no-sandbox")

          if self._proxy:
              options.add_argument(
                  "--proxy-server={}".format(self._call_or_value(self._proxy))
              )
          if self._user_agent:
              options.add_argument(
                  "user-agent={}".format(self._call_or_value(self._user_agent))
              )
          if self._headless:
              options.add_argument("--headless")
              options.add_argument("--disable-gpu")
          if self._window_size:
              options.add_argument(
                  "--window-size={},{}".format(
                      self._window_size[0], self._window_size[1]
                  )
              )

          # "prefs" is a single experimental option: setting it twice would
          # make the later call replace the earlier one, so every preference
          # is collected first and applied once.
          prefs = {}
          if not self._load_images:
              prefs["profile.managed_default_content_settings.images"] = 2
          if self._download_path:
              os.makedirs(self._download_path, exist_ok=True)
              prefs["download.prompt_for_download"] = False
              prefs["download.default_directory"] = self._download_path
          if prefs:
              options.add_experimental_option("prefs", prefs)

          # User supplied command-line arguments.
          if self._custom_argument:
              for arg in self._custom_argument:
                  options.add_argument(arg)
          return options

      def _apply_chromium_page_hooks(self, driver):
          """Install the stealth/intercept scripts and the download behaviour."""
          # Hide browser automation fingerprints.
          if self._use_stealth_js:
              driver.execute_cdp_cmd(
                  "Page.addScriptToEvaluateOnNewDocument",
                  {"source": self._read_bundled_js("../js/stealth.min.js")},
              )

          if self._xhr_url_regexes:
              assert isinstance(self._xhr_url_regexes, list)
              driver.execute_cdp_cmd(
                  "Page.addScriptToEvaluateOnNewDocument",
                  {"source": self._read_bundled_js("../js/intercept.js")},
              )
              # json.dumps yields a valid JavaScript array literal; the Python
              # repr of a list is not guaranteed to be one.
              js = "window.__urlRegexes = {}".format(
                  json.dumps(self._xhr_url_regexes)
              )
              driver.execute_cdp_cmd(
                  "Page.addScriptToEvaluateOnNewDocument", {"source": js}
              )

          if self._download_path:
              # Register the chromium "send_command" endpoint, then use it to
              # allow downloads into the configured directory.
              driver.command_executor._commands["send_command"] = (
                  "POST",
                  "/session/$sessionId/chromium/send_command",
              )
              params = {
                  "cmd": "Page.setDownloadBehavior",
                  "params": {
                      "behavior": "allow",
                      "downloadPath": self._download_path,
                  },
              }
              driver.execute("send_command", params)
          return driver

      # ------------------------------------------------------------------
      # driver builders
      # ------------------------------------------------------------------
      def firefox_driver(self):
          """Build and return a Firefox driver.

          Raises:
              Exception: when selenium 4 or newer is installed (not adapted yet).
          """
          if self._selenium_is_v4_or_newer():
              raise Exception(
                  f"暂未适配selenium=={webdriver.__version__}版本的firefox API,建议安装selenium==3.141.0版本或使用CHROME浏览器"
              )

          firefox_profile = webdriver.FirefoxProfile()
          firefox_options = webdriver.FirefoxOptions()
          # Work on a copy: DesiredCapabilities.FIREFOX is a shared class-level
          # dict, and mutating it would leak the proxy into later drivers.
          firefox_capabilities = webdriver.DesiredCapabilities.FIREFOX.copy()

          try:
              from selenium.webdriver.firefox.service import Service
          except (ImportError, ModuleNotFoundError):
              Service = None

          if self._proxy:
              proxy = self._call_or_value(self._proxy)
              firefox_capabilities["marionette"] = True
              firefox_capabilities["proxy"] = {
                  "proxyType": "MANUAL",
                  "httpProxy": proxy,
                  "ftpProxy": proxy,
                  "sslProxy": proxy,
              }
          if self._user_agent:
              firefox_profile.set_preference(
                  "general.useragent.override",
                  self._call_or_value(self._user_agent),
              )
          if not self._load_images:
              firefox_profile.set_preference("permissions.default.image", 2)
          if self._headless:
              firefox_options.add_argument("--headless")
              firefox_options.add_argument("--disable-gpu")

          # User supplied command-line arguments.
          if self._custom_argument:
              for arg in self._custom_argument:
                  firefox_options.add_argument(arg)

          kwargs = self.filter_kwargs(self._kwargs, self.__FIREFOX_ATTRS__)
          kwargs = self._driver_binary_kwargs(
              kwargs,
              webdriver.Firefox,
              Service,
              lambda: GeckoDriverManager().install(),
          )

          driver = webdriver.Firefox(
              capabilities=firefox_capabilities,
              options=firefox_options,
              firefox_profile=firefox_profile,
              **kwargs,
          )

          if self._window_size:
              driver.set_window_size(*self._window_size)

          return driver

      def chrome_driver(self):
          """Build and return a Chrome driver configured from the instance settings."""
          chrome_options = self._configure_chromium_options(webdriver.ChromeOptions())

          try:
              from selenium.webdriver.chrome.service import Service
          except (ImportError, ModuleNotFoundError):
              Service = None

          kwargs = self.filter_kwargs(self._kwargs, self.__CHROME_ATTRS__)
          kwargs = self._driver_binary_kwargs(
              kwargs,
              webdriver.Chrome,
              Service,
              lambda: ChromeDriverManager().install(),
          )

          driver = webdriver.Chrome(options=chrome_options, **kwargs)
          return self._apply_chromium_page_hooks(driver)

      def edge_driver(self):
          """Build and return an Edge driver configured from the instance settings.

          Raises:
              NotImplementedError: when no executable path is configured and
                  automatic driver installation is requested.
          """
          edge_options = self._configure_chromium_options(webdriver.EdgeOptions())

          try:
              from selenium.webdriver.edge.service import Service
          except (ImportError, ModuleNotFoundError):
              Service = None

          def _no_auto_install():
              raise NotImplementedError("edge not support auto install driver")

          kwargs = self.filter_kwargs(self._kwargs, self.__EDGE_ATTRS__)
          kwargs = self._driver_binary_kwargs(
              kwargs, webdriver.Edge, Service, _no_auto_install
          )

          driver = webdriver.Edge(options=edge_options, **kwargs)
          return self._apply_chromium_page_hooks(driver)

      def phantomjs_driver(self):
          """Build and return a PhantomJS driver configured from the instance settings."""
          import warnings

          # NOTE(review): this installs a process-wide "ignore" filter that
          # stays active after the driver is created.
          warnings.filterwarnings("ignore")

          service_args = []
          # Copy so the shared DesiredCapabilities.PHANTOMJS dict is not mutated.
          dcap = DesiredCapabilities.PHANTOMJS.copy()

          if self._proxy:
              # Resolve the proxy first, then format: "%" binds tighter than a
              # conditional expression, which previously dropped the "--proxy="
              # prefix for non-callable proxies.
              service_args.append(
                  "--proxy={}".format(self._call_or_value(self._proxy))
              )
          if self._user_agent:
              dcap["phantomjs.page.settings.userAgent"] = self._call_or_value(
                  self._user_agent
              )
          if not self._load_images:
              service_args.append("--load-images=no")

          # User supplied command-line arguments.
          if self._custom_argument:
              service_args.extend(self._custom_argument)

          kwargs = self.filter_kwargs(self._kwargs, self.__PHANTOMJS_ATTRS__)
          if self._executable_path:
              kwargs.update(executable_path=self._executable_path)

          driver = webdriver.PhantomJS(
              service_args=service_args, desired_capabilities=dcap, **kwargs
          )

          if self._window_size:
              driver.set_window_size(self._window_size[0], self._window_size[1])

          return driver

      # ------------------------------------------------------------------
      # convenience accessors
      # ------------------------------------------------------------------
      @property
      def domain(self):
          """Domain of ``self.url`` or, when that is unset, of the current page."""
          return tools.get_domain(self.url or self.driver.current_url)

      @property
      def cookies(self):
          """Cookies of the current session as a ``{name: value}`` dict."""
          return {
              cookie["name"]: cookie["value"] for cookie in self.driver.get_cookies()
          }

      @cookies.setter
      def cookies(self, val: Union[dict, List[dict]]):
          """Add cookies to the current session.

          Args:
              val: either ``{"key": "value", "key2": "value2"}`` or a list of
                  cookie dicts with ``name``/``value`` and optionally
                  ``domain``, ``path``, ``secure`` and ``expiry`` (``expires``
                  is accepted as an alias).
          """
          if isinstance(val, list):
              for cookie in val:
                  entry = {}
                  for key in ("name", "value", "domain", "path", "secure"):
                      value = cookie.get(key)
                      # Leave absent optional fields out instead of sending None.
                      if value is not None:
                          entry[key] = value

                  # selenium names the expiration field "expiry"; forward a
                  # numeric "expires" under that name as an integer.
                  expiry = cookie.get("expiry", cookie.get("expires"))
                  if (
                      isinstance(expiry, (int, float))
                      and not isinstance(expiry, bool)
                      and expiry > 0
                  ):
                      entry["expiry"] = int(expiry)

                  self.driver.add_cookie(entry)
          else:
              for key, value in val.items():
                  self.driver.add_cookie({"name": key, "value": value})

      @property
      def user_agent(self):
          """``navigator.userAgent`` as reported by the browser."""
          return self.driver.execute_script("return navigator.userAgent;")

      def xhr_response(self, xhr_url_regex) -> Optional[InterceptResponse]:
          """Return the intercepted response stored under ``xhr_url_regex``.

          Args:
              xhr_url_regex: key to look up in the page's ``window.__ajaxData``.

          Returns:
              An ``InterceptResponse`` or None when nothing is stored for the key.
          """
          # Pass the key as a script argument rather than pasting it into the
          # JavaScript source, where backslashes and quotes in the regex would
          # be re-interpreted as string escapes.
          data = self.driver.execute_script(
              "return window.__ajaxData[arguments[0]];", xhr_url_regex
          )
          if not data:
              return None

          request = InterceptRequest(**data["request"])
          return InterceptResponse(request, **data["response"])

      def xhr_data(self, xhr_url_regex) -> Union[str, dict, None]:
          """Return the content of the intercepted response, or None."""
          response = self.xhr_response(xhr_url_regex)
          if not response:
              return None
          return response.content

      def xhr_text(self, xhr_url_regex) -> Optional[str]:
          """Return the intercepted response content as text, or None.

          A dict content is serialized with ``json.dumps``.
          """
          response = self.xhr_response(xhr_url_regex)
          if not response:
              return None
          if isinstance(response.content, dict):
              return json.dumps(response.content, ensure_ascii=False)
          return response.content

      def xhr_json(self, xhr_url_regex) -> Optional[dict]:
          """Return the intercepted response content parsed as JSON, or None."""
          text = self.xhr_text(xhr_url_regex)
          if text is None:
              # Nothing was captured; json.loads(None) would raise TypeError.
              return None
          return json.loads(text)

      def __getattr__(self, name):
          """Forward attributes missing on the wrapper to the wrapped driver.

          Raises:
              AttributeError: when no driver is available.
          """
          # Look "driver" up without re-entering __getattr__; going through
          # ``self.driver`` would recurse endlessly when it was never assigned.
          try:
              driver = object.__getattribute__(self, "driver")
          except AttributeError:
              raise AttributeError(name) from None

          if driver:
              return getattr(driver, name)
          raise AttributeError(name)

      # def __del__(self):
      #     self.quit()