123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- # -*- coding: utf-8 -*-
- """
- Created on 2023-10-16
- ---------
- @summary: 采集状态管理器,负责缓冲添加到rabbitMq任务采集状态
- ---------
- @author: dzr
- """
- import collections
- import threading
- import feapder.setting as setting
- import feapder.utils.tools as tools
- from feapder.db.rabbitMq import RabbitMQ
- from feapder.dedup import Dedup
- from feapder.network.item import Item, BaseListItem
- from feapder.utils.log import log
# Upper bound on buffered tasks before put_task() forces a synchronous flush;
# also the batch size at which pending detail tasks are published.
MAX_TASK_COUNT = 1_000
# Batch size at which finished-task crawl states are published.
MAX_FINISHED_COUNT = 1_000
class TaskBuffer(threading.Thread):
    """Background thread that buffers crawl items and publishes them to RabbitMQ.

    Items handed to put_task() are queued in memory and drained either by
    this thread (roughly once per second in run()) or synchronously by
    flush(). ``BaseListItem`` instances are published to the pending
    detail-task queue; every other ``Item`` is converted to a dict, marked
    with ``state = 2`` (crawl finished) and published to the crawl-state
    queue.
    """

    # Shared by all instances; created by the first instance constructed
    # while setting.TASK_FILTER_ENABLE is on.
    dedup = None

    def __init__(self, redis_key, rabbitmq=None):
        """Set up the buffer and declare both target queues.

        @param redis_key: spider key. A ``_detailc`` suffix is stripped
            before it is formatted into ``setting.TAB_ITEMS``; the unmodified
            key names the dedup store.
        @param rabbitmq: optional RabbitMQ client; a new ``RabbitMQ()`` is
            created when omitted (or falsy).
        """
        # Skip re-initialisation if this instance was already set up.
        if not hasattr(self, "_task_deque"):
            super().__init__()

            self._thread_stop = False
            self._is_adding_to_db = False

            self._task_deque = collections.deque()
            self._rabbitmq = rabbitmq or RabbitMQ()

            # Queue of pending detail tasks
            self._tab_items = setting.TAB_ITEMS.format(
                redis_key=redis_key.replace('_detailc', '')
            )
            self._rabbitmq.declare(queue=self._tab_items)

            # Queue of task crawl states
            self._task_crawl_state = setting.TASK_CRAWL_STATE
            self._rabbitmq.declare(queue=self._task_crawl_state)

            if not self.__class__.dedup and setting.TASK_FILTER_ENABLE:
                # NOTE(review): original note says the default expiry is one
                # month; the actual value comes from TASK_FILTER_SETTING.
                self.__class__.dedup = Dedup(
                    name=redis_key, to_md5=False, **setting.TASK_FILTER_SETTING
                )

    def run(self):
        """Drain the buffer about once per second until stop() is called."""
        self._thread_stop = False
        while not self._thread_stop:
            try:
                self.__add_task_to_db()
            except Exception as e:
                # Keep the worker alive; the next tick drains whatever is
                # still buffered.
                log.exception(e)

            tools.delay_time(1)

    def stop(self):
        """Ask run() to exit after its current iteration."""
        self._thread_stop = True
        # Clears the private Thread._started event; presumably so that
        # start() may be called again (CPython's Thread.start checks it).
        self._started.clear()

    def put_task(self, task):
        """Buffer *task* if it is an ``Item``; any other object is ignored.

        Once more than MAX_TASK_COUNT tasks are buffered, the buffer is
        flushed synchronously on the caller's thread.
        """
        if not isinstance(task, Item):
            return

        self._task_deque.append(task)
        if self.get_tasks_count() > MAX_TASK_COUNT:  # over capacity: flush now
            self.flush()

    def flush(self):
        """Publish everything currently buffered, logging (not raising) errors."""
        try:
            self.__add_task_to_db()
        except Exception as e:
            log.exception(e)

    def get_tasks_count(self):
        """Return the number of tasks waiting in the buffer."""
        return len(self._task_deque)

    def is_adding_to_db(self):
        """Return True while a drain of the buffer is in progress."""
        return self._is_adding_to_db

    def __add_task_to_db(self):
        """Drain the buffer and publish its contents to RabbitMQ.

        ``BaseListItem`` tasks go to the detail-task queue; all other items
        are published as state dicts (``state = 2``) to the crawl-state
        queue. A batch is sent as soon as it exceeds MAX_TASK_COUNT /
        MAX_FINISHED_COUNT, and any remainder is sent at the end.

        The in-progress flag is always cleared on exit, even when a task or
        a publish raises, so is_adding_to_db() cannot get stuck at True.
        Exceptions propagate to the caller (run() / flush() log them).
        """
        task_list = []
        finished_list = []
        # NOTE(review): put_task() may call flush() on a producer thread while
        # run() drains on this thread; confirm the RabbitMQ wrapper tolerates
        # concurrent use.
        try:
            while self._task_deque:
                # Raise the flag before popping so no observer sees an empty
                # buffer together with an idle flag while a task is in flight.
                self._is_adding_to_db = True
                task = self._task_deque.popleft()

                task_uuid = task.pyuuid  # unique task identifier
                # Skip tasks the dedup store has already seen.
                # NOTE(review): the uuid is recorded here, before the publish
                # below; if rabbitmq.add raises, the task is lost and would
                # presumably be filtered as a duplicate if re-submitted.
                if setting.TASK_FILTER_ENABLE and not self.__class__.dedup.add(task_uuid):
                    log.debug("task已存在 pyuuid = %s" % task_uuid)
                    continue

                if isinstance(task, BaseListItem):
                    task_list.append(task)  # task still to be crawled
                else:
                    task_dict = task.to_dict
                    task_dict['state'] = 2  # crawl state: 2 = crawl finished
                    finished_list.append(task_dict)

                # Publish pending tasks once the batch exceeds MAX_TASK_COUNT
                if len(task_list) > MAX_TASK_COUNT:
                    self._rabbitmq.add(self._tab_items, task_list)
                    task_list = []

                # Publish crawl states once the batch exceeds MAX_FINISHED_COUNT
                if len(finished_list) > MAX_FINISHED_COUNT:
                    self._rabbitmq.add(self._task_crawl_state, finished_list)
                    finished_list = []

            # Publish whatever remains below the batch limits
            if task_list:
                self._rabbitmq.add(self._tab_items, task_list)
            if finished_list:
                self._rabbitmq.add(self._task_crawl_state, finished_list)
        finally:
            self._is_adding_to_db = False
|