"""Thin Selenium/Firefox wrappers: explicit waits, proxy setup, XHR readout."""
import json
from collections import namedtuple
from typing import Optional

from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

from common.log import logger

DEFAULT_USERAGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:96.0) Gecko/20100101 Firefox/96.0"

# (host, port) pair extracted from a proxy address; both fields are strings.
Netloc = namedtuple('Netloc', ['host', 'port'])


def _wait_for(driver, timeout, xpath=None, classname=None, text=None):
    """Block until the element selected by ``xpath`` or ``classname`` shows up.

    ``xpath`` takes precedence over ``classname``. When ``text`` is given the
    wait is for that text to be present in the element, otherwise only for the
    element to be present in the DOM. With neither selector, nothing is waited
    for. The DOM is polled every 0.2 seconds.
    """
    wait = WebDriverWait(driver, timeout, 0.2)
    if xpath is not None:
        locator = (By.XPATH, xpath)
    elif classname is not None:
        locator = (By.CLASS_NAME, classname)
    else:
        return

    if text is not None:
        wait.until(EC.text_to_be_present_in_element(locator, text))
    else:
        wait.until(EC.presence_of_element_located(locator))


def until_wait(
        driver,
        *,
        xpath=None,
        classname=None,
        text=None,
        timeout=None
):
    """Explicitly wait for a page feature; ``wait.until`` raises on timeout.

    :param driver: browser driver to poll
    :param xpath: XPath of the element to wait for (checked first)
    :param classname: class name of the element to wait for (used when
        ``xpath`` is None)
    :param text: if given, wait until this text is present in the element
    :param timeout: timeout in seconds; falsy values fall back to 60
    :return: None
    """
    _wait_for(driver, timeout or 60, xpath=xpath, classname=classname, text=text)


def check_navigator(driver):
    """Return the page's ``window.navigator.webdriver`` value."""
    return driver.execute_script("return window.navigator.webdriver")


def netloc(proxies: dict) -> Netloc:
    """Split ``proxies["https"]`` (``"socks5://host:port"``) into host and port.

    :param proxies: mapping with an ``"https"`` entry holding the proxy address
    :return: ``Netloc(host, port)``; ``port`` is returned as a string
    :raises KeyError: if ``proxies`` has no ``"https"`` key
    :raises ValueError: if the address is not exactly ``host:port`` once the
        ``socks5://`` prefix has been removed
    """
    address = proxies["https"].replace("socks5://", "")
    host, port = address.split(":")
    return Netloc(host, port)


class XhrRequest:
    """Plain holder for the request half of a captured XHR call."""

    def __init__(self, url, data, headers):
        self.url = url
        self.data = data
        self.headers = headers


class XhrResponse:
    """Plain holder for the response half of a captured XHR call."""

    def __init__(self, request: XhrRequest, url, headers, content, status_code):
        self.request = request
        self.url = url
        self.headers = headers
        self.content = content
        self.status_code = status_code


