123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424 |
- # -*- coding: utf-8 -*-
- """
- Created on 2023-03-01
- ---------
- @summary: 远程selenium服务
- ---------
- @author: dzr
- @email: dongzhaorui@topnet.net.cn
- """
- import os
- import queue
- import threading
- from selenium import webdriver
- from selenium.webdriver.chrome.remote_connection import ChromeRemoteConnection
- from selenium.webdriver.firefox.remote_connection import FirefoxRemoteConnection
- from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver
- from feapder.setting import WEBDRIVER
- from feapder.utils.log import log
- from feapder.utils.tools import Singleton
# Fallback User-Agent string, used when the caller does not supply one.
DEFAULT_USERAGENT = (
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/73.0.3683.103 Safari/537.36"
)
class WebDriver(RemoteWebDriver):
    """Browser-based fetching with selenium (local or remote Chrome/Firefox).

    The instance wraps a concrete selenium driver stored in ``self.driver``.
    Attributes that are not found on the wrapper itself are looked up on that
    wrapped driver (see ``__getattr__``).
    """

    CHROME = "CHROME"
    FIREFOX = "FIREFOX"

    def __init__(
        self,
        load_images=True,
        user_agent=None,
        proxy=None,
        driver_type=FIREFOX,
        timeout=10,
        window_size=(1024, 800),
        server_addr=None,
        custom_argument=None,
        version=None,
        usages_local_driver=True,
        headless=False,
        executable_path=None,
        service_log_path=None,
        **kwargs
    ):
        """
        Create and configure the wrapped Chrome or Firefox driver.

        Args:
            load_images: whether the browser loads images
            user_agent: a string, or a zero-argument callable returning the
                user agent; falls back to DEFAULT_USERAGENT when falsy
            proxy: "xxx.xxx.xxx.xxx:xxxx", or a zero-argument callable
                returning the proxy address. Firefox configures it as a SOCKS
                proxy; local Chrome passes it to --proxy-server; remote Chrome
                does not apply it
            driver_type: WebDriver.CHROME or WebDriver.FIREFOX
            timeout: page-load timeout and script timeout set on the driver
            window_size: (width, height) of the browser window; skipped when falsy
            server_addr: remote selenium server address; falls back to
                WEBDRIVER["server_addr"]
            custom_argument: iterable of extra browser arguments, added last
            version: browser version put into the remote capabilities; falls
                back to WEBDRIVER["version"]
            usages_local_driver: True to start a local driver, False to
                connect to the remote server
            headless: run headless; only the local drivers read this flag
            executable_path: forwarded as ``executable_path`` to the local
                selenium driver; omitted when falsy
            service_log_path: log path forwarded to the local selenium driver
            **kwargs: accepted for call compatibility and ignored

        Raises:
            TypeError: if driver_type is neither CHROME nor FIREFOX
        """
        self._load_images = load_images
        self._user_agent = user_agent or DEFAULT_USERAGENT
        self._proxy = proxy
        self._headless = headless
        self._timeout = timeout
        self._window_size = window_size
        self._server_addr = server_addr or WEBDRIVER["server_addr"]
        self._custom_argument = custom_argument
        self._version = version or WEBDRIVER["version"]
        self._executable_path = executable_path
        self._usages_local_driver = usages_local_driver
        self._service_log_path = service_log_path

        if driver_type == WebDriver.CHROME:
            self.driver = self.chrome_driver()
        elif driver_type == WebDriver.FIREFOX:
            self.driver = self.firefox_driver()
        else:
            # Report the offending value (not just its type) and list only the
            # driver types this class actually handles.
            raise TypeError(
                "driver_type must be one of CHROME or FIREFOX, but received {!r}".format(
                    driver_type
                )
            )

        # driver.get(url) can hang forever without raising; a page-load
        # timeout keeps the program from getting stuck.
        self.driver.set_page_load_timeout(self._timeout)
        # Same limit for asynchronous script execution.
        self.driver.set_script_timeout(self._timeout)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Log any in-flight exception, quit the wrapped driver, never suppress."""
        if exc_val:
            log.error(exc_val)
        self.get_driver().quit()
        return False

    def get_driver(self):
        """Return the wrapped selenium driver."""
        return self.driver

    def quit(self):
        """Quit the wrapped driver through its own ``quit()``.

        Without this override, ``quit()`` on the wrapper resolves to the
        inherited ``RemoteWebDriver.quit`` and bypasses whatever cleanup the
        concrete driver class adds in its own ``quit()`` (selenium's local
        Chrome/Firefox drivers stop their driver service there).
        """
        self.driver.quit()

    @staticmethod
    def _resolve_setting(value):
        """Return ``value()`` when value is callable, otherwise value itself."""
        return value() if callable(value) else value

    def _socks_proxy_endpoint(self):
        """Return (host, port) of the configured proxy, without a socks5:// prefix."""
        proxy = self._resolve_setting(self._proxy).replace("socks5://", "")
        host, _, port = proxy.rpartition(":")
        return host, int(port)

    @staticmethod
    def _read_stealth_script():
        """Return the source of the bundled ./js/stealth.min.js script."""
        script_path = os.path.join(os.path.dirname(__file__), "./js/stealth.min.js")
        # Explicit encoding so the read does not depend on the platform locale.
        with open(script_path, encoding="utf-8") as f:
            return f.read()

    def local_firefox_driver(self):
        """Start a local Firefox driver configured from the instance settings."""
        firefox_profile = webdriver.FirefoxProfile()
        firefox_options = webdriver.FirefoxOptions()
        # Copy so the shared class-level capabilities dict is never handed out.
        firefox_capabilities = webdriver.DesiredCapabilities.FIREFOX.copy()
        firefox_profile.set_preference("dom.webdriver.enabled", False)

        if self._proxy:
            host, port = self._socks_proxy_endpoint()
            # Use a SOCKS proxy. network.proxy.type: 0 = no proxy, 1 = manual proxy.
            firefox_profile.set_preference("network.proxy.type", 1)
            firefox_profile.set_preference("network.proxy.socks", host)
            firefox_profile.set_preference("network.proxy.socks_port", port)

        if self._user_agent:
            firefox_profile.set_preference(
                "general.useragent.override",
                self._resolve_setting(self._user_agent),
            )

        if not self._load_images:
            firefox_profile.set_preference("permissions.default.image", 2)

        if self._headless:
            firefox_options.add_argument("--headless")
            firefox_options.add_argument("--disable-gpu")

        # Caller-supplied arguments go last.
        if self._custom_argument:
            for arg in self._custom_argument:
                firefox_options.add_argument(arg)

        driver_kwargs = {
            "capabilities": firefox_capabilities,
            "options": firefox_options,
            "firefox_profile": firefox_profile,
            "service_log_path": self._service_log_path,
        }
        # Only pass executable_path when one was configured, so selenium's own
        # default applies otherwise.
        if self._executable_path:
            driver_kwargs["executable_path"] = self._executable_path
        driver = webdriver.Firefox(**driver_kwargs)

        if self._window_size:
            driver.set_window_size(*self._window_size)
        return driver

    def remote_firefox_driver(self):
        """Open a Firefox session on the remote selenium server."""
        firefox_capabilities = {
            "browserName": "firefox",
            "platform": "ANY",
            "version": self._version,
            "javascriptEnabled": True,
            "marionette": False,
        }
        firefox_options = webdriver.FirefoxOptions()
        firefox_options.add_argument("--disable-gpu")
        firefox_options.set_preference("dom.webdriver.enabled", False)

        if self._proxy:
            host, port = self._socks_proxy_endpoint()
            # Use a SOCKS proxy. network.proxy.type: 0 = no proxy, 1 = manual proxy.
            firefox_options.set_preference("network.proxy.type", 1)
            firefox_options.set_preference("network.proxy.socks", host)
            firefox_options.set_preference("network.proxy.socks_port", port)
            # Original author's note (left disabled): for an http proxy, set
            # firefox_capabilities["marionette"] = True.

        if self._user_agent:
            firefox_options.set_preference(
                "general.useragent.override",
                self._resolve_setting(self._user_agent),
            )

        if not self._load_images:
            firefox_options.set_preference("permissions.default.image", 2)

        if self._custom_argument:
            for arg in self._custom_argument:
                firefox_options.add_argument(arg)

        executor = FirefoxRemoteConnection(remote_server_addr=self._server_addr)
        browser = webdriver.Remote(
            command_executor=executor,
            desired_capabilities=firefox_capabilities,
            options=firefox_options,
        )

        if self._window_size:
            browser.set_window_size(*self._window_size)
        return browser

    def firefox_driver(self):
        """Return a local or remote Firefox driver per ``usages_local_driver``."""
        if self._usages_local_driver:
            return self.local_firefox_driver()
        return self.remote_firefox_driver()

    def remote_chrome_driver(self):
        """Open a Chrome session on the remote selenium server."""
        chrome_capabilities = {
            "browserName": "chrome",
            "platform": "ANY",
            "version": self._version,
            "javascriptEnabled": True,
        }
        chrome_options = webdriver.ChromeOptions()
        # Important: drop the automation switches so sites are less likely to
        # detect that selenium is driving the browser.
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option("useAutomationExtension", False)
        # Needed when running inside docker.
        chrome_options.add_argument("--no-sandbox")
        chrome_options.add_argument("--disable-gpu")

        if self._user_agent:
            chrome_options.add_argument(
                "user-agent={}".format(self._resolve_setting(self._user_agent))
            )

        # The proxy is deliberately not applied here: the original author
        # disabled --proxy-server for this mode with the note that the socks5
        # protocol is not supported.

        if not self._load_images:
            chrome_options.add_experimental_option(
                "prefs", {"profile.managed_default_content_settings.images": 2}
            )

        if self._window_size:
            chrome_options.add_argument(
                "--window-size={},{}".format(self._window_size[0], self._window_size[1])
            )

        # Caller-supplied arguments go last.
        if self._custom_argument:
            for arg in self._custom_argument:
                chrome_options.add_argument(arg)

        # Read the script before opening the session so that a missing file
        # cannot leave a browser running on the server.
        stealth_js = self._read_stealth_script()

        browser = webdriver.Remote(
            command_executor=ChromeRemoteConnection(
                remote_server_addr=self._server_addr,
                keep_alive=True,
            ),
            desired_capabilities=chrome_capabilities,
            options=chrome_options,
        )

        # Hide browser fingerprints: register the script to run on every new document.
        try:
            browser.execute(
                "executeCdpCommand",
                {
                    "cmd": "Page.addScriptToEvaluateOnNewDocument",
                    "params": {"source": stealth_js},
                },
            )
        except Exception:
            # Do not leak the remote session when the injection fails.
            browser.quit()
            raise
        return browser

    def local_chrome_driver(self):
        """Start a local Chrome driver configured from the instance settings."""
        chrome_options = webdriver.ChromeOptions()
        # Important: drop the automation switches so sites are less likely to
        # detect that selenium is driving the browser.
        chrome_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        chrome_options.add_experimental_option("useAutomationExtension", False)
        # Needed when running inside docker.
        chrome_options.add_argument("--no-sandbox")

        if self._proxy:
            chrome_options.add_argument(
                "--proxy-server={}".format(self._resolve_setting(self._proxy))
            )

        if self._user_agent:
            chrome_options.add_argument(
                "user-agent={}".format(self._resolve_setting(self._user_agent))
            )

        if not self._load_images:
            chrome_options.add_experimental_option(
                "prefs", {"profile.managed_default_content_settings.images": 2}
            )

        if self._headless:
            chrome_options.add_argument("--headless")
            chrome_options.add_argument("--disable-gpu")

        if self._window_size:
            chrome_options.add_argument(
                "--window-size={},{}".format(self._window_size[0], self._window_size[1])
            )

        # Caller-supplied arguments go last.
        if self._custom_argument:
            for arg in self._custom_argument:
                chrome_options.add_argument(arg)

        # Read the script before launching so that a missing file cannot
        # leave a browser process behind.
        stealth_js = self._read_stealth_script()

        # `options` replaces the deprecated `chrome_options` keyword.
        driver_kwargs = {
            "options": chrome_options,
            "service_log_path": self._service_log_path,
        }
        # Only pass executable_path when one was configured, so selenium's own
        # default applies otherwise.
        if self._executable_path:
            driver_kwargs["executable_path"] = self._executable_path
        driver = webdriver.Chrome(**driver_kwargs)

        # Hide browser fingerprints: register the script to run on every new document.
        try:
            driver.execute_cdp_cmd(
                "Page.addScriptToEvaluateOnNewDocument", {"source": stealth_js}
            )
        except Exception:
            # Do not leak the browser process when the injection fails.
            driver.quit()
            raise
        return driver

    def chrome_driver(self):
        """Return a local or remote Chrome driver per ``usages_local_driver``."""
        if self._usages_local_driver:
            return self.local_chrome_driver()
        return self.remote_chrome_driver()

    @property
    def cookies(self):
        """Cookies of the wrapped driver as a flat ``{name: value}`` dict."""
        return {
            cookie["name"]: cookie["value"] for cookie in self.driver.get_cookies()
        }

    @cookies.setter
    def cookies(self, val: dict):
        """
        Add cookies to the wrapped driver.

        Args:
            val: {"key": "value", "key2": "value2"}
        """
        for key, value in val.items():
            self.driver.add_cookie({"name": key, "value": value})

    def __getattr__(self, name):
        """Delegate attributes missing on the wrapper to the wrapped driver.

        Raises:
            AttributeError: if no driver has been assigned yet.
        """
        # Look in __dict__ directly: reading self.driver while it is unset
        # would re-enter __getattr__ and recurse without end.
        driver = self.__dict__.get("driver")
        if driver is None:
            raise AttributeError(
                "{!r} object has no attribute {!r}".format(type(self).__name__, name)
            )
        return getattr(driver, name)

    # NOTE: there is intentionally no __del__ (the original author left it
    # disabled); quit explicitly or use the instance as a context manager.
