|
@@ -1,124 +0,0 @@
|
|
-# -*- coding: utf-8 -*-
|
|
|
|
-"""
|
|
|
|
-Created on 2023-10-16
|
|
|
|
----------
|
|
|
|
-@summary: 采集任务与状态管理器,负责缓冲管理采集的任务与采集完成的状态更新
|
|
|
|
----------
|
|
|
|
-@author: dzr
|
|
|
|
-"""
|
|
|
|
-
|
|
|
|
-import collections
|
|
|
|
-import threading
|
|
|
|
-
|
|
|
|
-import feapder.setting as setting
|
|
|
|
-import feapder.utils.tools as tools
|
|
|
|
-from feapder.db.rabbitMq import RabbitMQ
|
|
|
|
-from feapder.dedup import Dedup
|
|
|
|
-from feapder.network.item import Item, BaseListItem
|
|
|
|
-from feapder.utils.log import log
|
|
|
|
-
|
|
|
|
-MAX_TASK_COUNT = 1000 # 缓存中最大待处理任务数
|
|
|
|
-MAX_FINISHED_COUNT = 1000 # 缓存中最大任务状态确认数
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-class TaskBuffer(threading.Thread):
|
|
|
|
- dedup = None
|
|
|
|
-
|
|
|
|
- def __init__(self, redis_key, rabbitmq=None):
|
|
|
|
- if not hasattr(self, "_task_deque"):
|
|
|
|
- super(TaskBuffer, self).__init__()
|
|
|
|
-
|
|
|
|
- self._thread_stop = False
|
|
|
|
- self._is_adding_to_db = False
|
|
|
|
-
|
|
|
|
- self._task_deque = collections.deque()
|
|
|
|
-
|
|
|
|
- self._rabbitmq = rabbitmq or RabbitMQ()
|
|
|
|
-
|
|
|
|
- # 待处理详情任务队列
|
|
|
|
- self._tab_items = setting.TAB_ITEMS.format(
|
|
|
|
- redis_key=redis_key.replace('_detailc', '')
|
|
|
|
- )
|
|
|
|
- self._rabbitmq.declare(queue=self._tab_items)
|
|
|
|
- # 采集任务状态队列
|
|
|
|
- self._task_crawl_state = setting.TASK_CRAWL_STATE
|
|
|
|
- self._rabbitmq.declare(queue=self._task_crawl_state)
|
|
|
|
-
|
|
|
|
- if not self.__class__.dedup and setting.TASK_FILTER_ENABLE:
|
|
|
|
- self.__class__.dedup = Dedup(
|
|
|
|
- name=redis_key, to_md5=False, **setting.TASK_FILTER_SETTING
|
|
|
|
- ) # 默认过期时间为一个月
|
|
|
|
-
|
|
|
|
- def run(self):
|
|
|
|
- self._thread_stop = False
|
|
|
|
- while not self._thread_stop:
|
|
|
|
- try:
|
|
|
|
- self.__add_task_to_db()
|
|
|
|
- except Exception as e:
|
|
|
|
- log.exception(e)
|
|
|
|
-
|
|
|
|
- tools.delay_time(1)
|
|
|
|
-
|
|
|
|
- def stop(self):
|
|
|
|
- self._thread_stop = True
|
|
|
|
- self._started.clear()
|
|
|
|
-
|
|
|
|
- def put_task(self, task):
|
|
|
|
- if isinstance(task, Item):
|
|
|
|
- self._task_deque.append(task)
|
|
|
|
- if self.get_tasks_count() > MAX_TASK_COUNT: # 超过最大缓存,主动调用
|
|
|
|
- self.flush()
|
|
|
|
-
|
|
|
|
- def flush(self):
|
|
|
|
- try:
|
|
|
|
- self.__add_task_to_db()
|
|
|
|
- except Exception as e:
|
|
|
|
- log.exception(e)
|
|
|
|
-
|
|
|
|
- def get_tasks_count(self):
|
|
|
|
- return len(self._task_deque)
|
|
|
|
-
|
|
|
|
- def is_adding_to_db(self):
|
|
|
|
- return self._is_adding_to_db
|
|
|
|
-
|
|
|
|
- def __add_task_to_db(self):
|
|
|
|
- task_list = []
|
|
|
|
- finished_list = []
|
|
|
|
-
|
|
|
|
- while self._task_deque:
|
|
|
|
- task = self._task_deque.popleft()
|
|
|
|
- self._is_adding_to_db = True
|
|
|
|
-
|
|
|
|
- task_uuid = task.pyuuid # 任务唯一标识
|
|
|
|
-
|
|
|
|
- # 如果需要去重并且库中已重复,则continue
|
|
|
|
- if setting.TASK_FILTER_ENABLE and not self.__class__.dedup.add(task_uuid):
|
|
|
|
- log.debug("task已存在 pyuuid = %s" % task_uuid)
|
|
|
|
- continue
|
|
|
|
- else:
|
|
|
|
- if isinstance(task, BaseListItem):
|
|
|
|
- task_list.append(task) # 待采集任务
|
|
|
|
- else:
|
|
|
|
- task_dict = task.to_dict
|
|
|
|
- task_dict['state'] = 2 # 待采集任务成功采集状态[2=完成采集]
|
|
|
|
- finished_list.append(task_dict)
|
|
|
|
-
|
|
|
|
- # 入库(超过上限[MAX_URL_COUNT]执行)
|
|
|
|
- if len(task_list) > MAX_TASK_COUNT:
|
|
|
|
- self._rabbitmq.add(self._tab_items, task_list)
|
|
|
|
- task_list = []
|
|
|
|
-
|
|
|
|
- # 推送消息队列(超过上限[MAX_FINISHED_COUNT]执行)
|
|
|
|
- if len(finished_list) > MAX_FINISHED_COUNT:
|
|
|
|
- self._rabbitmq.add(self._task_crawl_state, finished_list)
|
|
|
|
- finished_list = []
|
|
|
|
-
|
|
|
|
- # 入库(小于上限[MAX_URL_COUNT]执行)
|
|
|
|
- if task_list:
|
|
|
|
- self._rabbitmq.add(self._tab_items, task_list)
|
|
|
|
-
|
|
|
|
- # 推送状态(超过上限[MAX_FINISHED_COUNT]执行)
|
|
|
|
- if finished_list:
|
|
|
|
- self._rabbitmq.add(self._task_crawl_state, finished_list)
|
|
|
|
-
|
|
|
|
- self._is_adding_to_db = False
|
|
|