import datetime
from collections import namedtuple
from pathlib import Path

from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

from common.log import logger

# Project root: two directory levels above this file.
_absolute = Path(__file__).absolute().parent.parent
# Date stamp computed once at import time; used in the geckodriver log name.
_date = datetime.datetime.now().strftime('%Y-%m-%d')
_service_log_path = (_absolute / f'logs/geckodriver-{_date}.log').resolve()

DEFAULT_USERAGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:96.0) Gecko/20100101 Firefox/96.0"
Netloc = namedtuple('Netloc', ['host', 'port'])


def check_navigator(driver):
    """Return the value of ``window.navigator.webdriver`` in the current page.

    Args:
        driver: Object exposing ``execute_script``.

    Returns:
        Whatever the script evaluates to in the browser.
    """
    script = "return window.navigator.webdriver"
    return driver.execute_script(script)


def netloc(proxies: dict) -> Netloc:
    """Split the ``"https"`` proxy entry into host and port.

    Args:
        proxies: Mapping with an ``"https"`` key whose value looks like
            ``"socks5://host:port"``.

    Returns:
        ``Netloc(host, port)``; both fields are strings.

    Raises:
        KeyError: If ``proxies`` has no ``"https"`` entry.
        ValueError: If the value does not split into exactly two parts on ":".
    """
    host, port = proxies["https"].replace("socks5://", "").split(":")
    return Netloc(host, port)


class FireFoxWebDriverError(WebDriverException):
    """Error raised by this module's Firefox driver wrapper."""


class WebDriver(RemoteWebDriver):
    """Wrapper that builds a configured Firefox driver and delegates to it.

    The real browser driver is stored in ``self.driver``. Attributes that are
    not found through normal lookup are forwarded to it by ``__getattr__``.
    The instance can be used as a context manager; the browser is quit on exit.
    """

    FIREFOX = "FIREFOX"

    def __init__(
            self,
            load_images=True,
            user_agent=None,
            proxy=None,
            headless=True,
            driver_type=FIREFOX,
            timeout=120,
            window_size=(1024, 800),
            executable_path=None,
            custom_argument=None,
            **kwargs
    ):
        """Create and configure the underlying browser driver.

        Args:
            load_images: Whether the browser should load images.
            user_agent: User-agent string, or a zero-argument callable that
                returns one. Falls back to ``DEFAULT_USERAGENT`` when falsy.
            proxy: ``{"https": "socks5://host:port"}``, or a zero-argument
                callable returning such a mapping.
            headless: Whether to run the browser in headless mode.
            driver_type: Browser type; only ``WebDriver.FIREFOX`` is supported.
            timeout: Page-load and script timeout passed to the driver.
            window_size: ``(width, height)`` of the browser window; falsy to
                leave the window size untouched.
            executable_path: Path forwarded to ``webdriver.Firefox`` as
                ``executable_path``; omitted when falsy.
            custom_argument: Iterable of extra command-line arguments added to
                the Firefox options.
            **kwargs: Accepted for compatibility and ignored.

        Raises:
            FireFoxWebDriverError: If ``driver_type`` is not supported.
        """
        self._load_images = load_images
        self._user_agent = user_agent or DEFAULT_USERAGENT
        self._proxy = proxy
        self._headless = headless
        self._timeout = timeout
        self._window_size = window_size
        self._executable_path = executable_path
        self._custom_argument = custom_argument

        self.proxies = {}
        self.user_agent = None

        # Fail with a clear error instead of touching a driver that was never
        # created (which previously ended in unbounded __getattr__ recursion).
        if driver_type != WebDriver.FIREFOX:
            raise FireFoxWebDriverError(
                f"Unsupported driver_type: {driver_type!r}"
            )

        self.driver = self.firefox_driver()
        self.driver.set_page_load_timeout(self._timeout)
        self.driver.set_script_timeout(self._timeout)

    def __enter__(self):
        """Return this wrapper for use inside a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Log any in-flight exception, quit the browser, and re-raise.

        Returns:
            ``False`` so an exception raised inside the block propagates.
        """
        if exc_val:
            logger.error(f'{self.__class__.__name__} <> {exc_type.__name__}: {exc_val}')

        self.quit()
        return False

    def quit(self):
        """Quit the wrapped browser driver.

        Defined explicitly because ``quit`` also exists on the inherited
        ``RemoteWebDriver``: normal class lookup would find that method first,
        ``__getattr__`` would never be consulted, and the wrapped driver's own
        ``quit`` (with whatever browser-specific cleanup it performs) would
        not run.
        """
        self.driver.quit()

    def firefox_driver(self):
        """Build a Firefox driver from the options stored on this instance.

        Returns:
            The ``webdriver.Firefox`` instance.
        """
        firefox_profile = webdriver.FirefoxProfile()
        firefox_options = webdriver.FirefoxOptions()
        # Copy so the shared class-level capabilities dict is never handed out.
        firefox_capabilities = webdriver.DesiredCapabilities.FIREFOX.copy()

        firefox_profile.set_preference("dom.webdriver.enabled", False)
        firefox_profile.set_preference('useAutomationExtension', False)
        # firefox_profile.set_preference('privacy.resistFingerprinting', True)  # enable fingerprinting protection

        if self._proxy:
            proxy = self._proxy() if callable(self._proxy) else self._proxy
            host, port = netloc(proxy)
            # Route traffic through the SOCKS proxy.
            # network.proxy.type: 0 = no proxy, 1 = manual proxy configuration.
            firefox_profile.set_preference('network.proxy.type', 1)
            firefox_profile.set_preference('network.proxy.socks', host)
            firefox_profile.set_preference('network.proxy.socks_port', int(port))

        if self._user_agent:
            firefox_profile.set_preference(
                "general.useragent.override",
                self._user_agent() if callable(self._user_agent) else self._user_agent,
            )

        if not self._load_images:
            # permissions.default.image:
            #   1 = load all images regardless of origin (default)
            #   2 = block all images
            #   3 = block third-party images
            firefox_profile.set_preference("permissions.default.image", 2)

        firefox_profile.update_preferences()

        if self._headless:
            firefox_options.add_argument("--headless")
            firefox_options.add_argument("--disable-gpu")

        # Append caller-supplied command-line arguments.
        if self._custom_argument:
            for arg in self._custom_argument:
                firefox_options.add_argument(arg)

        driver_kwargs = {
            "service_log_path": str(_service_log_path),
            "capabilities": firefox_capabilities,
            "options": firefox_options,
            "firefox_profile": firefox_profile,
        }
        # Only pass executable_path when one was configured, so the library
        # default is used otherwise.
        if self._executable_path:
            driver_kwargs["executable_path"] = self._executable_path
        driver = webdriver.Firefox(**driver_kwargs)

        if self._window_size:
            driver.set_window_size(*self._window_size)

        return driver

    @property
    def cookies(self):
        """Return the browser's cookies as a ``{name: value}`` dict."""
        return {
            cookie["name"]: cookie["value"]
            for cookie in self.driver.get_cookies()
        }

    @cookies.setter
    def cookies(self, val: dict):
        """Add cookies to the browser.

        Args:
            val: Mapping of cookie name to value,
                e.g. ``{"key": "value", "key2": "value2"}``.
        """
        for key, value in val.items():
            self.driver.add_cookie({"name": key, "value": value})

    def __getattr__(self, name):
        """Forward attributes not found on the wrapper to the wrapped driver.

        Raises:
            AttributeError: If no wrapped driver has been created.
        """
        # Look in the instance dict directly: reading ``self.driver`` here
        # would re-enter __getattr__ and recurse when the driver is unset.
        driver = self.__dict__.get("driver")
        if not driver:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            )
        return getattr(driver, name)


