dongzhaorui 3 years ago
parent
commit
6bf23fefd3
1 changed files with 62 additions and 248 deletions
  1. 62 248
      zgzb/common/webdriver.py

+ 62 - 248
zgzb/common/webdriver.py

@@ -1,6 +1,6 @@
-import json
+import datetime
 from collections import namedtuple
-from typing import Optional
+from pathlib import Path
 
 from selenium import webdriver
 from selenium.common.exceptions import WebDriverException
@@ -11,43 +11,12 @@ from selenium.webdriver.support.ui import WebDriverWait
 
 from common.log import logger
 
-DEFAULT_USERAGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.0; rv:77.0) Gecko/20100101 Firefox/77.0"
-Netloc = namedtuple('Netloc', ['host', 'port'])
-
-
-def until_wait(
-        driver,
-        *,
-        xpath=None,
-        classname=None,
-        text=None,
-        timeout=None
-):
-    """
-    显示等待页面加载,否则抛出TimeoutException
-
-    :param driver: 浏览器驱动
-    :param xpath: xpath规则,页面等待特征
-    :param classname: class属性名称,页面等待特征
-    :param text: 期待的文本
-    :param timeout: 超时时间
-    :return:
-    """
-    _timeout = (timeout or 60)
-    wait = WebDriverWait(driver, _timeout, 0.2)
-    if xpath is not None:
-        locator = (By.XPATH, xpath)
-        if text is not None:
-            wait.until(EC.text_to_be_present_in_element(locator, text))
-        else:
-            wait.until(EC.presence_of_element_located(locator))
+_absolute = Path(__file__).absolute().parent.parent  # parent of the directory holding this file
+_date = datetime.datetime.now().strftime('%Y-%m-%d')  # computed once, when the module is imported
+_service_log_path = (_absolute / f'logs/geckodriver-{_date}.log').resolve()  # given to webdriver.Firefox
 
-    elif classname is not None:
-        locator = (By.CLASS_NAME, classname)
-        if text is not None:
-            wait.until(EC.text_to_be_present_in_element(locator, text))
-        else:
-            wait.until(EC.presence_of_element_located(locator))
+DEFAULT_USERAGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:96.0) Gecko/20100101 Firefox/96.0"
+Netloc = namedtuple('Netloc', ['host', 'port'])  # return type of netloc()
 
 
 def check_navigator(driver):
@@ -60,215 +29,8 @@ def netloc(proxies: dict) -> Netloc:
     return Netloc(host, port)
 
 
