# -*- coding: utf-8 -*-
"""
Created on 2023-10-16
---------
@summary: Crawl-state manager: buffers tasks and task crawl-state records and
          pushes them to RabbitMQ in batches.
---------
@author: dzr
"""

import collections
import threading

import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.db.rabbitMq import RabbitMQ
from feapder.dedup import Dedup
from feapder.network.item import Item, BaseListItem
from feapder.utils.log import log

MAX_TASK_COUNT = 1000  # max buffered tasks before a forced flush / batch push
MAX_FINISHED_COUNT = 1000  # max crawl-state records accumulated before a batch push


class TaskBuffer(threading.Thread):
    """Background thread that buffers Items and pushes them to RabbitMQ.

    ``BaseListItem`` instances are pushed to the pending-detail-task queue
    (named from ``setting.TAB_ITEMS``). Any other ``Item`` is converted via
    ``to_dict``, stamped with ``state = 2`` and pushed to the crawl-state
    queue (``setting.TASK_CRAWL_STATE``). When ``setting.TASK_FILTER_ENABLE``
    is set, items whose ``pyuuid`` the shared ``Dedup`` filter has already
    seen are dropped.
    """

    dedup = None  # class-level Dedup filter shared by all instances, created lazily

    def __init__(self, redis_key, rabbitmq=None):
        """Set up the buffer and declare the RabbitMQ queues it writes to.

        Args:
            redis_key: spider key; with any ``'_detailc'`` removed, it is
                substituted into ``setting.TAB_ITEMS`` to name the task queue.
                It is also used as the dedup filter name.
            rabbitmq: optional RabbitMQ client; a new ``RabbitMQ()`` is
                created when none is given.
        """
        # Skip re-initialisation if this instance was already set up.
        if not hasattr(self, "_task_deque"):
            super(TaskBuffer, self).__init__()

            self._thread_stop = False
            self._is_adding_to_db = False

            self._task_deque = collections.deque()
            self._rabbitmq = rabbitmq or RabbitMQ()

            # Queue of pending detail tasks
            self._tab_items = setting.TAB_ITEMS.format(
                redis_key=redis_key.replace('_detailc', '')
            )
            self._rabbitmq.declare(queue=self._tab_items)

            # Queue of task crawl states
            self._task_crawl_state = setting.TASK_CRAWL_STATE
            self._rabbitmq.declare(queue=self._task_crawl_state)

            if not self.__class__.dedup and setting.TASK_FILTER_ENABLE:
                self.__class__.dedup = Dedup(
                    name=redis_key, to_md5=False, **setting.TASK_FILTER_SETTING
                )  # per the original author's note, the default expiry is one month

    def run(self):
        """Thread loop: drain the buffer, then wait, until ``stop()`` is called.

        Errors from a drain are logged and the loop keeps going.
        """
        self._thread_stop = False
        while not self._thread_stop:
            try:
                self.__add_task_to_db()
            except Exception as e:
                log.exception(e)

            tools.delay_time(1)  # presumably a 1-second pause between drains

    def stop(self):
        """Ask the thread loop to exit after its current iteration."""
        self._thread_stop = True
        # Clears threading.Thread's private ``_started`` event (CPython
        # internal); presumably so the thread is no longer seen as started.
        self._started.clear()

    def put_task(self, task):
        """Buffer ``task`` for a later push.

        Only ``Item`` instances are buffered; anything else is silently
        ignored. Once more than ``MAX_TASK_COUNT`` tasks are buffered, the
        buffer is flushed synchronously on the calling thread.
        """
        if isinstance(task, Item):
            self._task_deque.append(task)

        if self.get_tasks_count() > MAX_TASK_COUNT:  # buffer over limit: flush now
            self.flush()

    def flush(self):
        """Drain the buffer synchronously, logging (not raising) any error."""
        try:
            self.__add_task_to_db()
        except Exception as e:
            log.exception(e)

    def get_tasks_count(self):
        """Return the number of tasks currently buffered."""
        return len(self._task_deque)

    def is_adding_to_db(self):
        """Return True while a drain of the buffer is in progress."""
        return self._is_adding_to_db

    def __add_task_to_db(self):
        """Drain the buffer and push its contents to RabbitMQ in batches.

        Each popped task is first checked against the dedup filter (when
        enabled). It then goes to the detail-task queue if it is a
        ``BaseListItem``, or to the crawl-state queue as a dict with
        ``state = 2`` otherwise. A batch is pushed as soon as it grows past
        its limit, and any remainder is pushed at the end.

        Raises:
            Whatever the RabbitMQ client raises. ``_is_adding_to_db`` is
            reset in ``finally`` so a failed push cannot leave
            ``is_adding_to_db()`` stuck at True.
        """
        task_list = []  # BaseListItem objects for the detail-task queue
        finished_list = []  # crawl-state dicts for the crawl-state queue

        try:
            while self._task_deque:
                task = self._task_deque.popleft()
                self._is_adding_to_db = True

                task_uuid = task.pyuuid  # unique task identifier
                # Dedup enabled and the task was already seen: skip it
                if setting.TASK_FILTER_ENABLE and not self.__class__.dedup.add(task_uuid):
                    log.debug("task已存在 pyuuid = %s" % task_uuid)
                    continue

                if isinstance(task, BaseListItem):
                    task_list.append(task)  # task still waiting to be crawled
                else:
                    task_dict = task.to_dict
                    task_dict['state'] = 2  # crawl state: 2 = crawl completed
                    finished_list.append(task_dict)

                # Push tasks once the batch exceeds MAX_TASK_COUNT
                if len(task_list) > MAX_TASK_COUNT:
                    self._rabbitmq.add(self._tab_items, task_list)
                    task_list = []

                # Push crawl states once the batch exceeds MAX_FINISHED_COUNT
                if len(finished_list) > MAX_FINISHED_COUNT:
                    self._rabbitmq.add(self._task_crawl_state, finished_list)
                    finished_list = []

            # Push the remaining tasks (below MAX_TASK_COUNT)
            if task_list:
                self._rabbitmq.add(self._tab_items, task_list)

            # Push the remaining crawl states (below MAX_FINISHED_COUNT)
            if finished_list:
                self._rabbitmq.add(self._task_crawl_state, finished_list)
        finally:
            self._is_adding_to_db = False