# -*- coding: utf-8 -*-
"""
Created on 2023-10-16
---------
@summary: Task state manager; buffers data messages before they are published to RabbitMQ.
---------
@author: dzr
"""
import collections
import threading

import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.db.rabbitMq import RabbitMQ
from feapder.dedup import Dedup
from feapder.network.item import Item, BaseListItem
from feapder.utils.log import log

MAX_TASK_COUNT = 1000  # max pending tasks in the buffer / per task publish batch
MAX_STATE_COUNT = 1000  # max pending task-state records per state publish batch

# Task-state codes written into the records published to setting.TASK_PROCESS_STATE.
_STATE_DONE = 2  # task collected successfully
_STATE_FAILED = 3  # task collection failed


class TaskBuffer(threading.Thread):
    """Thread that buffers Items and publishes them to RabbitMQ in batches.

    ``BaseListItem`` instances are published to the queue named by
    ``setting.TAB_ITEMS``. Every other ``Item`` is converted into a state
    record ``{'pyuuid', 'update_at', 'state'}`` and published to the queue
    named by ``setting.TASK_PROCESS_STATE``.
    """

    # Dedup filter shared by all instances; created on first __init__ when
    # setting.TASK_FILTER_ENABLE is on.
    dedup = None

    def __init__(self, redis_key, rabbitmq=None):
        """
        Args:
            redis_key: spider key. '_detailc' is stripped from it to format
                setting.TAB_ITEMS; the unmodified key names the Dedup filter.
            rabbitmq: RabbitMQ client to use; a new RabbitMQ() is created
                when this is falsy.
        """
        # Makes re-running __init__ on an already-initialised instance a no-op.
        # NOTE(review): presumably paired with a singleton-style __new__ — confirm.
        if not hasattr(self, "_task_deque"):
            super(TaskBuffer, self).__init__()

            self._thread_stop = False
            self._is_adding_to_db = False

            self._task_deque = collections.deque()
            self._rabbitmq = rabbitmq or RabbitMQ()

            # Queue for pending detail-collection tasks.
            self._tab_items = setting.TAB_ITEMS.format(
                redis_key=redis_key.replace('_detailc', '')
            )
            self._rabbitmq.declare(queue=self._tab_items)

            # Queue for task-state records.
            self._task_state = setting.TASK_PROCESS_STATE
            self._rabbitmq.declare(queue=self._task_state)

            if not self.__class__.dedup and setting.TASK_FILTER_ENABLE:
                # Original author's note: the default expiry is one month
                # (presumably configured via setting.TASK_FILTER_SETTING).
                self.__class__.dedup = Dedup(
                    name=redis_key, to_md5=False, **setting.TASK_FILTER_SETTING
                )

    def run(self):
        """Thread body: drain and publish the buffer until stop() is called.

        Errors are logged, not raised. The thread pauses via
        tools.delay_time(1) between passes.
        """
        self._thread_stop = False
        while not self._thread_stop:
            try:
                self.__add_task_to_db()
            except Exception as e:
                log.exception(e)

            tools.delay_time(1)

    def stop(self):
        """Ask run() to exit after its current pass."""
        self._thread_stop = True
        # Clears threading.Thread's private "started" event (a CPython
        # internal); presumably so start() may be called again — confirm.
        self._started.clear()

    def put_task(self, task):
        """Buffer ``task`` if it is an Item; flush when the buffer is over capacity."""
        if isinstance(task, Item):
            self._task_deque.append(task)

        if self.get_tasks_count() > MAX_TASK_COUNT:  # over capacity: flush now
            self.flush()

    def flush(self):
        """Synchronously drain and publish the buffer, logging any error."""
        try:
            self.__add_task_to_db()
        except Exception as e:
            log.exception(e)

    def get_tasks_count(self):
        """Return the number of tasks currently buffered."""
        return len(self._task_deque)

    def is_adding_to_db(self):
        """Return True while a drain/publish pass holds tasks taken from the buffer."""
        return self._is_adding_to_db

    def __add_task_to_db(self):
        """Drain the buffer and publish its contents to RabbitMQ in batches.

        When setting.TASK_FILTER_ENABLE is on, tasks the Dedup filter has
        already seen are skipped. Exceptions from the Dedup filter or from
        RabbitMQ.add propagate to the caller. Tasks already taken from the
        buffer during that pass are not re-queued.
        """
        task_list = []
        task_state_list = []
        try:
            while self._task_deque:
                # Raise the flag *before* popping, so an observer never sees an
                # empty buffer together with is_adding_to_db() == False while a
                # task is in flight.
                self._is_adding_to_db = True
                try:
                    task = self._task_deque.popleft()
                except IndexError:
                    # run() and flush() can drain concurrently; the other
                    # thread took the last task after our emptiness check.
                    # Stop here but still publish what we already hold.
                    break

                task_uuid = task.pyuuid  # unique task identifier

                # Skip tasks the dedup filter has already recorded.
                if setting.TASK_FILTER_ENABLE and not self.__class__.dedup.add(task_uuid):
                    log.debug("task已存在 pyuuid = %s" % task_uuid)
                    continue

                if isinstance(task, BaseListItem):
                    task_list.append(task)  # pending collection task
                else:
                    # Tasks carrying a `failed_times` attribute are reported as failed.
                    task_state_list.append({
                        'pyuuid': task_uuid,
                        'update_at': tools.ensure_int64(tools.get_current_timestamp()),
                        'state': _STATE_FAILED if hasattr(task, 'failed_times') else _STATE_DONE,
                    })

                # Publish a task batch once it exceeds MAX_TASK_COUNT.
                if len(task_list) > MAX_TASK_COUNT:
                    self._rabbitmq.add(self._tab_items, task_list)
                    task_list = []

                # Publish a state batch once it exceeds MAX_STATE_COUNT.
                if len(task_state_list) > MAX_STATE_COUNT:
                    self._rabbitmq.add(self._task_state, task_state_list)
                    task_state_list = []

            # Publish the remainders that stayed below the batch limits.
            if task_list:
                self._rabbitmq.add(self._tab_items, task_list)

            if task_state_list:
                self._rabbitmq.add(self._task_state, task_state_list)
        finally:
            # Always clear the flag, even when a publish raised.
            self._is_adding_to_db = False