@Singleton
class WebDriverPool:
    """Pool of WebDriver instances, created on demand up to ``pool_size``."""

    def __init__(self, pool_size=5, **kwargs):
        """
        Args:
            pool_size: maximum number of drivers the pool creates
            **kwargs: keyword arguments passed to WebDriver for each new driver
        """
        self.queue = queue.Queue(maxsize=pool_size)
        self.kwargs = kwargs
        self.lock = threading.RLock()
        # Number of drivers created by this pool and not yet removed/closed.
        self.driver_count = 0

    @property
    def is_full(self):
        """True once the pool has created as many drivers as the queue can hold."""
        return self.driver_count >= self.queue.maxsize

    def get(self, user_agent: str = None, proxy: str = None) -> WebDriver:
        """
        Take a driver from the pool, creating one first if the pool is not full.

        user_agent and proxy are only used when a new driver is created; a
        driver that already exists is returned as it is. Blocks until a
        driver is available.

        Args:
            user_agent: e.g. Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.103 Safari/537.36
            proxy: xxx.xxx.xxx.xxx

        Returns:
            a WebDriver taken from the queue
        """
        if not self.is_full:
            with self.lock:
                # Re-check under the lock: another thread may have filled the
                # pool between the first check and acquiring the lock.
                if not self.is_full:
                    kwargs = self.kwargs.copy()
                    if user_agent:
                        kwargs["user_agent"] = user_agent
                    if proxy:
                        kwargs["proxy"] = proxy
                    driver = WebDriver(**kwargs)
                    self.queue.put(driver)
                    self.driver_count += 1
        return self.queue.get()

    def put(self, driver):
        """Return a driver to the pool."""
        self.queue.put(driver)

    def remove(self, driver):
        """Quit a driver and release its slot in the pool.

        The slot is released even if ``quit()`` raises, so a broken driver
        cannot shrink the pool permanently; the exception still propagates.
        """
        try:
            driver.quit()
        finally:
            # Same lock as get(), so the counter is never updated concurrently.
            with self.lock:
                self.driver_count -= 1

    def close(self):
        """Quit every driver currently idle in the queue."""
        while True:
            try:
                # Non-blocking: a separate empty() check followed by a blocking
                # get() could hang if another thread takes the last driver.
                driver = self.queue.get_nowait()
            except queue.Empty:
                break
            self.remove(driver)
|