# -*- coding: utf-8 -*-
"""
Created on 2018-06-19 17:17
---------
@summary: request manager; buffers requests before they are added to the database
---------
@author: Boris
@email: boris_liu@foxmail.com
"""

import collections
import threading

import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.db.rabbitMq import RabbitMQ
from feapder.dedup import Dedup
from feapder.utils.log import log

MAX_URL_COUNT = 1000  # maximum number of requests held in the buffer


class RequestBuffer(threading.Thread):
    """Background thread that buffers requests and pushes them to RabbitMQ.

    Requests (or zero-argument callbacks) are appended to an in-memory deque
    with ``put_request``. The thread's ``run`` loop drains that deque about
    once per second; ``put_request`` also drains it synchronously once more
    than ``MAX_URL_COUNT`` entries are pending.
    """

    # Request de-duplication filter, stored on the class and created by the
    # first instance constructed while setting.REQUEST_FILTER_ENABLE is truthy.
    dedup = None

    def __init__(self, redis_key, rabbitmq=None, user=None):
        """Set up the buffer and declare the queues it publishes to.

        Args:
            redis_key: Key used to format the task queue name
                (``setting.TAB_REQUESTS``) and to name the dedup filter.
            rabbitmq: Optional RabbitMQ client; a new ``RabbitMQ()`` is
                created when omitted.
            user: Optional identifier sent as the ``correlation_id`` message
                property.
        """
        # Skip re-initialisation when __init__ runs again on an instance that
        # has already been set up.
        if not hasattr(self, "_requests_deque"):
            super(RequestBuffer, self).__init__()

            self._thread_stop = False
            self._is_adding_to_db = False
            self._redis_key = redis_key
            self._user = user

            self._requests_deque = collections.deque()
            self._del_requests_deque = collections.deque()

            self._rabbitmq = rabbitmq or RabbitMQ()

            # Task queue
            self._tab_requests = setting.TAB_REQUESTS.format(redis_key=redis_key)
            self._rabbitmq.declare_bind(queue=self._tab_requests)

            # Failed-task queue
            self._tab_failed_requests = setting.TAB_FAILED_REQUESTS
            self._rabbitmq.declare_bind(queue=self._tab_failed_requests)

            if not self.__class__.dedup and setting.REQUEST_FILTER_ENABLE:
                # Original author's note: the default expiry is one month.
                self.__class__.dedup = Dedup(
                    name=redis_key, to_md5=False, **setting.REQUEST_FILTER_SETTING
                )

    def run(self):
        """Drain the buffer repeatedly until ``stop`` is called."""
        self._thread_stop = False
        while not self._thread_stop:
            try:
                self.__add_request_to_db()
            except Exception as e:
                # Log and keep looping so one failed drain does not end the thread.
                log.exception(e)

            tools.delay_time(1)

    def stop(self):
        """Ask the ``run`` loop to exit after its current iteration."""
        self._thread_stop = True
        # NOTE(review): this touches threading.Thread's private ``_started``
        # event, presumably so the same object can be start()ed again —
        # confirm against the callers.
        self._started.clear()

    def get_failed_requests_count(self):
        """Return the message count of the failed-request queue."""
        return self._rabbitmq.get_message_count(self._tab_failed_requests)

    def put_request(self, request):
        """Append a request (or callback) to the buffer.

        Flushes synchronously when more than ``MAX_URL_COUNT`` entries are
        pending.
        """
        self._requests_deque.append(request)

        if self.get_requests_count() > MAX_URL_COUNT:
            # Over the buffer limit: flush now instead of waiting for run().
            self.flush()

    def put_failed_request(self, request, table=None):
        """Publish a failed request to the failed queue, or to ``table`` if given.

        Errors are logged rather than raised.
        """
        try:
            request_dict = request.to_dict
            if table is not None:
                # Declare the extra queue before publishing to it.
                self._rabbitmq.declare_bind(queue=table)
            queue = table or self._tab_failed_requests
            # Tag the message with the sender's identifier.
            properties = dict(correlation_id=self._user or self._redis_key)
            self._rabbitmq.add(request_dict, queue=queue, properties=properties)
        except Exception as e:
            log.exception(e)

    def flush(self):
        """Drain the buffer immediately; errors are logged rather than raised."""
        try:
            self.__add_request_to_db()
        except Exception as e:
            log.exception(e)

    def get_requests_count(self):
        """Return the number of entries currently waiting in the buffer."""
        return len(self._requests_deque)

    def is_adding_to_db(self):
        """Return True while a drain started by this object is in progress."""
        return self._is_adding_to_db

    def __add_request_to_db(self):
        """Drain the buffer: publish requests in batches, then run callbacks.

        Callable entries are collected and invoked after the requests have
        been published. Requests with ``filter_repeat`` set are skipped when
        filtering is enabled and the dedup filter reports them as already
        seen. Exceptions from the dedup filter or from publishing propagate
        to the caller.
        """
        kw = {"properties": dict(correlation_id=self._user) if self._user else None}

        request_list = []
        callbacks = []

        try:
            while True:
                # flush() (caller's thread) and run() (this thread) can drain
                # at the same time. Popping directly avoids the check-then-pop
                # race that would raise IndexError and drop the batch
                # collected so far.
                try:
                    request = self._requests_deque.popleft()
                except IndexError:
                    break

                self._is_adding_to_db = True

                if callable(request):
                    # Callback entry. Mind late-binding closures: write it as
                    #   def test(xxx=xxx): ...
                    # so xxx is bound at definition time, not to the last
                    # value of the enclosing loop.
                    callbacks.append(request)
                    continue

                # Skip requests that need de-duplication and are already known.
                if (
                    request.filter_repeat
                    and setting.REQUEST_FILTER_ENABLE
                    and not self.__class__.dedup.add(request.fingerprint)
                ):
                    log.debug("request已存在 url = %s" % request.url)
                    continue

                request_list.append(str(request.to_dict))

                # Publish a full batch once it exceeds MAX_URL_COUNT.
                if len(request_list) > MAX_URL_COUNT:
                    self._rabbitmq.add_batch(self._tab_requests, request_list, **kw)
                    request_list = []

            # Publish the remaining partial batch.
            if request_list:
                self._rabbitmq.add_batch(self._tab_requests, request_list, **kw)

            # Run the callbacks; a failing callback must not block the others.
            for callback in callbacks:
                try:
                    callback()
                except Exception as e:
                    log.exception(e)
        finally:
            # Reset the flag on the error path too, so is_adding_to_db() never
            # stays True after a failed drain.
            self._is_adding_to_db = False