123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530 |
- # -*- coding: utf-8 -*-
- """
- Created on 2021/3/18 4:59 下午
- ---------
- @summary:
- ---------
- @author: Boris
- @email: boris_liu@foxmail.com
- """
- import json
- import logging
- import os
- from typing import Optional, Union, List
- from selenium import webdriver
- from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
- from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver
- from webdriver_manager.chrome import ChromeDriverManager
- from webdriver_manager.firefox import GeckoDriverManager
- from feapder.utils import tools
- from feapder.utils.log import log, OTHERS_LOG_LEVAL
- from feapder.utils.webdriver.webdirver import *
- # 屏蔽webdriver_manager日志
- logging.getLogger("WDM").setLevel(OTHERS_LOG_LEVAL)
- class SeleniumDriver(WebDriver, RemoteWebDriver):
- CHROME = "CHROME"
- EDGE = "EDGE"
- PHANTOMJS = "PHANTOMJS"
- FIREFOX = "FIREFOX"
- __CHROME_ATTRS__ = {
- "executable_path",
- "port",
- "options",
- "service_args",
- "desired_capabilities",
- "service_log_path",
- "chrome_options",
- "keep_alive",
- }
- __EDGE_ATTRS__ = __CHROME_ATTRS__
- __FIREFOX_ATTRS__ = {
- "firefox_profile",
- "firefox_binary",
- "timeout",
- "capabilities",
- "proxy",
- "executable_path",
- "options",
- "service_log_path",
- "firefox_options",
- "service_args",
- "desired_capabilities",
- "log_path",
- "keep_alive",
- }
- __PHANTOMJS_ATTRS__ = {
- "executable_path",
- "port",
- "desired_capabilities",
- "service_args",
- "service_log_path",
- }
- def __init__(self, xhr_url_regexes: list = None, **kwargs):
- """
- Args:
- xhr_url_regexes: 拦截xhr接口,支持正则,数组类型
- **kwargs:
- """
- super(SeleniumDriver, self).__init__(**kwargs)
- self._xhr_url_regexes = xhr_url_regexes
- self._driver_type = self._driver_type or SeleniumDriver.CHROME
- if self._xhr_url_regexes and self._driver_type != SeleniumDriver.CHROME:
- raise Exception(
- "xhr_url_regexes only support by chrome now! eg: driver_type=SeleniumDriver.CHROME"
- )
- if self._driver_type == SeleniumDriver.CHROME:
- self.driver = self.chrome_driver()
- elif self._driver_type == SeleniumDriver.EDGE:
- self.driver = self.edge_driver()
- elif self._driver_type == SeleniumDriver.PHANTOMJS:
- self.driver = self.phantomjs_driver()
- elif self._driver_type == SeleniumDriver.FIREFOX:
- self.driver = self.firefox_driver()
- else:
- raise TypeError(
- "dirver_type must be one of CHROME or PHANTOMJS or FIREFOX, but received {}".format(
- type(self._driver_type)
- )
- )
- # driver.get(url)一直不返回,但也不报错的问题,这时程序会卡住,设置超时选项能解决这个问题。
- self.driver.set_page_load_timeout(self._timeout)
- # 设置10秒脚本超时时间
- self.driver.set_script_timeout(self._timeout)
- self.url = None
- def __enter__(self):
- return self
- def __exit__(self, exc_type, exc_val, exc_tb):
- if exc_val:
- log.error(exc_val)
- self.quit()
- return True
- def filter_kwargs(self, kwargs: dict, driver_attrs: set):
- if not kwargs:
- return {}
- data = {}
- for key, value in kwargs.items():
- if key in driver_attrs:
- data[key] = value
- return data
- def get_driver(self):
- return self.driver
- def firefox_driver(self):
- if webdriver.__version__ >= "4.0.0":
- raise Exception(
- f"暂未适配selenium=={webdriver.__version__}版本的firefox API,建议安装selenium==3.141.0版本或使用CHROME浏览器"
- )
- firefox_profile = webdriver.FirefoxProfile()
- firefox_options = webdriver.FirefoxOptions()
- firefox_capabilities = webdriver.DesiredCapabilities.FIREFOX
- try:
- from selenium.webdriver.firefox.service import Service
- except (ImportError, ModuleNotFoundError):
- Service = None
- if self._proxy:
- proxy = self._proxy() if callable(self._proxy) else self._proxy
- firefox_capabilities["marionette"] = True
- firefox_capabilities["proxy"] = {
- "proxyType": "MANUAL",
- "httpProxy": proxy,
- "ftpProxy": proxy,
- "sslProxy": proxy,
- }
- if self._user_agent:
- firefox_profile.set_preference(
- "general.useragent.override",
- self._user_agent() if callable(self._user_agent) else self._user_agent,
- )
- if not self._load_images:
- firefox_profile.set_preference("permissions.default.image", 2)
- if self._headless:
- firefox_options.add_argument("--headless")
- firefox_options.add_argument("--disable-gpu")
- # 添加自定义的配置参数
- if self._custom_argument:
- for arg in self._custom_argument:
- firefox_options.add_argument(arg)
- kwargs = self.filter_kwargs(self._kwargs, self.__FIREFOX_ATTRS__)
- if Service is None:
- if self._executable_path:
- kwargs.update(executable_path=self._executable_path)
- elif self._auto_install_driver:
- kwargs.update(executable_path=GeckoDriverManager().install())
- else:
- if self._executable_path:
- kwargs.update(service=Service(self._executable_path))
- elif self._auto_install_driver:
- kwargs.update(service=Service(GeckoDriverManager().install()))
- driver = webdriver.Firefox(
- capabilities=firefox_capabilities,
- options=firefox_options,
- firefox_profile=firefox_profile,
- **kwargs,
- )
- if self._window_size:
- driver.set_window_size(*self._window_size)
- return driver
- def chrome_driver(self):
- chrome_options = webdriver.ChromeOptions()
- # 此步骤很重要,设置为开发者模式,防止被各大网站识别出来使用了Selenium
- chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
- chrome_options.add_experimental_option("useAutomationExtension", False)
- # docker 里运行需要
- chrome_options.add_argument("--no-sandbox")
- try:
- from selenium.webdriver.chrome.service import Service
- except (ImportError, ModuleNotFoundError):
- Service = None
- if self._proxy:
- chrome_options.add_argument(
- "--proxy-server={}".format(
- self._proxy() if callable(self._proxy) else self._proxy
- )
- )
- if self._user_agent:
- chrome_options.add_argument(
- "user-agent={}".format(
- self._user_agent()
- if callable(self._user_agent)
- else self._user_agent
- )
- )
- if not self._load_images:
- chrome_options.add_experimental_option(
- "prefs", {"profile.managed_default_content_settings.images": 2}
- )
- if self._headless:
- chrome_options.add_argument("--headless")
- chrome_options.add_argument("--disable-gpu")
- if self._window_size:
- chrome_options.add_argument(
- "--window-size={},{}".format(self._window_size[0], self._window_size[1])
- )
- if self._download_path:
- os.makedirs(self._download_path, exist_ok=True)
- prefs = {
- "download.prompt_for_download": False,
- "download.default_directory": self._download_path,
- }
- chrome_options.add_experimental_option("prefs", prefs)
- # 添加自定义的配置参数
- if self._custom_argument:
- for arg in self._custom_argument:
- chrome_options.add_argument(arg)
- kwargs = self.filter_kwargs(self._kwargs, self.__CHROME_ATTRS__)
- if Service is None:
- if self._executable_path:
- kwargs.update(executable_path=self._executable_path)
- elif self._auto_install_driver:
- kwargs.update(executable_path=ChromeDriverManager().install())
- else:
- if self._executable_path:
- kwargs.update(service=Service(self._executable_path))
- elif self._auto_install_driver:
- kwargs.update(service=Service(ChromeDriverManager().install()))
- driver = webdriver.Chrome(options=chrome_options, **kwargs)
- # 隐藏浏览器特征
- if self._use_stealth_js:
- with open(
- os.path.join(os.path.dirname(__file__), "../js/stealth.min.js")
- ) as f:
- js = f.read()
- driver.execute_cdp_cmd(
- "Page.addScriptToEvaluateOnNewDocument", {"source": js}
- )
- if self._xhr_url_regexes:
- assert isinstance(self._xhr_url_regexes, list)
- with open(
- os.path.join(os.path.dirname(__file__), "../js/intercept.js")
- ) as f:
- js = f.read()
- driver.execute_cdp_cmd(
- "Page.addScriptToEvaluateOnNewDocument", {"source": js}
- )
- js = f"window.__urlRegexes = {self._xhr_url_regexes}"
- driver.execute_cdp_cmd(
- "Page.addScriptToEvaluateOnNewDocument", {"source": js}
- )
- if self._download_path:
- driver.command_executor._commands["send_command"] = (
- "POST",
- "/session/$sessionId/chromium/send_command",
- )
- params = {
- "cmd": "Page.setDownloadBehavior",
- "params": {"behavior": "allow", "downloadPath": self._download_path},
- }
- driver.execute("send_command", params)
- return driver
- def edge_driver(self):
- edge_options = webdriver.EdgeOptions()
- # 此步骤很重要,设置为开发者模式,防止被各大网站识别出来使用了Selenium
- edge_options.add_experimental_option("excludeSwitches", ["enable-automation"])
- edge_options.add_experimental_option("useAutomationExtension", False)
- # docker 里运行需要
- edge_options.add_argument("--no-sandbox")
- try:
- from selenium.webdriver.edge.service import Service
- except (ImportError, ModuleNotFoundError):
- Service = None
- if self._proxy:
- edge_options.add_argument(
- "--proxy-server={}".format(
- self._proxy() if callable(self._proxy) else self._proxy
- )
- )
- if self._user_agent:
- edge_options.add_argument(
- "user-agent={}".format(
- self._user_agent()
- if callable(self._user_agent)
- else self._user_agent
- )
- )
- if not self._load_images:
- edge_options.add_experimental_option(
- "prefs", {"profile.managed_default_content_settings.images": 2}
- )
- if self._headless:
- edge_options.add_argument("--headless")
- edge_options.add_argument("--disable-gpu")
- if self._window_size:
- edge_options.add_argument(
- "--window-size={},{}".format(self._window_size[0], self._window_size[1])
- )
- if self._download_path:
- os.makedirs(self._download_path, exist_ok=True)
- prefs = {
- "download.prompt_for_download": False,
- "download.default_directory": self._download_path,
- }
- edge_options.add_experimental_option("prefs", prefs)
- # 添加自定义的配置参数
- if self._custom_argument:
- for arg in self._custom_argument:
- edge_options.add_argument(arg)
- kwargs = self.filter_kwargs(self._kwargs, self.__CHROME_ATTRS__)
- if Service is None:
- if self._executable_path:
- kwargs.update(executable_path=self._executable_path)
- elif self._auto_install_driver:
- raise NotImplementedError("edge not support auto install driver")
- else:
- if self._executable_path:
- kwargs.update(service=Service(self._executable_path))
- elif self._auto_install_driver:
- raise NotImplementedError("edge not support auto install driver")
- driver = webdriver.Edge(options=edge_options, **kwargs)
- # 隐藏浏览器特征
- if self._use_stealth_js:
- with open(
- os.path.join(os.path.dirname(__file__), "../js/stealth.min.js")
- ) as f:
- js = f.read()
- driver.execute_cdp_cmd(
- "Page.addScriptToEvaluateOnNewDocument", {"source": js}
- )
- if self._xhr_url_regexes:
- assert isinstance(self._xhr_url_regexes, list)
- with open(
- os.path.join(os.path.dirname(__file__), "../js/intercept.js")
- ) as f:
- js = f.read()
- driver.execute_cdp_cmd(
- "Page.addScriptToEvaluateOnNewDocument", {"source": js}
- )
- js = f"window.__urlRegexes = {self._xhr_url_regexes}"
- driver.execute_cdp_cmd(
- "Page.addScriptToEvaluateOnNewDocument", {"source": js}
- )
- if self._download_path:
- driver.command_executor._commands["send_command"] = (
- "POST",
- "/session/$sessionId/chromium/send_command",
- )
- params = {
- "cmd": "Page.setDownloadBehavior",
- "params": {"behavior": "allow", "downloadPath": self._download_path},
- }
- driver.execute("send_command", params)
- return driver
- def phantomjs_driver(self):
- import warnings
- warnings.filterwarnings("ignore")
- service_args = []
- dcap = DesiredCapabilities.PHANTOMJS
- if self._proxy:
- service_args.append(
- "--proxy=%s" % self._proxy() if callable(self._proxy) else self._proxy
- )
- if self._user_agent:
- dcap["phantomjs.page.settings.userAgent"] = (
- self._user_agent() if callable(self._user_agent) else self._user_agent
- )
- if not self._load_images:
- service_args.append("--load-images=no")
- # 添加自定义的配置参数
- if self._custom_argument:
- for arg in self._custom_argument:
- service_args.append(arg)
- kwargs = self.filter_kwargs(self._kwargs, self.__PHANTOMJS_ATTRS__)
- if self._executable_path:
- kwargs.update(executable_path=self._executable_path)
- driver = webdriver.PhantomJS(
- service_args=service_args, desired_capabilities=dcap, **kwargs
- )
- if self._window_size:
- driver.set_window_size(self._window_size[0], self._window_size[1])
- del warnings
- return driver
- @property
- def domain(self):
- return tools.get_domain(self.url or self.driver.current_url)
- @property
- def cookies(self):
- cookies_json = {}
- for cookie in self.driver.get_cookies():
- cookies_json[cookie["name"]] = cookie["value"]
- return cookies_json
- @cookies.setter
- def cookies(self, val: Union[dict, List[dict]]):
- """
- 设置cookie
- Args:
- val: {"key":"value", "key2":"value2"}
- Returns:
- """
- if isinstance(val, list):
- for cookie in val:
- # "path", "domain", "secure", "expiry"
- _cookie = {
- "name": cookie.get("name"),
- "value": cookie.get("value"),
- "domain": cookie.get("domain"),
- "path": cookie.get("path"),
- "expires": cookie.get("expires"),
- "secure": cookie.get("secure"),
- }
- self.driver.add_cookie(_cookie)
- else:
- for key, value in val.items():
- self.driver.add_cookie({"name": key, "value": value})
- @property
- def user_agent(self):
- return self.driver.execute_script("return navigator.userAgent;")
- def xhr_response(self, xhr_url_regex) -> Optional[InterceptResponse]:
- data = self.driver.execute_script(
- f'return window.__ajaxData["{xhr_url_regex}"];'
- )
- if not data:
- return None
- request = InterceptRequest(**data["request"])
- response = InterceptResponse(request, **data["response"])
- return response
- def xhr_data(self, xhr_url_regex) -> Union[str, dict, None]:
- response = self.xhr_response(xhr_url_regex)
- if not response:
- return None
- return response.content
- def xhr_text(self, xhr_url_regex) -> Optional[str]:
- response = self.xhr_response(xhr_url_regex)
- if not response:
- return None
- if isinstance(response.content, dict):
- return json.dumps(response.content, ensure_ascii=False)
- return response.content
- def xhr_json(self, xhr_url_regex) -> Optional[dict]:
- text = self.xhr_text(xhr_url_regex)
- return json.loads(text)
- def __getattr__(self, name):
- if self.driver:
- return getattr(self.driver, name)
- else:
- raise AttributeError
- # def __del__(self):
- # self.quit()
|