123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- # -*- coding: utf-8 -*-
- """
- Created on 2018-06-19 17:17
- ---------
- @summary: request 管理器, 负责缓冲添加到数据库中的request
- ---------
- @author: Boris
- @email: boris_liu@foxmail.com
- """
- import collections
- import threading
- import feapder.setting as setting
- import feapder.utils.tools as tools
- from feapder.db.redisdb import RedisDB
- from feapder.dedup import Dedup
- from feapder.utils.log import log
# Upper bound on buffered requests: put_request() forces a flush once the
# in-memory buffer grows past it, and it is also the batch threshold used when
# writing pending requests to Redis.
MAX_URL_COUNT = 1000
class RequestBuffer(threading.Thread):
    """Thread that buffers requests in memory and writes them to Redis in batches.

    Requests queued with ``put_request`` are written to the request table by
    each pass of ``run`` or immediately by ``flush`` (which ``put_request``
    triggers once more than ``MAX_URL_COUNT`` items are buffered). Callables
    queued the same way are invoked after the writes of the pass that drains
    them. Items queued with ``put_del_request`` are removed from the request
    table, and ``put_failed_request`` writes straight to the failed table.
    """

    # Shared by all instances; created by the first __init__ that runs while
    # setting.REQUEST_FILTER_ENABLE is true.
    dedup = None

    def __init__(self, redis_key):
        """Set up the buffers and table names derived from ``redis_key``.

        The ``hasattr`` guard leaves an already-initialised instance untouched
        if ``__init__`` runs on it again.
        """
        if not hasattr(self, "_requests_deque"):
            super(RequestBuffer, self).__init__()

            self._thread_stop = False
            self._is_adding_to_db = False
            # Serialises flushes: run() flushes on this thread while
            # put_request()/flush() may flush from a producer thread.
            # Re-entrant because callbacks run inside a flush and may call
            # put_request()/flush() themselves.
            self._lock = threading.RLock()

            self._requests_deque = collections.deque()
            self._del_requests_deque = collections.deque()
            self._db = RedisDB()

            self._table_request = setting.TAB_REQUSETS.format(redis_key=redis_key)
            self._table_failed_request = setting.TAB_FAILED_REQUSETS.format(
                redis_key=redis_key
            )

            if not self.__class__.dedup and setting.REQUEST_FILTER_ENABLE:
                self.__class__.dedup = Dedup(
                    name=redis_key, to_md5=False, **setting.REQUEST_FILTER_SETTING
                )  # per the original author: entries expire after a month by default

    def run(self):  # thread entry point
        """Flush the buffer repeatedly until ``stop`` is called.

        Pauses with ``tools.delay_time(1)`` between passes (presumably one
        second).
        """
        self._thread_stop = False
        while not self._thread_stop:
            try:
                self.__add_request_to_db()
            except Exception as e:
                # Keep the worker alive; log and carry on with the next pass.
                log.exception(e)

            tools.delay_time(1)

    def stop(self):
        """Ask ``run`` to exit after its current pass."""
        self._thread_stop = True
        # NOTE(review): clears threading.Thread's private ``_started`` event,
        # presumably so this object can be start()ed again; this relies on a
        # CPython implementation detail.
        self._started.clear()

    def put_request(self, request):
        """Buffer ``request`` (or a callable to run after the next write).

        Flushes on the calling thread once more than ``MAX_URL_COUNT`` items
        are buffered.
        """
        self._requests_deque.append(request)

        if self.get_requests_count() > MAX_URL_COUNT:  # over the limit: flush now
            self.flush()

    def put_del_request(self, request):
        """Queue ``request`` for removal from the request table on the next flush."""
        self._del_requests_deque.append(request)

    def put_failed_request(self, request, table=None):
        """Write a failed request immediately (unbuffered).

        Args:
            request: object exposing ``to_dict`` and ``priority``.
            table: target table; defaults to this spider's failed-request table.

        Errors are logged, not raised.
        """
        try:
            # Serialise exactly like the main request table does; a raw dict is
            # not a valid Redis argument value.
            request_dict = str(request.to_dict)
            self._db.zadd(
                table or self._table_failed_request, request_dict, request.priority
            )
        except Exception as e:
            log.exception(e)

    def flush(self):
        """Write everything buffered now, on the caller's thread; errors are logged."""
        try:
            self.__add_request_to_db()
        except Exception as e:
            log.exception(e)

    def get_requests_count(self):
        """Return the number of items waiting in the request buffer."""
        return len(self._requests_deque)

    def is_adding_to_db(self):
        """Return True while a flush is processing buffered requests."""
        return self._is_adding_to_db

    def __add_request_to_db(self):
        """Drain both buffers: insert new requests, run callbacks, delete finished ones.

        Requests are written to ``self._table_request`` in batches; callables
        found in the buffer are invoked after the inserts. Redis errors
        propagate to the caller, but ``_is_adding_to_db`` is always reset.

        NOTE(review): items already popped when Redis raises are not re-queued.
        """
        with self._lock:
            request_list = []
            priorities = []
            callbacks = []
            # Every serialized request written in this pass, across all
            # batches, so none of them is deleted again below.
            added_requests = set()

            try:
                while self._requests_deque:
                    # FIFO: take the oldest buffered item first.
                    request = self._requests_deque.popleft()
                    self._is_adding_to_db = True

                    if callable(request):
                        # A callback, run after this pass's inserts. Beware
                        # late-binding closures: bind loop values as defaults,
                        # e.g. ``def test(xxx=xxx): ...``, otherwise the
                        # callback only sees the loop's final value.
                        callbacks.append(request)
                        continue

                    priority = request.priority

                    # Skip the request if it should be de-duplicated and the
                    # fingerprint was already recorded.
                    if (
                        request.filter_repeat
                        and setting.REQUEST_FILTER_ENABLE
                        and not self.__class__.dedup.add(request.fingerprint)
                    ):
                        log.debug("request已存在 url = %s" % request.url)
                        continue

                    request_str = str(request.to_dict)
                    request_list.append(request_str)
                    priorities.append(priority)
                    added_requests.add(request_str)

                    if len(request_list) > MAX_URL_COUNT:
                        self._db.zadd(self._table_request, request_list, priorities)
                        request_list = []
                        priorities = []

                # Write the remaining partial batch.
                if request_list:
                    self._db.zadd(self._table_request, request_list, priorities)

                # Run the collected callbacks; one failing does not stop the rest.
                for callback in callbacks:
                    try:
                        callback()
                    except Exception as e:
                        log.exception(e)

                # Remove finished requests.
                if self._del_requests_deque:
                    request_done_list = []
                    while self._del_requests_deque:
                        request_done_list.append(self._del_requests_deque.popleft())

                    # Never delete a request (re-)added during this pass, or the
                    # freshly written entry would be removed again.
                    request_done_list = list(set(request_done_list) - added_requests)

                    if request_done_list:
                        self._db.zrem(self._table_request, request_done_list)
            finally:
                # Reset even if Redis raised, so is_adding_to_db() cannot stick.
                self._is_adding_to_db = False
|