-class XhrRequest:
-    def __init__(self, url, data, headers):
-        self.url = url
-        self.data = data
-        self.headers = headers
-
-
-class XhrResponse:
-    def __init__(self, request: XhrRequest, url, headers, content, status_code):
-        self.request = request
-        self.url = url
-        self.headers = headers
-        self.content = content
-        self.status_code = status_code
-
-
-class FireFoxWebDriver:
-
-    def __init__(
-            self,
-            user_agent=None,
-            proxy=None,
-            headless=True,
-            timeout=60,
-            load_images=False,
-            executable_path=None,
-            window_size: tuple = None,
-            xhr_url_regexes: list = None,
-    ):
-        """
-        支持 firefox
-        Args:
-            user_agent: 字符串 或 无参函数,返回值为user_agent
-            proxy: {'https://sockets:xxx.xxx.xxx.xxx:xxxx'} 或 无参函数,返回值为代理地址
-            headless: 是否启用无头模式, 默认:无头模式
-            timeout: 请求超时时间
-            load_images: 是否加载图片
-            executable_path: 浏览器路径,默认为默认路径
-            window_size: # 窗口大小
-            xhr_url_regexes: 拦截xhr接口,支持正则,数组类型
-        """
-        self._user_agent = user_agent or DEFAULT_USERAGENT
-        self._proxy = proxy
-        self._load_images = load_images
-        self._headless = headless
-        self._timeout = timeout
-        self._xhr_url_regexes = xhr_url_regexes
-        self._window_size = window_size
-        self._executable_path = executable_path
-
-        firefox_profile = webdriver.FirefoxProfile()
-        firefox_options = webdriver.FirefoxOptions()
-        firefox_capabilities = webdriver.DesiredCapabilities.FIREFOX
-        if self._proxy:
-            proxy = self._proxy() if callable(self._proxy) else self._proxy
-            host, port = netloc(proxy)
-            # 不使用代理=0, 使用代理=1
-            firefox_profile.set_preference('network.proxy.type', 1)
-            firefox_profile.set_preference('network.proxy.socks', host)
-            # 端口必须使用int类型,才会生效
-            firefox_profile.set_preference('network.proxy.socks_port', int(port))
-            firefox_profile.update_preferences()
-
-        if self._user_agent:
-            firefox_profile.set_preference(
-                "general.useragent.override",
-                self._user_agent() if callable(self._user_agent) else self._user_agent,
-            )
-            firefox_profile.update_preferences()
-
-        if not self._load_images:
-            '''
-            允许加载所有图像,无论来源如何(默认)=1
-            阻止所有图像加载=2
-            防止加载第三方图像=3
-            '''
-            firefox_profile.set_preference("permissions.default.image", 2)
-            firefox_profile.update_preferences()
-
-        if self._headless:
-            firefox_options.add_argument("--headless")
-            firefox_options.add_argument("--disable-gpu")
-
-        if self._executable_path:
-            _driver = webdriver.Firefox(
-                capabilities=firefox_capabilities,
-                options=firefox_options,
-                firefox_profile=firefox_profile,
-                executable_path=self._executable_path,
-            )
-        else:
-            _driver = webdriver.Firefox(
-                capabilities=firefox_capabilities,
-                options=firefox_options,
-                firefox_profile=firefox_profile,
-            )
-
-        if self._window_size:
-            _driver.set_window_size(*self._window_size)
-
-        self.driver = _driver
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        if exc_val:
-            logger.error(f'{self.__class__.__name__} >>> {exc_type} <> {exc_val}')
-
-        self.driver.quit()
-        return True
-
-    def set_page_load_timeout(self, timeout=None):
-        """
-        设置selenium页面执行时间
-        :param timeout: 超时时间,默认:60s
-        :return:
-        """
-        _timeout = (timeout or self._timeout)
-        # driver.get(url)一直不返回,但也不报错的问题,这时程序会卡住,设置超时选项能解决这个问题。
-        self.driver.set_page_load_timeout(_timeout)
-        # 设置脚本超时时间
-        self.driver.set_script_timeout(_timeout)
-
-    def quit(self):
-        self.driver.quit()
-
-    def xhr_response(self, xhr_url_regex) -> Optional[XhrResponse]:
-        data = self.driver.execute_script(
-            f'return window.__ajaxData["{xhr_url_regex}"];'
-        )
-        if not data:
-            return None
-
-        request = XhrRequest(**data["request"])
-        response = XhrResponse(request, **data["response"])
-        return response
-
-    def xhr_text(self, xhr_url_regex) -> Optional[str]:
-        response = self.xhr_response(xhr_url_regex)
-        if not response:
-            return None
-        return response.content
-
-    def xhr_json(self, xhr_url_regex) -> Optional[dict]:
-        text = self.xhr_text(xhr_url_regex)
-        return json.loads(text)
-
-    def get(self, url):
-        self.driver.get(url)
-
-    @property
-    def user_agent(self):
-        return self.driver.execute_script("return navigator.userAgent;")
-
-    @property
-    def page_title(self):
-        return self.driver.execute_script('return document.title')
-
-    @property
-    def page_source(self):
-        return self.driver.page_source
-
-    def find_element_by_xpath(self, xpath: str):
-        """
-        通过xpath寻找元素,不存在该元素时,抛出 NoSuchElementException
-        :param xpath: 需要寻找的元素的xpath
-        :return:
-        """
-        return self.driver.find_element_by_xpath(xpath)
-
-    def until_wait(
-            self,
-            *,
-            xpath=None,
-            classname=None,
-            text=None,
-            timeout=None
-    ):
-        """
-        显示等待页面加载,否则抛出TimeoutException
-
-        :param xpath: xpath规则,页面等待特征
-        :param classname: class属性名称,页面等待特征
-        :param text: 期待的文本
-        :param timeout: 超时时间
-        :return:
-        """
-        _timeout = (timeout or self._timeout)
-        wait = WebDriverWait(self.driver, _timeout, 0.2)
-        if xpath is not None:
-            locator = (By.XPATH, xpath)
-            if text is not None:
-                wait.until(EC.text_to_be_present_in_element(locator, text))
-            else:
-                wait.until(EC.presence_of_element_located(locator))
-
-        elif classname is not None:
-            locator = (By.CLASS_NAME, classname)
-            if text is not None:
-                wait.until(EC.text_to_be_present_in_element(locator, text))
-            else:
-                wait.until(EC.presence_of_element_located(locator))
-
-    def switch_to_window(self):
-        self.driver.execute_script('window.open();')
-        handles = self.driver.window_handles
-        self.driver.close()
-        self.driver.switch_to.window(handles[-1])
+class FireFoxWebDriverError(WebDriverException):  # adds nothing to WebDriverException
+    pass
 
 
 class WebDriver(RemoteWebDriver):
