Răsfoiți Sursa

删除采集失败状态

dongzhaorui 1 an în urmă
părinte
comite
58f29e479f
1 fișier modificat, cu 15 adăugiri și 25 ștergeri
  1. 15 25
      FworkSpider/feapder/buffer/task_buffer.py

+ 15 - 25
FworkSpider/feapder/buffer/task_buffer.py

@@ -18,7 +18,7 @@ from feapder.network.item import Item, BaseListItem
 from feapder.utils.log import log
 
 MAX_TASK_COUNT = 1000  # 缓存中最大待处理任务数
-MAX_STATE_COUNT = 1000  # 缓存中最大待处理任务确认状态
+MAX_FINISHED_COUNT = 1000   # 缓存中最大任务状态确认
 
 
 class TaskBuffer(threading.Thread):
@@ -40,9 +40,9 @@ class TaskBuffer(threading.Thread):
                 redis_key=redis_key.replace('_detailc', '')
             )
             self._rabbitmq.declare(queue=self._tab_items)
-            # 待处理任务状态队列
-            self._task_state = setting.TASK_PROCESS_STATE
-            self._rabbitmq.declare(queue=self._task_state)
+            # 采集任务状态队列
+            self._task_crawl_state = setting.TASK_CRAWL_STATE
+            self._rabbitmq.declare(queue=self._task_crawl_state)
 
             if not self.__class__.dedup and setting.TASK_FILTER_ENABLE:
                 self.__class__.dedup = Dedup(
@@ -83,7 +83,7 @@ class TaskBuffer(threading.Thread):
 
     def __add_task_to_db(self):
         task_list = []
-        task_state_list = []
+        finished_list = []
 
         while self._task_deque:
             task = self._task_deque.popleft()
@@ -96,39 +96,29 @@ class TaskBuffer(threading.Thread):
                 log.debug("task已存在 pyuuid = %s" % task_uuid)
                 continue
             else:
-                update_at = tools.ensure_int64(tools.get_current_timestamp())
                 if isinstance(task, BaseListItem):
                     task_list.append(task)  # 待采集任务
                 else:
-                    if not hasattr(task, 'failed_times'):
-                        task_state_list.append({
-                            'pyuuid': task_uuid,
-                            'update_at': update_at,
-                            'state': 2  # 待采集任务成功采集状态[2=完成采集]
-                        })
-                    else:
-                        task_state_list.append({
-                            'pyuuid': task_uuid,
-                            'update_at': update_at,
-                            'state': 3  # 待采集任务失败采集状态[3=采集失败]
-                        })
+                    task_dict = task.to_dict
+                    task_dict['state'] = 2  # 待采集任务成功采集状态[2=完成采集]
+                    finished_list.append(task_dict)
 
             # 入库(超过上限[MAX_URL_COUNT]执行)
             if len(task_list) > MAX_TASK_COUNT:
                 self._rabbitmq.add(self._tab_items, task_list)
                 task_list = []
 
-            # 推送消息队列(超过上限[MAX_STATE_COUNT]执行)
-            if len(task_state_list) > MAX_STATE_COUNT:
-                self._rabbitmq.add(self._task_state, task_state_list)
-                task_state_list = []
+            # 推送消息队列(超过上限[MAX_FINISHED_COUNT]执行)
+            if len(finished_list) > MAX_FINISHED_COUNT:
+                self._rabbitmq.add(self._task_crawl_state, finished_list)
+                finished_list = []
 
         # 入库(小于上限[MAX_URL_COUNT]执行)
         if task_list:
             self._rabbitmq.add(self._tab_items, task_list)
 
-        # 推送消息队列(超过上限[MAX_STATE_COUNT]执行)
-        if task_state_list:
-            self._rabbitmq.add(self._task_state, task_state_list)
+        # 推送状态(超过上限[MAX_FINISHED_COUNT]执行)
+        if finished_list:
+            self._rabbitmq.add(self._task_crawl_state, finished_list)
 
         self._is_adding_to_db = False