123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228 |
- import datetime
- from collections import namedtuple
- from pathlib import Path
- from selenium import webdriver
- from selenium.common.exceptions import WebDriverException
- from selenium.webdriver.common.by import By
- from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.support.ui import WebDriverWait
- from common.log import logger
- _absolute = Path(__file__).absolute().parent.parent
- _date = datetime.datetime.now().strftime('%Y-%m-%d')
- _service_log_path = (_absolute / f'logs/geckodriver-{_date}.log').resolve()
- DEFAULT_USERAGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:96.0) Gecko/20100101 Firefox/96.0"
- Netloc = namedtuple('Netloc', ['host', 'port'])
- def check_navigator(driver):
- script = "return window.navigator.webdriver"
- return driver.execute_script(script)
- def netloc(proxies: dict) -> Netloc:
- host, port = proxies["https"].replace("socks5://", "").split(":")
- return Netloc(host, port)
- class FireFoxWebDriverError(WebDriverException):
- pass
- class WebDriver(RemoteWebDriver):
- FIREFOX = "FIREFOX"
- def __init__(
- self,
- load_images=True,
- user_agent=None,
- proxy=None,
- headless=True,
- driver_type=FIREFOX,
- timeout=120,
- window_size=(1024, 800),
- executable_path=None,
- custom_argument=None,
- **kwargs
- ):
- """
- Args:
- load_images: 是否加载图片
- user_agent: 字符串 或 无参函数,返回值为user_agent
- proxy: {'https://sockets:xxx.xxx.xxx.xxx:xxxx'} 或 无参函数,返回值为代理地址
- headless: 是否启用无头模式
- driver_type: FIREFOX
- timeout: 请求超时时间
- window_size: # 窗口大小
- executable_path: 浏览器路径,默认为默认路径
- **kwargs:
- """
- self._load_images = load_images
- self._user_agent = user_agent or DEFAULT_USERAGENT
- self._proxy = proxy
- self._headless = headless
- self._timeout = timeout
- self._window_size = window_size
- self._executable_path = executable_path
- self._custom_argument = custom_argument
- self.proxies = {}
- self.user_agent = None
- if driver_type == WebDriver.FIREFOX:
- self.driver = self.firefox_driver()
- self.driver.set_page_load_timeout(self._timeout)
- self.driver.set_script_timeout(self._timeout)
- def __enter__(self):
- return self
- def __exit__(self, exc_type, exc_val, exc_tb):
- if exc_val:
- logger.error(f'{self.__class__.__name__} <> {exc_type.__name__}: {exc_val}')
- self.quit()
- return False
- def firefox_driver(self):
- firefox_profile = webdriver.FirefoxProfile()
- firefox_options = webdriver.FirefoxOptions()
- firefox_capabilities = webdriver.DesiredCapabilities.FIREFOX
- firefox_profile.set_preference("dom.webdriver.enabled", False)
- firefox_profile.set_preference('useAutomationExtension', False)
- # firefox_profile.set_preference('privacy.resistFingerprinting', True) # 启用指纹保护
- if self._proxy:
- proxy = self._proxy() if callable(self._proxy) else self._proxy
- host, port = netloc(proxy)
- # 使用socks5 代理, 不使用代理:0, 使用代理:1
- firefox_profile.set_preference('network.proxy.type', 1)
- firefox_profile.set_preference('network.proxy.socks', host)
- firefox_profile.set_preference('network.proxy.socks_port', int(port))
- if self._user_agent:
- firefox_profile.set_preference(
- "general.useragent.override",
- self._user_agent() if callable(self._user_agent) else self._user_agent,
- )
- if not self._load_images:
- '''
- 允许加载所有图像,无论来源如何(默认)=1
- 阻止所有图像加载=2
- 防止加载第三方图像=3
- '''
- firefox_profile.set_preference("permissions.default.image", 2)
- firefox_profile.update_preferences()
- if self._headless:
- firefox_options.add_argument("--headless")
- firefox_options.add_argument("--disable-gpu")
- # 添加自定义的配置参数
- if self._custom_argument:
- for arg in self._custom_argument:
- firefox_options.add_argument(arg)
- if self._executable_path:
- driver = webdriver.Firefox(
- service_log_path=str(_service_log_path),
- capabilities=firefox_capabilities,
- options=firefox_options,
- firefox_profile=firefox_profile,
- executable_path=self._executable_path,
- )
- else:
- driver = webdriver.Firefox(
- service_log_path=str(_service_log_path),
- capabilities=firefox_capabilities,
- options=firefox_options,
- firefox_profile=firefox_profile,
- )
- if self._window_size:
- driver.set_window_size(*self._window_size)
- return driver
- @property
- def cookies(self):
- cookies_json = {}
- for cookie in self.driver.get_cookies():
- cookies_json[cookie["name"]] = cookie["value"]
- return cookies_json
- @cookies.setter
- def cookies(self, val: dict):
- """
- 设置cookie
- Args:
- val: {"key":"value", "key2":"value2"}
- Returns:
- """
- for key, value in val.items():
- self.driver.add_cookie({"name": key, "value": value})
- def __getattr__(self, name):
- if self.driver:
- return getattr(self.driver, name)
- else:
- raise AttributeError
- def get_user_agent(driver):
- return driver.execute_script("return navigator.userAgent;")
- def get_title(driver):
- return driver.execute_script('return document.title')
- def until_wait(
- driver,
- *,
- xpath=None,
- classname=None,
- text=None,
- timeout=None
- ):
- """
- 显示等待页面加载,否则抛出TimeoutException
- :param driver: 浏览器驱动
- :param xpath: xpath规则,页面等待特征
- :param classname: class属性名称,页面等待特征
- :param text: 期待的文本
- :param timeout: 超时时间
- :return:
- """
- _timeout = (timeout or 60)
- wait = WebDriverWait(driver, _timeout, 0.2)
- if xpath is not None:
- locator = (By.XPATH, xpath)
- if text is not None:
- wait.until(EC.text_to_be_present_in_element(locator, text))
- else:
- wait.until(EC.presence_of_element_located(locator))
- elif classname is not None:
- locator = (By.CLASS_NAME, classname)
- if text is not None:
- wait.until(EC.text_to_be_present_in_element(locator, text))
- else:
- wait.until(EC.presence_of_element_located(locator))
- def new_window(driver):
- """新的窗口"""
- driver.execute_script('window.open();')
- handles = driver.window_handles
- driver.switch_to.window(handles[-1])
|