@@ -326,7 +88,7 @@ class WebDriver(RemoteWebDriver):
             logger.error(f'{self.__class__.__name__} <> {exc_type.__name__}: {exc_val}')
 
         self.quit()
-        return True
+        return False
 
     def firefox_driver(self):
         firefox_profile = webdriver.FirefoxProfile()
@@ -370,6 +132,7 @@ class WebDriver(RemoteWebDriver):
 
         if self._executable_path:
             driver = webdriver.Firefox(
+                service_log_path=str(_service_log_path),
                 capabilities=firefox_capabilities,
                 options=firefox_options,
                 firefox_profile=firefox_profile,
@@ -377,6 +140,7 @@ class WebDriver(RemoteWebDriver):
             )
         else:
             driver = webdriver.Firefox(
+                service_log_path=str(_service_log_path),
                 capabilities=firefox_capabilities,
                 options=firefox_options,
                 firefox_profile=firefox_profile,
@@ -412,3 +176,53 @@ class WebDriver(RemoteWebDriver):
             return getattr(self.driver, name)
         else:
             raise AttributeError
+
+
+def get_user_agent(driver):  # returns navigator.userAgent as evaluated in the page
+    return driver.execute_script("return navigator.userAgent;")
+
+
+def get_title(driver):  # returns document.title as evaluated in the page
+    return driver.execute_script('return document.title')
+
+
+def until_wait(
+        driver,
+        *,
+        xpath=None,
+        classname=None,
+        text=None,
+        timeout=None
+):
+    """
+    Explicitly wait for a page element; TimeoutException is raised if it never appears.
+
+    :param driver: browser driver handed to WebDriverWait
+    :param xpath: XPath of the element to wait for; takes precedence over ``classname``
+    :param classname: class name of the element to wait for; used only when ``xpath`` is None
+    :param text: if not None, wait until the located element contains this text
+    :param timeout: wait timeout in seconds; any falsy value (None, 0) becomes 60
+    :return: None; nothing is waited for when both ``xpath`` and ``classname`` are None
+    """
+    _timeout = (timeout or 60)  # falsy timeout falls back to 60
+    wait = WebDriverWait(driver, _timeout, 0.2)  # third argument is the poll interval
+    if xpath is not None:
+        locator = (By.XPATH, xpath)
+        if text is not None:
+            wait.until(EC.text_to_be_present_in_element(locator, text))
+        else:
+            wait.until(EC.presence_of_element_located(locator))
+
+    elif classname is not None:  # reached only when no xpath was supplied
+        locator = (By.CLASS_NAME, classname)
+        if text is not None:
+            wait.until(EC.text_to_be_present_in_element(locator, text))
+        else:
+            wait.until(EC.presence_of_element_located(locator))
+
+
+def new_window(driver):
+    """Open a window with ``window.open()`` and switch the driver to the last handle."""
+    driver.execute_script('window.open();')
+    handles = driver.window_handles
+    driver.switch_to.window(handles[-1])  # assumes the new window is listed last - TODO confirm