123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414 |
- import json
- from collections import namedtuple
- from typing import Optional
- from selenium import webdriver
- from selenium.common.exceptions import WebDriverException
- from selenium.webdriver.common.by import By
- from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver
- from selenium.webdriver.support import expected_conditions as EC
- from selenium.webdriver.support.ui import WebDriverWait
- from common.log import logger
# User-Agent applied when the caller does not provide one.
DEFAULT_USERAGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:96.0) "
    "Gecko/20100101 Firefox/96.0"
)

# (host, port) pair describing a proxy endpoint.
Netloc = namedtuple("Netloc", "host port")
def until_wait(
    driver,
    *,
    xpath=None,
    classname=None,
    text=None,
    timeout=None
):
    """
    Explicitly wait for a page feature; the wait raises TimeoutException
    when the feature does not show up in time.

    The element is addressed by ``xpath`` when that is given, otherwise by
    ``classname``.  When neither is supplied the function returns without
    waiting.  With ``text`` the wait is for that text inside the located
    element instead of for the element's mere presence.

    :param driver: browser driver
    :param xpath: XPath rule identifying the awaited element
    :param classname: class attribute name identifying the awaited element
    :param text: expected text
    :param timeout: timeout in seconds; a falsy value means 60
    :return: None
    """
    # Poll every 0.2 seconds until the condition holds or the timeout expires.
    waiter = WebDriverWait(driver, timeout or 60, 0.2)

    if xpath is not None:
        target = (By.XPATH, xpath)
    elif classname is not None:
        target = (By.CLASS_NAME, classname)
    else:
        # No locator supplied: nothing to wait for.
        return

    if text is None:
        condition = EC.presence_of_element_located(target)
    else:
        condition = EC.text_to_be_present_in_element(target, text)
    waiter.until(condition)
def check_navigator(driver):
    """
    Ask the page for ``window.navigator.webdriver`` and return the result.

    :param driver: object exposing ``execute_script(script)``
    :return: whatever ``execute_script`` returns for that expression
    """
    return driver.execute_script("return window.navigator.webdriver")
def netloc(proxies: dict) -> Netloc:
    """
    Extract the host and port from the ``"https"`` entry of a proxy mapping.

    Accepts a bare ``host:port`` as well as a full proxy URL: any
    ``scheme://`` prefix (not only ``socks5://``), ``user:password@``
    credentials and a trailing ``/path`` are ignored.

    :param proxies: mapping with an ``"https"`` key holding the proxy address
    :return: ``Netloc(host, port)``; the port is kept as a string
    :raises KeyError: when ``proxies`` has no ``"https"`` entry
    :raises ValueError: when no ``host:port`` pair can be found
    """
    address = proxies["https"]

    # Strip the scheme, whatever it is ("socks5://", "socks5h://", ...).
    _scheme, marker, remainder = address.partition("://")
    if marker:
        address = remainder

    # Strip anything after the authority part, then any credentials.
    address = address.split("/", 1)[0]
    address = address.rpartition("@")[2]

    # Split on the LAST colon so only the final component is the port.
    host, colon, port = address.rpartition(":")
    if not (colon and host and port):
        raise ValueError(
            f"proxy address must contain host:port, got {proxies['https']!r}"
        )
    return Netloc(host, port)
class XhrRequest:
    """Plain holder for the request side of a captured XHR call."""

    def __init__(self, url, data, headers):
        # Values are stored exactly as given; nothing is validated or copied.
        self.url, self.data, self.headers = url, data, headers
class XhrResponse:
    """Plain holder for the response side of a captured XHR call."""

    def __init__(self, request: XhrRequest, url, headers, content, status_code):
        # The originating request is kept alongside the response fields.
        self.request = request
        # Remaining values are stored exactly as given.
        self.url, self.headers = url, headers
        self.content, self.status_code = content, status_code
