webdriver.py 13 KB


  1. import json
  2. from collections import namedtuple
  3. from typing import Optional
  4. from selenium import webdriver
  5. from selenium.common.exceptions import WebDriverException
  6. from selenium.webdriver.common.by import By
  7. from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver
  8. from selenium.webdriver.support import expected_conditions as EC
  9. from selenium.webdriver.support.ui import WebDriverWait
  10. from common.log import logger
  11. DEFAULT_USERAGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:96.0) Gecko/20100101 Firefox/96.0"
  12. Netloc = namedtuple('Netloc', ['host', 'port'])
  13. def until_wait(
  14. driver,
  15. *,
  16. xpath=None,
  17. classname=None,
  18. text=None,
  19. timeout=None
  20. ):
  21. """
  22. 显示等待页面加载,否则抛出TimeoutException
  23. :param driver: 浏览器驱动
  24. :param xpath: xpath规则,页面等待特征
  25. :param classname: class属性名称,页面等待特征
  26. :param text: 期待的文本
  27. :param timeout: 超时时间
  28. :return:
  29. """
  30. _timeout = (timeout or 60)
  31. wait = WebDriverWait(driver, _timeout, 0.2)
  32. if xpath is not None:
  33. locator = (By.XPATH, xpath)
  34. if text is not None:
  35. wait.until(EC.text_to_be_present_in_element(locator, text))
  36. else:
  37. wait.until(EC.presence_of_element_located(locator))
  38. elif classname is not None:
  39. locator = (By.CLASS_NAME, classname)
  40. if text is not None:
  41. wait.until(EC.text_to_be_present_in_element(locator, text))
  42. else:
  43. wait.until(EC.presence_of_element_located(locator))
  44. def check_navigator(driver):
  45. script = "return window.navigator.webdriver"
  46. return driver.execute_script(script)
  47. def netloc(proxies: dict) -> Netloc:
  48. host, port = proxies["https"].replace("socks5://", "").split(":")
  49. return Netloc(host, port)
  50. class XhrRequest:
  51. def __init__(self, url, data, headers):
  52. self.url = url
  53. self.data = data
  54. self.headers = headers
  55. class XhrResponse:
  56. def __init__(self, request: XhrRequest, url, headers, content, status_code):
  57. self.request = request
  58. self.url = url
  59. self.headers = headers
  60. self.content = content
  61. self.status_code = status_code
  62. class FireFoxWebDriver:
  63. def __init__(
  64. self,
  65. user_agent=None,
  66. proxy=None,
  67. headless=True,
  68. timeout=60,
  69. load_images=False,
  70. executable_path=None,
  71. window_size: tuple = None,
  72. xhr_url_regexes: list = None,
  73. ):
  74. """
  75. 支持 firefox
  76. Args:
  77. user_agent: 字符串 或 无参函数,返回值为user_agent
  78. proxy: {'https://sockets:xxx.xxx.xxx.xxx:xxxx'} 或 无参函数,返回值为代理地址
  79. headless: 是否启用无头模式, 默认:无头模式
  80. timeout: 请求超时时间
  81. load_images: 是否加载图片
  82. executable_path: 浏览器路径,默认为默认路径
  83. window_size: # 窗口大小
  84. xhr_url_regexes: 拦截xhr接口,支持正则,数组类型
  85. """
  86. self._user_agent = user_agent or DEFAULT_USERAGENT
  87. self._proxy = proxy
  88. self._load_images = load_images
  89. self._headless = headless
  90. self._timeout = timeout
  91. self._xhr_url_regexes = xhr_url_regexes
  92. self._window_size = window_size
  93. self._executable_path = executable_path
  94. firefox_profile = webdriver.FirefoxProfile()
  95. firefox_options = webdriver.FirefoxOptions()
  96. firefox_capabilities = webdriver.DesiredCapabilities.FIREFOX
  97. if self._proxy:
  98. proxy = self._proxy() if callable(self._proxy) else self._proxy
  99. host, port = netloc(proxy)
  100. # 不使用代理=0, 使用代理=1
  101. firefox_profile.set_preference('network.proxy.type', 1)
  102. firefox_profile.set_preference('network.proxy.socks', host)
  103. # 端口必须使用int类型,才会生效
  104. firefox_profile.set_preference('network.proxy.socks_port', int(port))
  105. firefox_profile.update_preferences()
  106. if self._user_agent:
  107. firefox_profile.set_preference(
  108. "general.useragent.override",
  109. self._user_agent() if callable(self._user_agent) else self._user_agent,
  110. )
  111. firefox_profile.update_preferences()
  112. if not self._load_images:
  113. '''
  114. 允许加载所有图像,无论来源如何(默认)=1
  115. 阻止所有图像加载=2
  116. 防止加载第三方图像=3
  117. '''
  118. firefox_profile.set_preference("permissions.default.image", 2)
  119. firefox_profile.update_preferences()
  120. if self._headless:
  121. firefox_options.add_argument("--headless")
  122. firefox_options.add_argument("--disable-gpu")
  123. if self._executable_path:
  124. _driver = webdriver.Firefox(
  125. capabilities=firefox_capabilities,
  126. options=firefox_options,
  127. firefox_profile=firefox_profile,
  128. executable_path=self._executable_path,
  129. )
  130. else:
  131. _driver = webdriver.Firefox(
  132. capabilities=firefox_capabilities,
  133. options=firefox_options,
  134. firefox_profile=firefox_profile,
  135. )
  136. if self._window_size:
  137. _driver.set_window_size(*self._window_size)
  138. self.driver = _driver
  139. def __enter__(self):
  140. return self
  141. def __exit__(self, exc_type, exc_val, exc_tb):
  142. if exc_val:
  143. logger.error(f'{self.__class__.__name__} >>> {exc_type} <> {exc_val}')
  144. self.driver.quit()
  145. return True
  146. def set_page_load_timeout(self, timeout=None):
  147. """
  148. 设置selenium页面执行时间
  149. :param timeout: 超时时间,默认:60s
  150. :return:
  151. """
  152. _timeout = (timeout or self._timeout)
  153. # driver.get(url)一直不返回,但也不报错的问题,这时程序会卡住,设置超时选项能解决这个问题。
  154. self.driver.set_page_load_timeout(_timeout)
  155. # 设置脚本超时时间
  156. self.driver.set_script_timeout(_timeout)
  157. def quit(self):
  158. self.driver.quit()
  159. def xhr_response(self, xhr_url_regex) -> Optional[XhrResponse]:
  160. data = self.driver.execute_script(
  161. f'return window.__ajaxData["{xhr_url_regex}"];'
  162. )
  163. if not data:
  164. return None
  165. request = XhrRequest(**data["request"])
  166. response = XhrResponse(request, **data["response"])
  167. return response
  168. def xhr_text(self, xhr_url_regex) -> Optional[str]:
  169. response = self.xhr_response(xhr_url_regex)
  170. if not response:
  171. return None
  172. return response.content
  173. def xhr_json(self, xhr_url_regex) -> Optional[dict]:
  174. text = self.xhr_text(xhr_url_regex)
  175. return json.loads(text)
  176. def get(self, url):
  177. self.driver.get(url)
  178. @property
  179. def user_agent(self):
  180. return self.driver.execute_script("return navigator.userAgent;")
  181. @property
  182. def page_title(self):
  183. return self.driver.execute_script('return document.title')
  184. @property
  185. def page_source(self):
  186. return self.driver.page_source
  187. def find_element_by_xpath(self, xpath: str):
  188. """
  189. 通过xpath寻找元素,不存在该元素时,抛出 NoSuchElementException
  190. :param xpath: 需要寻找的元素的xpath
  191. :return:
  192. """
  193. return self.driver.find_element_by_xpath(xpath)
  194. def until_wait(
  195. self,
  196. *,
  197. xpath=None,
  198. classname=None,
  199. text=None,
  200. timeout=None
  201. ):
  202. """
  203. 显示等待页面加载,否则抛出TimeoutException
  204. :param xpath: xpath规则,页面等待特征
  205. :param classname: class属性名称,页面等待特征
  206. :param text: 期待的文本
  207. :param timeout: 超时时间
  208. :return:
  209. """
  210. _timeout = (timeout or self._timeout)
  211. wait = WebDriverWait(self.driver, _timeout, 0.2)
  212. if xpath is not None:
  213. locator = (By.XPATH, xpath)
  214. if text is not None:
  215. wait.until(EC.text_to_be_present_in_element(locator, text))
  216. else:
  217. wait.until(EC.presence_of_element_located(locator))
  218. elif classname is not None:
  219. locator = (By.CLASS_NAME, classname)
  220. if text is not None:
  221. wait.until(EC.text_to_be_present_in_element(locator, text))
  222. else:
  223. wait.until(EC.presence_of_element_located(locator))
  224. def switch_to_window(self):
  225. self.driver.execute_script('window.open();')
  226. handles = self.driver.window_handles
  227. self.driver.close()
  228. self.driver.switch_to.window(handles[-1])
  229. class WebDriver(RemoteWebDriver):
  230. FIREFOX = "FIREFOX"
  231. def __init__(
  232. self,
  233. load_images=True,
  234. user_agent=None,
  235. proxy=None,
  236. headless=True,
  237. driver_type=FIREFOX,
  238. timeout=120,
  239. window_size=(1024, 800),
  240. executable_path=None,
  241. custom_argument=None,
  242. **kwargs
  243. ):
  244. """
  245. Args:
  246. load_images: 是否加载图片
  247. user_agent: 字符串 或 无参函数,返回值为user_agent
  248. proxy: {'https://sockets:xxx.xxx.xxx.xxx:xxxx'} 或 无参函数,返回值为代理地址
  249. headless: 是否启用无头模式
  250. driver_type: FIREFOX
  251. timeout: 请求超时时间
  252. window_size: # 窗口大小
  253. executable_path: 浏览器路径,默认为默认路径
  254. **kwargs:
  255. """
  256. self._load_images = load_images
  257. self._user_agent = user_agent or DEFAULT_USERAGENT
  258. self._proxy = proxy
  259. self._headless = headless
  260. self._timeout = timeout
  261. self._window_size = window_size
  262. self._executable_path = executable_path
  263. self._custom_argument = custom_argument
  264. self.proxies = {}
  265. self.user_agent = None
  266. if driver_type == WebDriver.FIREFOX:
  267. self.driver = self.firefox_driver()
  268. self.driver.set_page_load_timeout(self._timeout)
  269. self.driver.set_script_timeout(self._timeout)
  270. def __enter__(self):
  271. return self
  272. def __exit__(self, exc_type, exc_val, exc_tb):
  273. if exc_val:
  274. logger.error(f'{self.__class__.__name__} >>> {exc_type} <> {exc_val}')
  275. self.quit()
  276. return True
  277. def firefox_driver(self):
  278. firefox_profile = webdriver.FirefoxProfile()
  279. firefox_options = webdriver.FirefoxOptions()
  280. firefox_capabilities = webdriver.DesiredCapabilities.FIREFOX
  281. firefox_profile.set_preference("dom.webdriver.enabled", False)
  282. firefox_profile.set_preference('useAutomationExtension', False)
  283. # firefox_profile.set_preference('privacy.resistFingerprinting', True) # 启用指纹保护
  284. if self._proxy:
  285. proxy = self._proxy() if callable(self._proxy) else self._proxy
  286. host, port = netloc(proxy)
  287. # 使用socks5 代理, 不使用代理:0, 使用代理:1
  288. firefox_profile.set_preference('network.proxy.type', 1)
  289. firefox_profile.set_preference('network.proxy.socks', host)
  290. firefox_profile.set_preference('network.proxy.socks_port', int(port))
  291. if self._user_agent:
  292. firefox_profile.set_preference(
  293. "general.useragent.override",
  294. self._user_agent() if callable(self._user_agent) else self._user_agent,
  295. )
  296. if not self._load_images:
  297. '''
  298. 允许加载所有图像,无论来源如何(默认)=1
  299. 阻止所有图像加载=2
  300. 防止加载第三方图像=3
  301. '''
  302. firefox_profile.set_preference("permissions.default.image", 2)
  303. firefox_profile.update_preferences()
  304. if self._headless:
  305. firefox_options.add_argument("--headless")
  306. firefox_options.add_argument("--disable-gpu")
  307. # 添加自定义的配置参数
  308. if self._custom_argument:
  309. for arg in self._custom_argument:
  310. firefox_options.add_argument(arg)
  311. if self._executable_path:
  312. driver = webdriver.Firefox(
  313. capabilities=firefox_capabilities,
  314. options=firefox_options,
  315. firefox_profile=firefox_profile,
  316. executable_path=self._executable_path,
  317. )
  318. else:
  319. driver = webdriver.Firefox(
  320. capabilities=firefox_capabilities,
  321. options=firefox_options,
  322. firefox_profile=firefox_profile,
  323. )
  324. if self._window_size:
  325. driver.set_window_size(*self._window_size)
  326. return driver
  327. @property
  328. def cookies(self):
  329. cookies_json = {}
  330. for cookie in self.driver.get_cookies():
  331. cookies_json[cookie["name"]] = cookie["value"]
  332. return cookies_json
  333. @cookies.setter
  334. def cookies(self, val: dict):
  335. """
  336. 设置cookie
  337. Args:
  338. val: {"key":"value", "key2":"value2"}
  339. Returns:
  340. """
  341. for key, value in val.items():
  342. self.driver.add_cookie({"name": key, "value": value})
  343. def __getattr__(self, name):
  344. if self.driver:
  345. return getattr(self.driver, name)
  346. else:
  347. raise AttributeError