浏览代码

清除任务中心标识状态

dongzhaorui 4 月之前
父节点
当前提交
a439763f86
共有 1 个文件被更改,包括 26 次插入12 次删除
  1. 26 12
      FworkSpider/feapder/buffer/item_buffer/jy_item_buffer.py

+ 26 - 12
FworkSpider/feapder/buffer/item_buffer/jy_item_buffer.py

@@ -67,7 +67,6 @@ class JyItemBuffer(threading.Thread):
             self.export_falied_times = 0
             # 缓存队列
             self.tasks_dict = {}
-            self.release_task_enable = False
 
     def load_pipelines(self):
         pipelines = []
@@ -280,25 +279,40 @@ class JyItemBuffer(threading.Thread):
         return datas_dict
 
     def release_tasks(self, datas, finished=True):
-        if not self.release_task_enable or not datas:
+        if not datas:
             return
 
-        if "token" not in datas and "token" not in self.tasks_dict:
-            # 列表爬虫,不申请任务接口身份token,直接跳过
+        token = self.tasks_dict["token"] if "token" in self.tasks_dict else None
+        if not token:
             return
 
-        token = datas.get("token") or self.tasks_dict["token"]
+        all_task_dict = {}
+        if "data" in self.tasks_dict:
+            all_task_dict = self.tasks_dict["data"]
+
+        commit_task_dict = {}
+        if "data" in datas:
+            commit_task_dict = datas["data"]
+
+        if not all_task_dict and not commit_task_dict:
+            return
 
         release_tasks = []
         if not finished:
-            for pyuuid in dict(self.tasks_dict["data"]):
-                release_tasks.append(datas["data"].pop(pyuuid))
+            # 爬虫运行结束,释放剩余未完成采集的任务
+            for pyuuid in dict(all_task_dict):
+                release_tasks.append(commit_task_dict.pop(pyuuid))
         else:
-            target_pyuuid_lst = [data["pyuuid"] for data in datas]
-            for pyuuid in target_pyuuid_lst:
-                # if pyuuid not in self.tasks_dict["data"]:
-                #     continue
-                release_tasks.append(self.tasks_dict["data"].pop(pyuuid))
+            # 爬虫运行中,释放已采集完成的任务
+            finished_task_pyuuid_lst = [data["pyuuid"] for data in datas]
+            for pyuuid in finished_task_pyuuid_lst:
+                if pyuuid not in all_task_dict:
+                    continue
+                release_tasks.append(all_task_dict.pop(pyuuid))
+
+        if len(release_tasks) == 0:
+            log.debug("无回传任务")
+            return
 
         r = None
         url = f"{setting.JY_TASK_URL}/tasks/batch-release"