class FireFoxWebDriver:
    """Context-manager wrapper that owns a configured Firefox driver."""

    def __init__(
            self,
            user_agent=None,
            proxy=None,
            headless=True,
            timeout=60,
            load_images=False,
            executable_path=None,
            window_size: tuple = None,
            xhr_url_regexes: list = None,
    ):
        """Start Firefox with the requested profile settings.

        Args:
            user_agent: string, or zero-argument callable returning the
                user agent; defaults to DEFAULT_USERAGENT
            proxy: dict such as {"https": "socks5://host:port"}, or a
                zero-argument callable returning such a dict
            headless: run without a visible window (default: True)
            timeout: default timeout in seconds used by the wait helpers
            load_images: whether images are loaded
            executable_path: driver executable path; omitted from the
                webdriver.Firefox call when falsy
            window_size: (width, height) passed to set_window_size
            xhr_url_regexes: list of URL regexes for XHR interception; only
                stored on the instance by this constructor
        """
        self._user_agent = user_agent or DEFAULT_USERAGENT
        self._proxy = proxy
        self._load_images = load_images
        self._headless = headless
        self._timeout = timeout
        self._xhr_url_regexes = xhr_url_regexes
        self._window_size = window_size
        self._executable_path = executable_path

        firefox_profile = webdriver.FirefoxProfile()
        firefox_options = webdriver.FirefoxOptions()
        # DesiredCapabilities.FIREFOX is a shared class-level dict; work on a
        # copy so nothing done with it leaks into other driver instances.
        firefox_capabilities = webdriver.DesiredCapabilities.FIREFOX.copy()

        if self._proxy:
            proxy = self._proxy() if callable(self._proxy) else self._proxy
            host, port = netloc(proxy)
            # network.proxy.type: 0 = no proxy, 1 = use the configured proxy
            firefox_profile.set_preference('network.proxy.type', 1)
            firefox_profile.set_preference('network.proxy.socks', host)
            # The port only takes effect when it is given as an int.
            firefox_profile.set_preference('network.proxy.socks_port', int(port))

        if self._user_agent:
            firefox_profile.set_preference(
                "general.useragent.override",
                self._user_agent() if callable(self._user_agent) else self._user_agent,
            )

        if not self._load_images:
            # permissions.default.image: 1 = load all images (default),
            # 2 = block all images, 3 = block third-party images only
            firefox_profile.set_preference("permissions.default.image", 2)

        # Flush every preference set above in a single pass.
        firefox_profile.update_preferences()

        if self._headless:
            firefox_options.add_argument("--headless")
            firefox_options.add_argument("--disable-gpu")

        driver_kwargs = {
            "capabilities": firefox_capabilities,
            "options": firefox_options,
            "firefox_profile": firefox_profile,
        }
        if self._executable_path:
            driver_kwargs["executable_path"] = self._executable_path
        _driver = webdriver.Firefox(**driver_kwargs)

        if self._window_size:
            _driver.set_window_size(*self._window_size)

        self.driver = _driver

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Log any exception, quit the browser, and suppress the exception."""
        if exc_val:
            logger.error(f'{self.__class__.__name__} >>> {exc_type} <> {exc_val}')

        self.driver.quit()
        # Returning True tells the ``with`` statement to swallow the exception.
        return True

    def set_page_load_timeout(self, timeout=None):
        """Set the page-load and script timeouts on the driver.

        :param timeout: timeout in seconds; falsy values fall back to the
            timeout given to the constructor
        :return: None
        """
        _timeout = (timeout or self._timeout)
        # Without a page-load timeout ``driver.get(url)`` can hang forever
        # without raising; the timeout turns that into an error.
        self.driver.set_page_load_timeout(_timeout)
        # Same limit for asynchronous script execution.
        self.driver.set_script_timeout(_timeout)

    def quit(self):
        """Quit the underlying browser driver."""
        self.driver.quit()

    def xhr_response(self, xhr_url_regex) -> Optional[XhrResponse]:
        """Read the entry stored under ``xhr_url_regex`` in ``window.__ajaxData``.

        :param xhr_url_regex: key to look up in the page's ``__ajaxData`` object
        :return: an XhrResponse built from the entry's "request" and
            "response" members, or None when the entry is missing or empty
        """
        # Pass the key as a script argument instead of splicing it into the
        # JS source: quotes or backslashes in the regex would otherwise break
        # the script or silently change the key being looked up.
        data = self.driver.execute_script(
            'return window.__ajaxData[arguments[0]];', xhr_url_regex
        )
        if not data:
            return None

        request = XhrRequest(**data["request"])
        return XhrResponse(request, **data["response"])

    def xhr_text(self, xhr_url_regex) -> Optional[str]:
        """Return the captured response content, or None if nothing was captured."""
        response = self.xhr_response(xhr_url_regex)
        if not response:
            return None
        return response.content

    def xhr_json(self, xhr_url_regex) -> Optional[dict]:
        """Return the captured response content parsed as JSON.

        :return: the decoded JSON value, or None if nothing was captured
        :raises json.JSONDecodeError: if the captured content is not valid JSON
        """
        text = self.xhr_text(xhr_url_regex)
        if text is None:
            # Nothing captured: json.loads(None) would raise TypeError.
            return None
        return json.loads(text)

    def get(self, url):
        """Navigate the browser to ``url``."""
        self.driver.get(url)

    @property
    def user_agent(self):
        """User agent as reported by the page's ``navigator.userAgent``."""
        return self.driver.execute_script("return navigator.userAgent;")

    @property
    def page_title(self):
        """Current ``document.title`` of the page."""
        return self.driver.execute_script('return document.title')

    @property
    def page_source(self):
        """Current page source as reported by the driver."""
        return self.driver.page_source

    def find_element_by_xpath(self, xpath: str):
        """Find an element by XPath; the driver raises NoSuchElementException
        when no element matches.

        :param xpath: XPath of the element to look for
        :return: the matching element
        """
        # find_element(By.XPATH, ...) is available in both Selenium 3 and 4,
        # unlike the find_element_by_* helpers removed in newer releases.
        return self.driver.find_element(By.XPATH, xpath)

    def until_wait(
            self,
            *,
            xpath=None,
            classname=None,
            text=None,
            timeout=None
    ):
        """Explicitly wait for a page feature; ``wait.until`` raises on timeout.

        :param xpath: XPath of the element to wait for (checked first)
        :param classname: class name of the element to wait for (used when
            ``xpath`` is None)
        :param text: if given, wait until this text is present in the element
        :param timeout: timeout in seconds; falsy values fall back to the
            timeout given to the constructor
        :return: None
        """
        _wait_for(
            self.driver,
            timeout or self._timeout,
            xpath=xpath,
            classname=classname,
            text=text,
        )

    def switch_to_window(self):
        """Open a new window, close the current one, and switch to the last
        entry of the handle list."""
        self.driver.execute_script('window.open();')
        # The handle list is read before closing the current window.
        # NOTE(review): presumably the last handle is the window just opened.
        handles = self.driver.window_handles
        self.driver.close()
        self.driver.switch_to.window(handles[-1])


