123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513 |
- # -*- coding: utf-8 -*-
- """
- Created on 2018-07-25 11:49:08
- ---------
- @summary: 请求结构体
- ---------
- @author: Boris
- @email: boris_liu@foxmail.com
- """
- import requests
- from func_timeout import func_set_timeout, FunctionTimedOut
- from requests.adapters import HTTPAdapter
- from requests.cookies import RequestsCookieJar
- from requests.packages.urllib3.exceptions import InsecureRequestWarning
- import feapder.setting as setting
- import feapder.utils.tools as tools
- from feapder.db.redisdb import RedisDB
- from feapder.network import user_agent
- from feapder.network.proxy_pool import ProxyPool
- from feapder.network.response import Response
- from feapder.utils.log import Log
- from feapder.utils.webdriver import WebDriverPool
- log = Log()
- # 屏蔽warning信息
- requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
- class Request(object):
- session = None
- webdriver_pool: WebDriverPool = None
- user_agent_pool = user_agent
- proxies_pool: ProxyPool = None
- cache_db = None # redis / pika
- cached_redis_key = None # 缓存response的文件文件夹 response_cached:cached_redis_key:md5
- cached_expire_time = 1200 # 缓存过期时间
- local_filepath = None
- oss_handler = None
- __REQUEST_ATTRS__ = {
- # 'method', 'url', 必须传递 不加入**kwargs中
- "params",
- "data",
- "headers",
- "cookies",
- "files",
- "auth",
- "timeout",
- "allow_redirects",
- "proxies",
- "hooks",
- "stream",
- "verify",
- "cert",
- "json",
- }
- DEFAULT_KEY_VALUE = dict(
- url="",
- retry_times=0,
- priority=300,
- parser_name=None,
- callback=None,
- filter_repeat=True,
- auto_request=True,
- request_sync=False,
- use_session=None,
- random_user_agent=True,
- download_midware=None,
- is_abandoned=False,
- render=False,
- render_time=0,
- )
- def __init__(
- self,
- url="",
- retry_times=0,
- priority=300,
- parser_name=None,
- callback=None,
- filter_repeat=True,
- auto_request=True,
- request_sync=False,
- use_session=None,
- random_user_agent=True,
- download_midware=None,
- is_abandoned=False,
- render=False,
- render_time=0,
- **kwargs,
- ):
- """
- @summary: Request参数
- ---------
- 框架参数
- @param url: 待抓取url
- @param retry_times: 当前重试次数
- @param priority: 优先级 越小越优先 默认300
- @param parser_name: 回调函数所在的类名 默认为当前类
- @param callback: 回调函数 可以是函数 也可是函数名(如想跨类回调时,parser_name指定那个类名,callback指定那个类想回调的方法名即可)
- @param filter_repeat: 是否需要去重 (True/False) 当setting中的REQUEST_FILTER_ENABLE设置为True时该参数生效 默认True
- @param auto_request: 是否需要自动请求下载网页 默认是。设置为False时返回的response为空,需要自己去请求网页
- @param request_sync: 是否同步请求下载网页,默认异步。如果该请求url过期时间快,可设置为True,相当于yield的reqeust会立即响应,而不是去排队
- @param use_session: 是否使用session方式
- @param random_user_agent: 是否随机User-Agent (True/False) 当setting中的RANDOM_HEADERS设置为True时该参数生效 默认True
- @param download_midware: 下载中间件。默认为parser中的download_midware
- @param is_abandoned: 当发生异常时是否放弃重试 True/False. 默认False
- @param render: 是否用浏览器渲染
- @param render_time: 渲染时长,即打开网页等待指定时间后再获取源码
- --
- 以下参数与requests参数使用方式一致
- @param method: 请求方式,如POST或GET,默认根据data值是否为空来判断
- @param params: 请求参数
- @param data: 请求body
- @param json: 请求json字符串,同 json.dumps(data)
- @param headers:
- @param cookies: 字典 或 CookieJar 对象
- @param files:
- @param auth:
- @param timeout: (浮点或元组)等待服务器数据的超时限制,是一个浮点数,或是一个(connect timeout, read timeout) 元组
- @param allow_redirects : Boolean. True 表示允许跟踪 POST/PUT/DELETE 方法的重定向
- @param proxies: 代理 {"http":"http://xxx", "https":"https://xxx"}
- @param verify: 为 True 时将会验证 SSL 证书
- @param stream: 如果为 False,将会立即下载响应内容
- @param cert:
- --
- @param **kwargs: 其他值: 如 Request(item=item) 则item可直接用 request.item 取出
- ---------
- @result:
- """
- self.url = url
- self.retry_times = retry_times
- self.priority = priority
- self.parser_name = parser_name
- self.callback = callback
- self.filter_repeat = filter_repeat
- self.auto_request = auto_request
- self.request_sync = request_sync
- self.use_session = use_session
- self.random_user_agent = random_user_agent
- self.download_midware = download_midware
- self.is_abandoned = is_abandoned
- self.render = render
- self.render_time = render_time or setting.WEBDRIVER.get("render_time", 0)
- self.requests_kwargs = {}
- for key, value in kwargs.items():
- if key in self.__class__.__REQUEST_ATTRS__: # 取requests参数
- self.requests_kwargs[key] = value
- self.__dict__[key] = value
- def __repr__(self):
- try:
- return "<Request {}>".format(self.url)
- except:
- return "<Request {}>".format(str(self.to_dict)[:40])
- def __setattr__(self, key, value):
- """
- 针对 request.xxx = xxx 的形式,更新reqeust及内部参数值
- @param key:
- @param value:
- @return:
- """
- self.__dict__[key] = value
- if key in self.__class__.__REQUEST_ATTRS__:
- self.requests_kwargs[key] = value
- def __lt__(self, other):
- return self.priority < other.priority
- @property
- def _session(self):
- use_session = (
- setting.USE_SESSION if self.use_session is None else self.use_session
- ) # self.use_session 优先级高
- if use_session and not self.__class__.session:
- self.__class__.session = requests.Session()
- # pool_connections – 缓存的 urllib3 连接池个数 pool_maxsize – 连接池中保存的最大连接数
- http_adapter = HTTPAdapter(pool_connections=1000, pool_maxsize=1000)
- # 任何使用该session会话的 HTTP 请求,只要其 URL 是以给定的前缀开头,该传输适配器就会被使用到。
- self.__class__.session.mount("http", http_adapter)
- return self.__class__.session
- @property
- def _webdriver_pool(self):
- if not self.__class__.webdriver_pool:
- self.__class__.webdriver_pool = WebDriverPool(**setting.WEBDRIVER)
- return self.__class__.webdriver_pool
- @property
- def _proxies_pool(self):
- if not self.__class__.proxies_pool:
- self.__class__.proxies_pool = ProxyPool()
- return self.__class__.proxies_pool
- @property
- def to_dict(self):
- request_dict = {}
- self.callback = (
- getattr(self.callback, "__name__")
- if callable(self.callback)
- else self.callback
- )
- self.download_midware = (
- getattr(self.download_midware, "__name__")
- if callable(self.download_midware)
- else self.download_midware
- )
- for key, value in self.__dict__.items():
- if (
- key in self.__class__.DEFAULT_KEY_VALUE
- and self.__class__.DEFAULT_KEY_VALUE.get(key) == value
- or key == "requests_kwargs"
- ):
- continue
- if key in self.__class__.__REQUEST_ATTRS__:
- if not isinstance(
- value, (bytes, bool, float, int, str, tuple, list, dict)
- ):
- value = tools.dumps_obj(value)
- else:
- if not isinstance(value, (bytes, bool, float, int, str)):
- value = tools.dumps_obj(value)
- request_dict[key] = value
- return request_dict
- @property
- def callback_name(self):
- return (
- getattr(self.callback, "__name__")
- if callable(self.callback)
- else self.callback
- )
- @func_set_timeout(30)
- def get_response(self, save_cached=False):
- """
- 获取带有selector功能的response
- @param save_cached: 保存缓存 方便调试时不用每次都重新下载
- @return:
- """
- # 设置超时默认时间
- self.requests_kwargs.setdefault(
- "timeout", setting.REQUEST_TIMEOUT
- ) # connect=22 read=22
- # 设置stream
- # 默认情况下,当你进行网络请求后,响应体会立即被下载。你可以通过 stream 参数覆盖这个行为,推迟下载响应体直到访问 Response.content 属性。此时仅有响应头被下载下来了。缺点: stream 设为 True,Requests 无法将连接释放回连接池,除非你 消耗了所有的数据,或者调用了 Response.close。 这样会带来连接效率低下的问题。
- self.requests_kwargs.setdefault("stream", True)
- # 关闭证书验证
- self.requests_kwargs.setdefault("verify", False)
- # 设置请求方法
- method = self.__dict__.get("method")
- if not method:
- if "data" in self.requests_kwargs:
- method = "POST"
- else:
- method = "GET"
- # 随机user—agent
- headers = self.requests_kwargs.get("headers", {})
- if "user-agent" not in headers and "User-Agent" not in headers:
- if self.render: # 如果是渲染默认,优先使用WEBDRIVER中配置的ua
- ua = setting.WEBDRIVER.get(
- "user_agent"
- ) or self.__class__.user_agent_pool.get(setting.USER_AGENT_TYPE)
- else:
- ua = self.__class__.user_agent_pool.get(setting.USER_AGENT_TYPE)
- if self.random_user_agent and setting.RANDOM_HEADERS:
- headers.update({"User-Agent": ua})
- self.requests_kwargs.update(headers=headers)
- else:
- self.requests_kwargs.setdefault(
- "headers", {"User-Agent": setting.DEFAULT_USERAGENT}
- )
- # 代理
- proxies = self.requests_kwargs.get("proxies", -1)
- if proxies == -1 and setting.PROXY_ENABLE and setting.PROXY_EXTRACT_API:
- while True:
- proxies = self._proxies_pool.get()
- if proxies:
- self.requests_kwargs.update(proxies=proxies)
- break
- else:
- log.debug("暂无可用代理 ...")
- log.debug(
- """
- -------------- %srequest for ----------------
- url = %s
- method = %s
- body = %s
- """
- % (
- ""
- if not self.parser_name
- else "%s.%s "
- % (
- self.parser_name,
- (
- self.callback
- and callable(self.callback)
- and getattr(self.callback, "__name__")
- or self.callback
- )
- or "parse",
- ),
- self.url,
- method,
- self.requests_kwargs,
- )
- )
- # def hooks(response, *args, **kwargs):
- # print(response.url)
- #
- # self.requests_kwargs.update(hooks={'response': hooks})
- use_session = (
- setting.USE_SESSION if self.use_session is None else self.use_session
- ) # self.use_session 优先级高
- if self.render:
- # 使用request的user_agent、cookies、proxy
- user_agent = headers.get("User-Agent") or headers.get("user-agent")
- cookies = self.requests_kwargs.get("cookies")
- print(cookies)
- if cookies and isinstance(cookies, RequestsCookieJar):
- cookies = cookies.get_dict()
- if not cookies:
- cookie_str = headers.get("Cookie") or headers.get("cookie")
- if cookie_str:
- cookies = tools.get_cookies_from_str(cookie_str)
- proxy = None
- if proxies and proxies != -1:
- proxy = proxies.get("http", "").strip("http://") or proxies.get(
- "https", ""
- ).strip("https://")
- browser = self._webdriver_pool.get(user_agent=user_agent, proxy=proxy)
- try:
- browser.get(self.url)
- if cookies:
- browser.cookies = cookies
- if self.render_time:
- tools.delay_time(self.render_time)
- html = browser.page_source
- response = Response.from_dict(
- {
- "url": browser.current_url,
- "cookies": browser.cookies,
- "_content": html.encode(),
- "status_code": 200,
- "elapsed": 666,
- "headers": {
- "User-Agent": browser.execute_script(
- "return navigator.userAgent"
- ),
- "Cookie": tools.cookies2str(browser.cookies),
- },
- }
- )
- response.browser = browser
- except Exception as e:
- self._webdriver_pool.remove(browser)
- raise e
- elif use_session:
- response = self._session.request(method, self.url, **self.requests_kwargs)
- response = Response(response)
- else:
- response = requests.request(method, self.url, **self.requests_kwargs)
- response = Response(response)
- if save_cached:
- self.save_cached(response, expire_time=self.__class__.cached_expire_time)
- log.info("requests",extra={"url":response.url,"code":response.status_code})
- return response
- def proxies(self):
- """
- Returns: {"https": "https://ip:port", "http": "http://ip:port"}
- """
- return self.requests_kwargs.get("proxies")
- def proxy(self):
- """
- Returns: ip:port
- """
- proxies = self.proxies()
- if proxies:
- return proxies.get("http", "").strip("http://") or proxies.get(
- "https", ""
- ).strip("https://")
- def user_agent(self):
- headers = self.requests_kwargs.get("headers")
- if headers:
- return headers.get("user_agent") or headers.get("User-Agent")
- @property
- def fingerprint(self):
- """
- request唯一表识
- @return:
- """
- url = self.__dict__.get("url", "")
- # url 归一化
- url = tools.canonicalize_url(url)
- args = [url]
- for arg in ["params", "data", "files", "auth", "cert", "json"]:
- if self.requests_kwargs.get(arg):
- args.append(self.requests_kwargs.get(arg))
- return tools.get_md5(*args)
- @property
- def _cache_db(self):
- if not self.__class__.cache_db:
- self.__class__.cache_db = RedisDB() # .from_url(setting.pika_spider_1_uri)
- return self.__class__.cache_db
- @property
- def _cached_redis_key(self):
- if self.__class__.cached_redis_key:
- return (
- f"response_cached:{self.__class__.cached_redis_key}:{self.fingerprint}"
- )
- else:
- return f"response_cached:test:{self.fingerprint}"
- def save_cached(self, response, expire_time=1200):
- """
- 使用redis保存response 用于调试 不用每回都下载
- @param response:
- @param expire_time: 过期时间
- @return:
- """
- self._cache_db.strset(self._cached_redis_key, response.to_dict, ex=expire_time)
- def get_response_from_cached(self, save_cached=True):
- """
- 从缓存中获取response
- 注意:
- 属性值为空:
- -raw : urllib3.response.HTTPResponse
- -connection:requests.adapters.HTTPAdapter
- -history
- 属性含义改变:
- - request 由requests 改为Request
- @param: save_cached 当无缓存 直接下载 下载完是否保存缓存
- @return:
- """
- response_dict = self._cache_db.strget(self._cached_redis_key)
- if not response_dict:
- log.info("无response缓存 重新下载")
- try:
- response_obj = self.get_response(save_cached=save_cached)
- except FunctionTimedOut:
- log.info("请求超时")
- log.info("requests", extra={"url": self.url, "code": 0})
- else:
- response_dict = eval(response_dict)
- response_obj = Response.from_dict(response_dict)
- return response_obj
- def del_response_cached(self):
- self._cache_db.clear(self._cached_redis_key)
- @classmethod
- def from_dict(cls, request_dict):
- for key, value in request_dict.items():
- if isinstance(value, bytes): # 反序列化 如item
- request_dict[key] = tools.loads_obj(value)
- return cls(**request_dict)
- def copy(self):
- return self.__class__.from_dict(self.to_dict)
|