# -*- coding: utf-8 -*- """ Created on 2021/3/18 4:59 下午 --------- @summary: --------- @author: Boris @email: boris_liu@foxmail.com """ import json import logging import os from typing import Optional, Union, List from selenium import webdriver from selenium.webdriver.common.desired_capabilities import DesiredCapabilities from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver from webdriver_manager.chrome import ChromeDriverManager from webdriver_manager.firefox import GeckoDriverManager from feapder.utils import tools from feapder.utils.log import log, OTHERS_LOG_LEVAL from feapder.utils.webdriver.webdirver import * # 屏蔽webdriver_manager日志 logging.getLogger("WDM").setLevel(OTHERS_LOG_LEVAL) class SeleniumDriver(WebDriver, RemoteWebDriver): CHROME = "CHROME" EDGE = "EDGE" PHANTOMJS = "PHANTOMJS" FIREFOX = "FIREFOX" __CHROME_ATTRS__ = { "executable_path", "port", "options", "service_args", "desired_capabilities", "service_log_path", "chrome_options", "keep_alive", } __EDGE_ATTRS__ = __CHROME_ATTRS__ __FIREFOX_ATTRS__ = { "firefox_profile", "firefox_binary", "timeout", "capabilities", "proxy", "executable_path", "options", "service_log_path", "firefox_options", "service_args", "desired_capabilities", "log_path", "keep_alive", } __PHANTOMJS_ATTRS__ = { "executable_path", "port", "desired_capabilities", "service_args", "service_log_path", } def __init__(self, xhr_url_regexes: list = None, **kwargs): """ Args: xhr_url_regexes: 拦截xhr接口,支持正则,数组类型 **kwargs: """ super(SeleniumDriver, self).__init__(**kwargs) self._xhr_url_regexes = xhr_url_regexes self._driver_type = self._driver_type or SeleniumDriver.CHROME if self._xhr_url_regexes and self._driver_type != SeleniumDriver.CHROME: raise Exception( "xhr_url_regexes only support by chrome now! eg: driver_type=SeleniumDriver.CHROME" ) if self._driver_type == SeleniumDriver.CHROME: self.driver = self.chrome_driver() elif self._driver_type == SeleniumDriver.EDGE: self.driver = self.edge_driver() elif self._driver_type == SeleniumDriver.PHANTOMJS: self.driver = self.phantomjs_driver() elif self._driver_type == SeleniumDriver.FIREFOX: self.driver = self.firefox_driver() else: raise TypeError( "dirver_type must be one of CHROME or PHANTOMJS or FIREFOX, but received {}".format( type(self._driver_type) ) ) # driver.get(url)一直不返回,但也不报错的问题,这时程序会卡住,设置超时选项能解决这个问题。 self.driver.set_page_load_timeout(self._timeout) # 设置10秒脚本超时时间 self.driver.set_script_timeout(self._timeout) self.url = None def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): if exc_val: log.error(exc_val) self.quit() return True def filter_kwargs(self, kwargs: dict, driver_attrs: set): if not kwargs: return {} data = {} for key, value in kwargs.items(): if key in driver_attrs: data[key] = value return data def get_driver(self): return self.driver def firefox_driver(self): if webdriver.__version__ >= "4.0.0": raise Exception( f"暂未适配selenium=={webdriver.__version__}版本的firefox API,建议安装selenium==3.141.0版本或使用CHROME浏览器" ) firefox_profile = webdriver.FirefoxProfile() firefox_options = webdriver.FirefoxOptions() firefox_capabilities = webdriver.DesiredCapabilities.FIREFOX try: from selenium.webdriver.firefox.service import Service except (ImportError, ModuleNotFoundError): Service = None if self._proxy: proxy = self._proxy() if callable(self._proxy) else self._proxy firefox_capabilities["marionette"] = True firefox_capabilities["proxy"] = { "proxyType": "MANUAL", "httpProxy": proxy, "ftpProxy": proxy, "sslProxy": proxy, } if self._user_agent: firefox_profile.set_preference( "general.useragent.override", self._user_agent() if callable(self._user_agent) else self._user_agent, ) if not self._load_images: firefox_profile.set_preference("permissions.default.image", 2) if self._headless: firefox_options.add_argument("--headless") firefox_options.add_argument("--disable-gpu") # 添加自定义的配置参数 if self._custom_argument: for arg in self._custom_argument: firefox_options.add_argument(arg) kwargs = self.filter_kwargs(self._kwargs, self.__FIREFOX_ATTRS__) if Service is None: if self._executable_path: kwargs.update(executable_path=self._executable_path) elif self._auto_install_driver: kwargs.update(executable_path=GeckoDriverManager().install()) else: if self._executable_path: kwargs.update(service=Service(self._executable_path)) elif self._auto_install_driver: kwargs.update(service=Service(GeckoDriverManager().install())) driver = webdriver.Firefox( capabilities=firefox_capabilities, options=firefox_options, firefox_profile=firefox_profile, **kwargs, ) if self._window_size: driver.set_window_size(*self._window_size) return driver def chrome_driver(self): chrome_options = webdriver.ChromeOptions() # 此步骤很重要,设置为开发者模式,防止被各大网站识别出来使用了Selenium chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"]) chrome_options.add_experimental_option("useAutomationExtension", False) # docker 里运行需要 chrome_options.add_argument("--no-sandbox") try: from selenium.webdriver.chrome.service import Service except (ImportError, ModuleNotFoundError): Service = None if self._proxy: chrome_options.add_argument( "--proxy-server={}".format( self._proxy() if callable(self._proxy) else self._proxy ) ) if self._user_agent: chrome_options.add_argument( "user-agent={}".format( self._user_agent() if callable(self._user_agent) else self._user_agent ) ) if not self._load_images: chrome_options.add_experimental_option( "prefs", {"profile.managed_default_content_settings.images": 2} ) if self._headless: chrome_options.add_argument("--headless") chrome_options.add_argument("--disable-gpu") if self._window_size: chrome_options.add_argument( "--window-size={},{}".format(self._window_size[0], self._window_size[1]) ) if self._download_path: os.makedirs(self._download_path, exist_ok=True) prefs = { "download.prompt_for_download": False, "download.default_directory": self._download_path, } chrome_options.add_experimental_option("prefs", prefs) # 添加自定义的配置参数 if self._custom_argument: for arg in self._custom_argument: chrome_options.add_argument(arg) kwargs = self.filter_kwargs(self._kwargs, self.__CHROME_ATTRS__) if Service is None: if self._executable_path: kwargs.update(executable_path=self._executable_path) elif self._auto_install_driver: kwargs.update(executable_path=ChromeDriverManager().install()) else: if self._executable_path: kwargs.update(service=Service(self._executable_path)) elif self._auto_install_driver: kwargs.update(service=Service(ChromeDriverManager().install())) driver = webdriver.Chrome(options=chrome_options, **kwargs) # 隐藏浏览器特征 if self._use_stealth_js: with open( os.path.join(os.path.dirname(__file__), "../js/stealth.min.js") ) as f: js = f.read() driver.execute_cdp_cmd( "Page.addScriptToEvaluateOnNewDocument", {"source": js} ) if self._xhr_url_regexes: assert isinstance(self._xhr_url_regexes, list) with open( os.path.join(os.path.dirname(__file__), "../js/intercept.js") ) as f: js = f.read() driver.execute_cdp_cmd( "Page.addScriptToEvaluateOnNewDocument", {"source": js} ) js = f"window.__urlRegexes = {self._xhr_url_regexes}" driver.execute_cdp_cmd( "Page.addScriptToEvaluateOnNewDocument", {"source": js} ) if self._download_path: driver.command_executor._commands["send_command"] = ( "POST", "/session/$sessionId/chromium/send_command", ) params = { "cmd": "Page.setDownloadBehavior", "params": {"behavior": "allow", "downloadPath": self._download_path}, } driver.execute("send_command", params) return driver def edge_driver(self): edge_options = webdriver.EdgeOptions() # 此步骤很重要,设置为开发者模式,防止被各大网站识别出来使用了Selenium edge_options.add_experimental_option("excludeSwitches", ["enable-automation"]) edge_options.add_experimental_option("useAutomationExtension", False) # docker 里运行需要 edge_options.add_argument("--no-sandbox") try: from selenium.webdriver.edge.service import Service except (ImportError, ModuleNotFoundError): Service = None if self._proxy: edge_options.add_argument( "--proxy-server={}".format( self._proxy() if callable(self._proxy) else self._proxy ) ) if self._user_agent: edge_options.add_argument( "user-agent={}".format( self._user_agent() if callable(self._user_agent) else self._user_agent ) ) if not self._load_images: edge_options.add_experimental_option( "prefs", {"profile.managed_default_content_settings.images": 2} ) if self._headless: edge_options.add_argument("--headless") edge_options.add_argument("--disable-gpu") if self._window_size: edge_options.add_argument( "--window-size={},{}".format(self._window_size[0], self._window_size[1]) ) if self._download_path: os.makedirs(self._download_path, exist_ok=True) prefs = { "download.prompt_for_download": False, "download.default_directory": self._download_path, } edge_options.add_experimental_option("prefs", prefs) # 添加自定义的配置参数 if self._custom_argument: for arg in self._custom_argument: edge_options.add_argument(arg) kwargs = self.filter_kwargs(self._kwargs, self.__CHROME_ATTRS__) if Service is None: if self._executable_path: kwargs.update(executable_path=self._executable_path) elif self._auto_install_driver: raise NotImplementedError("edge not support auto install driver") else: if self._executable_path: kwargs.update(service=Service(self._executable_path)) elif self._auto_install_driver: raise NotImplementedError("edge not support auto install driver") driver = webdriver.Edge(options=edge_options, **kwargs) # 隐藏浏览器特征 if self._use_stealth_js: with open( os.path.join(os.path.dirname(__file__), "../js/stealth.min.js") ) as f: js = f.read() driver.execute_cdp_cmd( "Page.addScriptToEvaluateOnNewDocument", {"source": js} ) if self._xhr_url_regexes: assert isinstance(self._xhr_url_regexes, list) with open( os.path.join(os.path.dirname(__file__), "../js/intercept.js") ) as f: js = f.read() driver.execute_cdp_cmd( "Page.addScriptToEvaluateOnNewDocument", {"source": js} ) js = f"window.__urlRegexes = {self._xhr_url_regexes}" driver.execute_cdp_cmd( "Page.addScriptToEvaluateOnNewDocument", {"source": js} ) if self._download_path: driver.command_executor._commands["send_command"] = ( "POST", "/session/$sessionId/chromium/send_command", ) params = { "cmd": "Page.setDownloadBehavior", "params": {"behavior": "allow", "downloadPath": self._download_path}, } driver.execute("send_command", params) return driver def phantomjs_driver(self): import warnings warnings.filterwarnings("ignore") service_args = [] dcap = DesiredCapabilities.PHANTOMJS if self._proxy: service_args.append( "--proxy=%s" % self._proxy() if callable(self._proxy) else self._proxy ) if self._user_agent: dcap["phantomjs.page.settings.userAgent"] = ( self._user_agent() if callable(self._user_agent) else self._user_agent ) if not self._load_images: service_args.append("--load-images=no") # 添加自定义的配置参数 if self._custom_argument: for arg in self._custom_argument: service_args.append(arg) kwargs = self.filter_kwargs(self._kwargs, self.__PHANTOMJS_ATTRS__) if self._executable_path: kwargs.update(executable_path=self._executable_path) driver = webdriver.PhantomJS( service_args=service_args, desired_capabilities=dcap, **kwargs ) if self._window_size: driver.set_window_size(self._window_size[0], self._window_size[1]) del warnings return driver @property def domain(self): return tools.get_domain(self.url or self.driver.current_url) @property def cookies(self): cookies_json = {} for cookie in self.driver.get_cookies(): cookies_json[cookie["name"]] = cookie["value"] return cookies_json @cookies.setter def cookies(self, val: Union[dict, List[dict]]): """ 设置cookie Args: val: {"key":"value", "key2":"value2"} Returns: """ if isinstance(val, list): for cookie in val: # "path", "domain", "secure", "expiry" _cookie = { "name": cookie.get("name"), "value": cookie.get("value"), "domain": cookie.get("domain"), "path": cookie.get("path"), "expires": cookie.get("expires"), "secure": cookie.get("secure"), } self.driver.add_cookie(_cookie) else: for key, value in val.items(): self.driver.add_cookie({"name": key, "value": value}) @property def user_agent(self): return self.driver.execute_script("return navigator.userAgent;") def xhr_response(self, xhr_url_regex) -> Optional[InterceptResponse]: data = self.driver.execute_script( f'return window.__ajaxData["{xhr_url_regex}"];' ) if not data: return None request = InterceptRequest(**data["request"]) response = InterceptResponse(request, **data["response"]) return response def xhr_data(self, xhr_url_regex) -> Union[str, dict, None]: response = self.xhr_response(xhr_url_regex) if not response: return None return response.content def xhr_text(self, xhr_url_regex) -> Optional[str]: response = self.xhr_response(xhr_url_regex) if not response: return None if isinstance(response.content, dict): return json.dumps(response.content, ensure_ascii=False) return response.content def xhr_json(self, xhr_url_regex) -> Optional[dict]: text = self.xhr_text(xhr_url_regex) return json.loads(text) def __getattr__(self, name): if self.driver: return getattr(self.driver, name) else: raise AttributeError # def __del__(self): # self.quit()