class WebDriver(RemoteWebDriver):
    """Wrapper that builds a Firefox driver and forwards unknown attributes
    to it.

    ``RemoteWebDriver.__init__`` is not called; any attribute not set on this
    object is resolved on the wrapped ``self.driver`` via ``__getattr__``.
    """

    FIREFOX = "FIREFOX"

    def __init__(
            self,
            load_images=True,
            user_agent=None,
            proxy=None,
            headless=True,
            driver_type=FIREFOX,
            timeout=120,
            window_size=(1024, 800),
            executable_path=None,
            custom_argument=None,
            **kwargs
    ):
        """Create and configure the wrapped browser driver.

        Args:
            load_images: whether images are loaded
            user_agent: string, or zero-argument callable returning the
                user agent; defaults to DEFAULT_USERAGENT
            proxy: dict such as {"https": "socks5://host:port"}, or a
                zero-argument callable returning such a dict
            headless: run without a visible window
            driver_type: only WebDriver.FIREFOX is supported
            timeout: page-load and script timeout in seconds
            window_size: (width, height) passed to set_window_size
            executable_path: driver executable path; omitted from the
                webdriver.Firefox call when falsy
            custom_argument: iterable of extra command-line arguments added
                to the Firefox options
            **kwargs: accepted and ignored

        Raises:
            ValueError: if ``driver_type`` is not supported
        """
        self._load_images = load_images
        self._user_agent = user_agent or DEFAULT_USERAGENT
        self._proxy = proxy
        self._headless = headless
        self._timeout = timeout
        self._window_size = window_size
        self._executable_path = executable_path
        self._custom_argument = custom_argument

        self.proxies = {}
        self.user_agent = None

        if driver_type != WebDriver.FIREFOX:
            # Without a driver every later attribute access would fail, so
            # reject unsupported types up front with a clear error.
            raise ValueError(f"unsupported driver_type: {driver_type!r}")

        self.driver = self.firefox_driver()
        self.driver.set_page_load_timeout(self._timeout)
        self.driver.set_script_timeout(self._timeout)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Log any exception, quit the browser, and suppress the exception."""
        if exc_val:
            logger.error(f'{self.__class__.__name__} >>> {exc_type} <> {exc_val}')

        self.quit()
        # Returning True tells the ``with`` statement to swallow the exception.
        return True

    def quit(self):
        """Quit the wrapped driver.

        Overridden because the inherited ``RemoteWebDriver.quit`` would run
        against this wrapper object and never call the wrapped driver's own
        ``quit()``, skipping whatever cleanup that method performs.
        """
        driver = self.__dict__.get("driver")
        if driver is not None:
            driver.quit()

    def firefox_driver(self):
        """Build a Firefox driver from the settings stored on this instance.

        :return: the started ``webdriver.Firefox`` instance
        """
        firefox_profile = webdriver.FirefoxProfile()
        firefox_options = webdriver.FirefoxOptions()
        # DesiredCapabilities.FIREFOX is a shared class-level dict; work on a
        # copy so nothing done with it leaks into other driver instances.
        firefox_capabilities = webdriver.DesiredCapabilities.FIREFOX.copy()

        firefox_profile.set_preference("dom.webdriver.enabled", False)
        firefox_profile.set_preference('useAutomationExtension', False)
        # Fingerprint protection is left disabled; to enable it:
        # firefox_profile.set_preference('privacy.resistFingerprinting', True)

        if self._proxy:
            proxy = self._proxy() if callable(self._proxy) else self._proxy
            host, port = netloc(proxy)
            # SOCKS5 proxy. network.proxy.type: 0 = no proxy, 1 = use proxy
            firefox_profile.set_preference('network.proxy.type', 1)
            firefox_profile.set_preference('network.proxy.socks', host)
            firefox_profile.set_preference('network.proxy.socks_port', int(port))

        if self._user_agent:
            firefox_profile.set_preference(
                "general.useragent.override",
                self._user_agent() if callable(self._user_agent) else self._user_agent,
            )

        if not self._load_images:
            # permissions.default.image: 1 = load all images (default),
            # 2 = block all images, 3 = block third-party images only
            firefox_profile.set_preference("permissions.default.image", 2)

        # Flush every preference set above in a single pass.
        firefox_profile.update_preferences()

        if self._headless:
            firefox_options.add_argument("--headless")
            firefox_options.add_argument("--disable-gpu")

        # Caller-supplied extra command-line arguments.
        if self._custom_argument:
            for arg in self._custom_argument:
                firefox_options.add_argument(arg)

        driver_kwargs = {
            "capabilities": firefox_capabilities,
            "options": firefox_options,
            "firefox_profile": firefox_profile,
        }
        if self._executable_path:
            driver_kwargs["executable_path"] = self._executable_path
        driver = webdriver.Firefox(**driver_kwargs)

        if self._window_size:
            driver.set_window_size(*self._window_size)

        return driver

    @property
    def cookies(self):
        """Current browser cookies as a ``{name: value}`` dict."""
        return {
            cookie["name"]: cookie["value"]
            for cookie in self.driver.get_cookies()
        }

    @cookies.setter
    def cookies(self, val: dict):
        """Add one browser cookie per item of ``val``.

        Args:
            val: {"key": "value", "key2": "value2"}
        """
        for key, value in val.items():
            self.driver.add_cookie({"name": key, "value": value})

    def __getattr__(self, name):
        """Forward attributes not found on this object to the wrapped driver."""
        # Look in __dict__ directly: reading ``self.driver`` here would call
        # __getattr__ again and recurse forever when the driver was never set.
        driver = self.__dict__.get("driver")
        if driver is None:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            )
        return getattr(driver, name)