# -*- coding: utf-8 -*-
"""
Created on 2023-03-01
---------
@summary: Remote selenium service
---------
@author: dzr
@email:  dongzhaorui@topnet.net.cn
"""

import os
import queue
import threading

from selenium import webdriver
from selenium.webdriver.chrome.remote_connection import ChromeRemoteConnection
from selenium.webdriver.firefox.remote_connection import FirefoxRemoteConnection
from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver

from feapder.setting import WEBDRIVER
from feapder.utils.log import log
from feapder.utils.tools import Singleton

DEFAULT_USERAGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36"


class WebDriver(RemoteWebDriver):
    """Browser-based crawling with selenium.

    The instance builds a concrete selenium driver (local or remote, Chrome
    or Firefox) and stores it in ``self.driver``. Attributes that cannot be
    found on the wrapper itself are looked up on that driver (see
    ``__getattr__``).
    """

    CHROME = "CHROME"
    FIREFOX = "FIREFOX"

    def __init__(
        self,
        load_images=True,
        user_agent=None,
        proxy=None,
        driver_type=FIREFOX,
        timeout=10,
        window_size=(1024, 800),
        server_addr=None,
        custom_argument=None,
        version=None,
        usages_local_driver=True,
        headless=False,
        executable_path=None,
        service_log_path=None,
        **kwargs
    ):
        """
        webdriver wrapper supporting chrome and firefox.

        Args:
            load_images: whether images are loaded
            user_agent: a string, or a zero-argument callable returning the
                user agent; falls back to DEFAULT_USERAGENT when empty
            proxy: "xxx.xxx.xxx.xxx:xxxx", or a zero-argument callable
                returning the proxy address. The firefox drivers configure it
                as a socks proxy; the remote chrome driver does not apply it.
            headless: whether to run headless (only read by the local drivers)
            driver_type: CHROME or FIREFOX
            timeout: used as both the page-load and the script timeout
            window_size: window size as (width, height)
            executable_path: driver executable path; selenium's default is
                used when empty (only read by the local drivers)
            server_addr: remote service address; falls back to
                WEBDRIVER["server_addr"]
            usages_local_driver: use a local driver instead of the remote one
            service_log_path: selenium service log path (only read by the
                local drivers)
            version: browser version; falls back to WEBDRIVER["version"]
            custom_argument: iterable of extra command-line arguments added
                to the browser options
            **kwargs: accepted for compatibility, not used

        Raises:
            TypeError: if ``driver_type`` is neither CHROME nor FIREFOX.
        """
        self._load_images = load_images
        self._user_agent = user_agent or DEFAULT_USERAGENT
        self._proxy = proxy
        self._headless = headless
        self._timeout = timeout
        self._window_size = window_size
        self._server_addr = server_addr or WEBDRIVER["server_addr"]
        self._custom_argument = custom_argument
        self._version = version or WEBDRIVER["version"]
        self._executable_path = executable_path
        self._usages_local_driver = usages_local_driver
        self._service_log_path = service_log_path

        if driver_type == WebDriver.CHROME:
            self.driver = self.chrome_driver()
        elif driver_type == WebDriver.FIREFOX:
            self.driver = self.firefox_driver()
        else:
            # Report the offending value itself; its type alone does not tell
            # the caller what was wrong.
            raise TypeError(
                "driver_type must be one of CHROME or FIREFOX, but received {!r}".format(
                    driver_type
                )
            )

        # driver.get(url) may never return without raising, which would hang
        # the program; a page-load timeout prevents that.
        self.driver.set_page_load_timeout(self._timeout)
        # The script timeout uses the same value.
        self.driver.set_script_timeout(self._timeout)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Log the pending exception (if any), quit the driver, do not suppress."""
        if exc_val:
            log.error(exc_val)

        self.get_driver().quit()
        return False

    def get_driver(self):
        """Return the underlying selenium driver."""
        return self.driver

    def _read_stealth_js(self):
        """Return the text of ``js/stealth.min.js`` located next to this module."""
        path = os.path.join(os.path.dirname(__file__), "./js/stealth.min.js")
        with open(path, encoding="utf-8") as f:
            return f.read()

    def local_firefox_driver(self):
        """Create a local Firefox driver configured from the instance settings."""
        firefox_profile = webdriver.FirefoxProfile()
        firefox_options = webdriver.FirefoxOptions()
        firefox_capabilities = webdriver.DesiredCapabilities.FIREFOX
        firefox_profile.set_preference("dom.webdriver.enabled", False)
        if self._proxy:
            proxy = self._proxy() if callable(self._proxy) else self._proxy
            proxy = proxy.replace("socks5://", "")
            # Use a socks5 proxy
            firefox_profile.set_preference('network.proxy.type', 1)  # no proxy: 0, use proxy: 1
            firefox_profile.set_preference('network.proxy.socks', proxy.split(":")[0])
            firefox_profile.set_preference('network.proxy.socks_port', int(proxy.split(":")[-1]))

        if self._user_agent:
            firefox_profile.set_preference(
                "general.useragent.override",
                self._user_agent() if callable(self._user_agent) else self._user_agent,
            )

        if not self._load_images:
            firefox_profile.set_preference("permissions.default.image", 2)

        if self._headless:
            firefox_options.add_argument("--headless")
            firefox_options.add_argument("--disable-gpu")

        # Add user-supplied arguments
        if self._custom_argument:
            for arg in self._custom_argument:
                firefox_options.add_argument(arg)

        if self._executable_path:
            driver = webdriver.Firefox(
                capabilities=firefox_capabilities,
                options=firefox_options,
                firefox_profile=firefox_profile,
                executable_path=self._executable_path,
                service_log_path=self._service_log_path
            )
        else:
            driver = webdriver.Firefox(
                capabilities=firefox_capabilities,
                options=firefox_options,
                firefox_profile=firefox_profile,
                service_log_path=self._service_log_path
            )

        if self._window_size:
            driver.set_window_size(*self._window_size)

        return driver

    def remote_firefox_driver(self):
        """Create a Firefox session on the remote server at ``self._server_addr``."""
        firefox_capabilities = {
            "browserName": "firefox",
            "platform": "ANY",
            "version": self._version,
            "javascriptEnabled": True,
            "marionette": False,
        }
        firefox_options = webdriver.FirefoxOptions()
        firefox_options.add_argument("--disable-gpu")
        firefox_options.set_preference("dom.webdriver.enabled", False)
        if self._proxy:
            proxy = self._proxy() if callable(self._proxy) else self._proxy
            proxy = proxy.replace("socks5://", "")
            # Use a socks5 proxy
            ip, port = proxy.split(":")
            firefox_options.set_preference('network.proxy.type', 1)  # no proxy: 0, use proxy: 1
            firefox_options.set_preference('network.proxy.socks', ip)
            firefox_options.set_preference('network.proxy.socks_port', int(port))
            # firefox_capabilities["marionette"] = True  # for using an http proxy

        if self._user_agent:
            firefox_options.set_preference(
                "general.useragent.override",
                self._user_agent() if callable(self._user_agent) else self._user_agent,
            )

        if not self._load_images:
            firefox_options.set_preference("permissions.default.image", 2)

        if self._custom_argument:
            for arg in self._custom_argument:
                firefox_options.add_argument(arg)

        executor = FirefoxRemoteConnection(remote_server_addr=self._server_addr)
        browser = webdriver.Remote(
            command_executor=executor,
            desired_capabilities=firefox_capabilities,
            options=firefox_options
        )

        if self._window_size:
            browser.set_window_size(*self._window_size)

        return browser

    def firefox_driver(self):
        """Return a local or remote Firefox driver per ``usages_local_driver``."""
        if self._usages_local_driver:
            return self.local_firefox_driver()
        return self.remote_firefox_driver()

    def remote_chrome_driver(self):
        """Create a Chrome session on the remote server and inject stealth.min.js.

        If reading or injecting the stealth script fails, the freshly created
        session is quit before the exception is re-raised, so it does not
        stay open on the remote server.
        """
        chrome_capabilities = {
            "browserName": "chrome",
            "platform": "ANY",
            "version": self._version,
            "javascriptEnabled": True,
        }
        chrome_options = webdriver.ChromeOptions()

        # Important: switch to developer mode so that websites cannot
        # recognise that Selenium is being used.
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option("useAutomationExtension", False)
        # Required when running inside docker
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-gpu")

        if self._user_agent:
            chrome_options.add_argument(
                "user-agent={}".format(
                    self._user_agent()
                    if callable(self._user_agent)
                    else self._user_agent
                )
            )
        # NOTE: the proxy is deliberately not applied here; the original
        # author disabled it because the socks5 protocol is not supported.
        # if self._proxy:
        #     chrome_options.add_argument(
        #         "--proxy-server={}".format(
        #             self._proxy() if callable(self._proxy) else self._proxy
        #         )
        #     )

        if not self._load_images:
            chrome_options.add_experimental_option(
                "prefs", {"profile.managed_default_content_settings.images": 2}
            )

        if self._window_size:
            chrome_options.add_argument(
                "--window-size={},{}".format(self._window_size[0], self._window_size[1])
            )

        # Add user-supplied arguments
        if self._custom_argument:
            for arg in self._custom_argument:
                chrome_options.add_argument(arg)

        browser = webdriver.Remote(
            command_executor=ChromeRemoteConnection(
                remote_server_addr=self._server_addr,
                keep_alive=True),
            desired_capabilities=chrome_capabilities,
            options=chrome_options
        )

        # Hide browser fingerprints
        try:
            params = {
                'cmd': 'Page.addScriptToEvaluateOnNewDocument',
                'params': {'source': self._read_stealth_js()}
            }
            browser.execute("executeCdpCommand", params)
        except Exception:
            browser.quit()
            raise

        return browser

    def local_chrome_driver(self):
        """Create a local Chrome driver and inject stealth.min.js.

        If reading or injecting the stealth script fails, the freshly started
        browser is quit before the exception is re-raised.
        """
        chrome_options = webdriver.ChromeOptions()
        # Important: switch to developer mode so that websites cannot
        # recognise that Selenium is being used.
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option("useAutomationExtension", False)
        # Required when running inside docker
        chrome_options.add_argument("--no-sandbox")

        if self._proxy:
            chrome_options.add_argument(
                "--proxy-server={}".format(
                    self._proxy() if callable(self._proxy) else self._proxy
                )
            )

        if self._user_agent:
            chrome_options.add_argument(
                "user-agent={}".format(
                    self._user_agent()
                    if callable(self._user_agent)
                    else self._user_agent
                )
            )

        if not self._load_images:
            chrome_options.add_experimental_option(
                "prefs", {"profile.managed_default_content_settings.images": 2}
            )

        if self._headless:
            chrome_options.add_argument("--headless")
            chrome_options.add_argument("--disable-gpu")

        if self._window_size:
            chrome_options.add_argument(
                "--window-size={},{}".format(self._window_size[0], self._window_size[1])
            )

        # Add user-supplied arguments
        if self._custom_argument:
            for arg in self._custom_argument:
                chrome_options.add_argument(arg)

        if self._executable_path:
            driver = webdriver.Chrome(
                chrome_options=chrome_options,
                executable_path=self._executable_path,
                service_log_path=self._service_log_path
            )
        else:
            driver = webdriver.Chrome(
                chrome_options=chrome_options,
                service_log_path=self._service_log_path
            )

        # Hide browser fingerprints
        try:
            driver.execute_cdp_cmd(
                "Page.addScriptToEvaluateOnNewDocument",
                {"source": self._read_stealth_js()},
            )
        except Exception:
            driver.quit()
            raise

        return driver

    def chrome_driver(self):
        """Return a local or remote Chrome driver per ``usages_local_driver``."""
        if self._usages_local_driver:
            return self.local_chrome_driver()
        return self.remote_chrome_driver()

    @property
    def cookies(self):
        """Return the driver's cookies as a ``{name: value}`` dict."""
        return {
            cookie["name"]: cookie["value"] for cookie in self.driver.get_cookies()
        }

    @cookies.setter
    def cookies(self, val: dict):
        """
        Set cookies on the driver.

        Args:
            val: {"key":"value", "key2":"value2"}

        Returns:

        """
        for key, value in val.items():
            self.driver.add_cookie({"name": key, "value": value})

    def __getattr__(self, name):
        """Delegate attributes not found on the wrapper to the underlying driver.

        ``driver`` is read from the instance ``__dict__`` rather than through
        ``self.driver``: when it has not been assigned yet (for instance when
        driver creation failed inside ``__init__``), ``self.driver`` would
        re-enter this method and recurse until RecursionError.

        Raises:
            AttributeError: if no driver is set, or the driver lacks ``name``.
        """
        driver = self.__dict__.get("driver")
        if driver is None:
            raise AttributeError(name)
        return getattr(driver, name)

    # def __del__(self):
    #     if self.driver:
    #         self.driver.quit()