def get_user_agent(driver):
    """Return ``navigator.userAgent`` as evaluated in the current page."""
    return driver.execute_script("return navigator.userAgent;")


def get_title(driver):
    """Return ``document.title`` as evaluated in the current page."""
    return driver.execute_script('return document.title')


def until_wait(
        driver,
        *,
        xpath=None,
        classname=None,
        text=None,
        timeout=None
):
    """Explicitly wait for an element (optionally containing text) to appear.

    ``xpath`` takes precedence over ``classname``. When neither is given the
    function returns without waiting.

    Args:
        driver: Browser driver to wait on.
        xpath: XPath expression identifying the element to wait for.
        classname: Class name identifying the element to wait for.
        text: Text expected inside the located element; when omitted, only
            the element's presence is awaited.
        timeout: Maximum wait in seconds; 60 when falsy.

    Raises:
        selenium.common.exceptions.TimeoutException: If the condition is not
            met within the timeout.
    """
    _timeout = (timeout or 60)
    wait = WebDriverWait(driver, _timeout, 0.2)

    if xpath is not None:
        locator = (By.XPATH, xpath)
    elif classname is not None:
        locator = (By.CLASS_NAME, classname)
    else:
        return

    if text is not None:
        condition = EC.text_to_be_present_in_element(locator, text)
    else:
        condition = EC.presence_of_element_located(locator)
    wait.until(condition)


def new_window(driver):
    """Open a new browser window and switch the driver to it."""
    driver.execute_script('window.open();')
    handles = driver.window_handles
    # The most recently opened window is the last handle.
    driver.switch_to.window(handles[-1])