class FireFoxWebDriver:
    """
    Convenience wrapper around a locally launched selenium Firefox driver.

    The selenium driver is exposed as ``self.driver``.  Instances work as
    context managers: leaving the ``with`` block logs any exception, quits
    the browser and suppresses the exception.
    """

    def __init__(
        self,
        user_agent=None,
        proxy=None,
        headless=True,
        timeout=60,
        load_images=False,
        executable_path=None,
        window_size: tuple = None,
        xhr_url_regexes: list = None,
    ):
        """
        Build the Firefox profile/options and start the browser.

        Args:
            user_agent: string, or zero-argument callable returning the
                string; DEFAULT_USERAGENT is used when falsy
            proxy: mapping understood by ``netloc`` (its "https" entry holds
                the SOCKS proxy address), or a zero-argument callable
                returning such a mapping
            headless: run without a visible window (default: headless)
            timeout: default timeout in seconds used by
                ``set_page_load_timeout`` and ``until_wait``
            load_images: whether images are loaded
            executable_path: forwarded to ``webdriver.Firefox`` as
                ``executable_path`` when given
            window_size: (width, height) applied after start-up
            xhr_url_regexes: list of XHR URL regexes to intercept; it is only
                stored here, nothing in this class installs a page-side hook
        """
        self._user_agent = user_agent or DEFAULT_USERAGENT
        self._proxy = proxy
        self._load_images = load_images
        self._headless = headless
        self._timeout = timeout
        self._xhr_url_regexes = xhr_url_regexes
        self._window_size = window_size
        self._executable_path = executable_path

        firefox_profile = webdriver.FirefoxProfile()
        firefox_options = webdriver.FirefoxOptions()
        firefox_capabilities = webdriver.DesiredCapabilities.FIREFOX

        if self._proxy:
            proxy = self._proxy() if callable(self._proxy) else self._proxy
            host, port = netloc(proxy)
            # network.proxy.type: 0 = no proxy, 1 = use the configured proxy.
            firefox_profile.set_preference('network.proxy.type', 1)
            firefox_profile.set_preference('network.proxy.socks', host)
            # The port must be an int for the preference to take effect.
            firefox_profile.set_preference('network.proxy.socks_port', int(port))

        if self._user_agent:
            firefox_profile.set_preference(
                "general.useragent.override",
                self._user_agent() if callable(self._user_agent) else self._user_agent,
            )

        if not self._load_images:
            # permissions.default.image:
            #   1 = load all images regardless of origin (default)
            #   2 = block all images
            #   3 = block third-party images
            firefox_profile.set_preference("permissions.default.image", 2)

        # Write the collected preferences once, after all of them are set.
        firefox_profile.update_preferences()

        if self._headless:
            firefox_options.add_argument("--headless")
            firefox_options.add_argument("--disable-gpu")

        driver_kwargs = {
            "capabilities": firefox_capabilities,
            "options": firefox_options,
            "firefox_profile": firefox_profile,
        }
        # Only override the driver binary location when one was supplied.
        if self._executable_path:
            driver_kwargs["executable_path"] = self._executable_path
        _driver = webdriver.Firefox(**driver_kwargs)

        if self._window_size:
            try:
                _driver.set_window_size(*self._window_size)
            except Exception:
                # Do not leave a browser running when construction fails.
                _driver.quit()
                raise

        self.driver = _driver

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_val:
            logger.error(f'{self.__class__.__name__} >>> {exc_type} <> {exc_val}')
        self.driver.quit()
        # Returning True suppresses the exception raised inside the block.
        return True

    def set_page_load_timeout(self, timeout=None):
        """
        Set the page-load and script timeouts of the browser.

        :param timeout: timeout in seconds; a falsy value means the timeout
            given to the constructor
        :return: None
        """
        _timeout = timeout or self._timeout
        # Without a page-load timeout driver.get(url) may neither return nor
        # fail, which would hang the program.
        self.driver.set_page_load_timeout(_timeout)
        # Timeout for script execution.
        self.driver.set_script_timeout(_timeout)

    def quit(self):
        """Quit the browser."""
        self.driver.quit()

    def xhr_response(self, xhr_url_regex) -> Optional[XhrResponse]:
        """
        Read ``window.__ajaxData[<xhr_url_regex>]`` from the page.

        :param xhr_url_regex: key to look up in ``window.__ajaxData``
        :return: an XhrResponse built from the entry's "request" and
            "response" members, or None when the entry is falsy
        """
        # NOTE(review): the regex is interpolated into the script verbatim,
        # so a pattern containing a double quote or a backslash does not
        # reach the page unchanged; also nothing in this class populates
        # window.__ajaxData - confirm where the hook is installed.
        data = self.driver.execute_script(
            f'return window.__ajaxData["{xhr_url_regex}"];'
        )
        if not data:
            return None

        request = XhrRequest(**data["request"])
        return XhrResponse(request, **data["response"])

    def xhr_text(self, xhr_url_regex) -> Optional[str]:
        """
        Return the ``content`` of the captured response for the regex.

        :param xhr_url_regex: key to look up in ``window.__ajaxData``
        :return: the response content, or None when nothing was captured
        """
        response = self.xhr_response(xhr_url_regex)
        if not response:
            return None
        return response.content

    def xhr_json(self, xhr_url_regex) -> Optional[dict]:
        """
        Return the captured response content decoded as JSON.

        :param xhr_url_regex: key to look up in ``window.__ajaxData``
        :return: the decoded content, or None when nothing was captured
        """
        text = self.xhr_text(xhr_url_regex)
        # json.loads(None) raises TypeError; honour the Optional return.
        if text is None:
            return None
        return json.loads(text)

    def get(self, url):
        """Navigate the browser to ``url``."""
        self.driver.get(url)

    @property
    def user_agent(self):
        """User-Agent as reported by ``navigator.userAgent`` in the page."""
        return self.driver.execute_script("return navigator.userAgent;")

    @property
    def page_title(self):
        """Title of the current document (``document.title``)."""
        return self.driver.execute_script('return document.title')

    @property
    def page_source(self):
        """Page source as reported by the driver."""
        return self.driver.page_source

    def find_element_by_xpath(self, xpath: str):
        """
        Find an element by XPath; NoSuchElementException is raised when the
        element does not exist.

        :param xpath: XPath of the element to find
        :return: the element found by the driver
        """
        # find_element(By.XPATH, ...) is available on old and new selenium
        # releases, unlike the legacy find_element_by_xpath helper.
        return self.driver.find_element(By.XPATH, xpath)

    def until_wait(
        self,
        *,
        xpath=None,
        classname=None,
        text=None,
        timeout=None
    ):
        """
        Explicitly wait for a page feature; the wait raises TimeoutException
        when the feature does not show up in time.

        The element is addressed by ``xpath`` when given, otherwise by
        ``classname``; when neither is supplied nothing is waited for.

        :param xpath: XPath rule identifying the awaited element
        :param classname: class attribute name identifying the awaited element
        :param text: expected text inside the element
        :param timeout: timeout in seconds; a falsy value means the timeout
            given to the constructor
        :return: None
        """
        _timeout = timeout or self._timeout
        # Poll every 0.2 seconds.
        wait = WebDriverWait(self.driver, _timeout, 0.2)

        if xpath is not None:
            locator = (By.XPATH, xpath)
        elif classname is not None:
            locator = (By.CLASS_NAME, classname)
        else:
            return

        if text is not None:
            wait.until(EC.text_to_be_present_in_element(locator, text))
        else:
            wait.until(EC.presence_of_element_located(locator))

    def switch_to_window(self):
        """
        Open a new window, close the current one and switch to the last
        handle of the list captured right after opening.
        """
        self.driver.execute_script('window.open();')
        handles = self.driver.window_handles
        self.driver.close()
        self.driver.switch_to.window(handles[-1])