@Singleton
class WebDriverPool:
    """Pool of ``WebDriver`` instances backed by a bounded queue."""

    def __init__(self, pool_size=5, **kwargs):
        """
        Args:
            pool_size: maximum size of the queue holding idle drivers
            **kwargs: keyword arguments passed to ``WebDriver`` on creation
        """
        self.queue = queue.Queue(maxsize=pool_size)
        self.kwargs = kwargs
        self.lock = threading.RLock()
        self.driver_count = 0

    @property
    def is_full(self):
        """True once ``driver_count`` has reached the queue's ``maxsize``."""
        return self.driver_count >= self.queue.maxsize

    def get(self, user_agent: str = None, proxy: str = None) -> WebDriver:
        """
        Get a webdriver.

        While the pool is not full a new ``WebDriver`` is created and queued;
        ``user_agent`` and ``proxy`` are only used for that new instance. The
        returned driver is then taken from the queue, and this call blocks
        while the queue is empty.

        Args:
            user_agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36
            proxy: xxx.xxx.xxx.xxx

        Returns:
            A ``WebDriver`` taken from the queue.
        """
        if not self.is_full:
            with self.lock:
                # Re-check under the lock: another thread may have filled the
                # pool between the first check and acquiring the lock.
                if not self.is_full:
                    kwargs = self.kwargs.copy()
                    if user_agent:
                        kwargs["user_agent"] = user_agent
                    if proxy:
                        kwargs["proxy"] = proxy
                    driver = WebDriver(**kwargs)
                    self.queue.put(driver)
                    self.driver_count += 1

        driver = self.queue.get()
        return driver

    def put(self, driver):
        """Return ``driver`` to the queue."""
        self.queue.put(driver)

    def remove(self, driver):
        """Quit ``driver`` and release its slot in the pool.

        The count is decremented under the same lock that guards its
        increment, and also when ``quit()`` raises, so a failing quit does
        not permanently occupy a slot. The exception still propagates.
        """
        try:
            driver.quit()
        finally:
            with self.lock:
                self.driver_count -= 1

    def close(self):
        """Quit every driver currently sitting in the queue."""
        while True:
            # get_nowait avoids blocking forever if another thread takes the
            # last driver between an emptiness check and the get.
            try:
                driver = self.queue.get_nowait()
            except queue.Empty:
                break
            self.remove(driver)