Browse Source

添加任务状态管理器

dongzhaorui 1 year ago
parent
commit
0b7e502258
1 changed files with 134 additions and 0 deletions
  1. 134 0
      FworkSpider/feapder/buffer/task_buffer.py

+ 134 - 0
FworkSpider/feapder/buffer/task_buffer.py

@@ -0,0 +1,134 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2018-06-19 17:17
+---------
+@summary: 任务状态管理器,负责缓冲添加到rabbitMq中的数据消息
+---------
+@author: dzr
+"""
+
+import collections
+import threading
+
+import feapder.setting as setting
+import feapder.utils.tools as tools
+from feapder.db.rabbitMq import RabbitMQ
+from feapder.dedup import Dedup
+from feapder.network.item import Item, BaseListItem
+from feapder.utils.log import log
+
+MAX_TASK_COUNT = 1000  # 缓存中最大待处理任务数
+MAX_STATE_COUNT = 1000  # 缓存中最大待处理任务确认状态数
+
+
+class TaskBuffer(threading.Thread):
+    dedup = None
+
+    def __init__(self, redis_key, rabbitmq=None):
+        if not hasattr(self, "_task_deque"):
+            super(TaskBuffer, self).__init__()
+
+            self._thread_stop = False
+            self._is_adding_to_db = False
+
+            self._task_deque = collections.deque()
+
+            self._rabbitmq = rabbitmq or RabbitMQ()
+
+            # 待处理详情任务队列
+            self._tab_items = setting.TAB_ITEMS.format(
+                redis_key=redis_key.replace('_detailc', '')
+            )
+            self._rabbitmq.declare(queue=self._tab_items)
+            # 待处理任务状态队列
+            self._task_state = setting.TASK_PROCESS_STATE
+            self._rabbitmq.declare(queue=self._task_state)
+
+            if not self.__class__.dedup and setting.TASK_FILTER_ENABLE:
+                self.__class__.dedup = Dedup(
+                    name=redis_key, to_md5=False, **setting.TASK_FILTER_SETTING
+                )  # 默认过期时间为一个月
+
+    def run(self):
+        self._thread_stop = False
+        while not self._thread_stop:
+            try:
+                self.__add_task_to_db()
+            except Exception as e:
+                log.exception(e)
+
+            tools.delay_time(1)
+
+    def stop(self):
+        self._thread_stop = True
+        self._started.clear()
+
+    def put_task(self, task):
+        if isinstance(task, Item):
+            self._task_deque.append(task)
+            if self.get_tasks_count() > MAX_TASK_COUNT:  # 超过最大缓存,主动调用
+                self.flush()
+
+    def flush(self):
+        try:
+            self.__add_task_to_db()
+        except Exception as e:
+            log.exception(e)
+
+    def get_tasks_count(self):
+        return len(self._task_deque)
+
+    def is_adding_to_db(self):
+        return self._is_adding_to_db
+
+    def __add_task_to_db(self):
+        task_list = []
+        task_state_list = []
+
+        while self._task_deque:
+            task = self._task_deque.popleft()
+            self._is_adding_to_db = True
+
+            task_uuid = task.pyuuid  # 任务唯一标识
+
+            # 如果需要去重并且库中已重复,则continue
+            if setting.TASK_FILTER_ENABLE and not self.__class__.dedup.add(task_uuid):
+                log.debug("task已存在 pyuuid = %s" % task_uuid)
+                continue
+            else:
+                update_at = tools.ensure_int64(tools.get_current_timestamp())
+                if isinstance(task, BaseListItem):
+                    task_list.append(task)  # 待采集任务
+                else:
+                    if not hasattr(task, 'failed_times'):
+                        task_state_list.append({
+                            'pyuuid': task_uuid,
+                            'update_at': update_at,
+                            'state': 2  # 待采集任务成功采集状态[2=完成采集]
+                        })
+                    else:
+                        task_state_list.append({
+                            'pyuuid': task_uuid,
+                            'update_at': update_at,
+                            'state': 3  # 待采集任务失败采集状态[3=采集失败]
+                        })
+
+            # 入库(超过上限[MAX_URL_COUNT]执行)
+            if len(task_list) > MAX_TASK_COUNT:
+                self._rabbitmq.add(self._tab_items, task_list)
+                task_list = []
+
+            # 推送消息队列(超过上限[MAX_STATE_COUNT]执行)
+            if len(task_state_list) > MAX_STATE_COUNT:
+                self._rabbitmq.add(self._task_state, task_state_list)
+                task_state_list = []
+
+        # 入库(小于上限[MAX_URL_COUNT]执行)
+        if task_list:
+            self._rabbitmq.add(self._tab_items, task_list)
+
+        # 推送消息队列(超过上限[MAX_STATE_COUNT]执行)
+        if task_state_list:
+            self._rabbitmq.add(self._task_state, task_state_list)
+
+        self._is_adding_to_db = False