class WebDriver(RemoteWebDriver):
    """
    Wrapper that starts a Firefox driver and forwards unknown attributes to it.

    The started selenium driver is kept in ``self.driver``; attributes not
    found through normal lookup on this object are resolved on that driver
    by ``__getattr__``.  Instances work as context managers: leaving the
    ``with`` block logs any exception, quits the browser and suppresses the
    exception.
    """

    FIREFOX = "FIREFOX"

    def __init__(
        self,
        load_images=True,
        user_agent=None,
        proxy=None,
        headless=True,
        driver_type=FIREFOX,
        timeout=120,
        window_size=(1024, 800),
        executable_path=None,
        custom_argument=None,
        **kwargs
    ):
        """
        Args:
            load_images: whether images are loaded
            user_agent: string, or zero-argument callable returning the
                string; DEFAULT_USERAGENT is used when falsy
            proxy: mapping understood by ``netloc`` (its "https" entry holds
                the SOCKS proxy address), or a zero-argument callable
                returning such a mapping
            headless: run without a visible window
            driver_type: must be ``WebDriver.FIREFOX``
            timeout: page-load and script timeout in seconds
            window_size: (width, height) applied after start-up
            executable_path: forwarded to ``webdriver.Firefox`` as
                ``executable_path`` when given
            custom_argument: iterable of extra command-line arguments added
                to the Firefox options
            **kwargs: accepted and ignored

        Raises:
            ValueError: when ``driver_type`` is not supported
        """
        self._load_images = load_images
        self._user_agent = user_agent or DEFAULT_USERAGENT
        self._proxy = proxy
        self._headless = headless
        self._timeout = timeout
        self._window_size = window_size
        self._executable_path = executable_path
        self._custom_argument = custom_argument

        self.proxies = {}
        self.user_agent = None

        # Fail early: without a driver every later attribute access on this
        # object would be unusable.
        if driver_type != WebDriver.FIREFOX:
            raise ValueError(
                f"driver_type must be {WebDriver.FIREFOX!r}, got {driver_type!r}"
            )
        self.driver = self.firefox_driver()

        # Without a page-load timeout driver.get(url) may hang forever.
        self.driver.set_page_load_timeout(self._timeout)
        self.driver.set_script_timeout(self._timeout)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_val:
            logger.error(f'{self.__class__.__name__} >>> {exc_type} <> {exc_val}')
        self.quit()
        # Returning True suppresses the exception raised inside the block.
        return True

    def quit(self):
        """
        Quit the wrapped browser driver.

        Defined explicitly because ``quit`` would otherwise resolve to the
        method inherited from RemoteWebDriver and run against this wrapper,
        bypassing the wrapped driver's own ``quit()``.
        """
        self.driver.quit()

    def firefox_driver(self):
        """
        Start Firefox with the profile/options derived from the constructor
        arguments.

        :return: the started ``webdriver.Firefox`` instance
        """
        firefox_profile = webdriver.FirefoxProfile()
        firefox_options = webdriver.FirefoxOptions()
        firefox_capabilities = webdriver.DesiredCapabilities.FIREFOX

        firefox_profile.set_preference("dom.webdriver.enabled", False)
        firefox_profile.set_preference('useAutomationExtension', False)
        # Fingerprinting protection is intentionally left disabled:
        # firefox_profile.set_preference('privacy.resistFingerprinting', True)

        if self._proxy:
            proxy = self._proxy() if callable(self._proxy) else self._proxy
            host, port = netloc(proxy)
            # SOCKS proxy; network.proxy.type: 0 = no proxy, 1 = use proxy.
            firefox_profile.set_preference('network.proxy.type', 1)
            firefox_profile.set_preference('network.proxy.socks', host)
            firefox_profile.set_preference('network.proxy.socks_port', int(port))

        if self._user_agent:
            firefox_profile.set_preference(
                "general.useragent.override",
                self._user_agent() if callable(self._user_agent) else self._user_agent,
            )

        if not self._load_images:
            # permissions.default.image:
            #   1 = load all images regardless of origin (default)
            #   2 = block all images
            #   3 = block third-party images
            firefox_profile.set_preference("permissions.default.image", 2)

        firefox_profile.update_preferences()

        if self._headless:
            firefox_options.add_argument("--headless")
            firefox_options.add_argument("--disable-gpu")

        # Caller-supplied extra arguments.
        if self._custom_argument:
            for arg in self._custom_argument:
                firefox_options.add_argument(arg)

        driver_kwargs = {
            "capabilities": firefox_capabilities,
            "options": firefox_options,
            "firefox_profile": firefox_profile,
        }
        # Only override the driver binary location when one was supplied.
        if self._executable_path:
            driver_kwargs["executable_path"] = self._executable_path
        driver = webdriver.Firefox(**driver_kwargs)

        if self._window_size:
            try:
                driver.set_window_size(*self._window_size)
            except Exception:
                # Do not leave a browser running when start-up fails.
                driver.quit()
                raise

        return driver

    @property
    def cookies(self):
        """Cookies of the current session as a ``{name: value}`` dict."""
        return {
            cookie["name"]: cookie["value"]
            for cookie in self.driver.get_cookies()
        }

    @cookies.setter
    def cookies(self, val: dict):
        """
        Add cookies to the current session.

        Args:
            val: {"key": "value", "key2": "value2"}
        """
        for key, value in val.items():
            self.driver.add_cookie({"name": key, "value": value})

    def __getattr__(self, name):
        """
        Forward attributes missing on this object to the wrapped driver.

        :raises AttributeError: when no driver has been assigned yet
        """
        # Read the instance dict directly: evaluating ``self.driver`` here
        # would re-enter __getattr__ and recurse without end whenever the
        # driver attribute was never assigned.
        driver = self.__dict__.get("driver")
        if driver is not None:
            return getattr(driver, name)
        raise AttributeError(
            f"{type(self).__name__!r} object has no attribute {name!r}"
        )
|