瀏覽代碼

添加rabbittmq数据管道模块

dongzhaorui 1 年之前
父節點
當前提交
8ae54ec4e5
共有 1 個文件被更改,包括 45 次插入0 次删除
  1. 45 0
      FworkSpider/feapder/pipelines/rabbitmq_pipeline.py

+ 45 - 0
FworkSpider/feapder/pipelines/rabbitmq_pipeline.py

@@ -0,0 +1,45 @@
+# -*- coding: utf-8 -*-
+"""
+Created on 2021-04-18 14:12:21
+---------
+@summary: 导出数据(写入rabbitmq,不直接保存在MongoDB)
+---------
+@author: Dzr
+"""
+from typing import Dict, List
+
+from feapder.db.rabbitMq import RabbitMQ
+from feapder.pipelines import BasePipeline
+from feapder.utils.tools import log
+
+
+class RabbitMqPipeline(BasePipeline):
+    def __init__(self):
+        self._to_db = None
+
+    @property
+    def to_db(self):
+        if not self._to_db:
+            self._to_db = RabbitMQ()
+        return self._to_db
+
+    def save_items(self, table, items: List[Dict]) -> bool:
+        """
+        保存数据
+        Args:
+            table: 表名
+            items: 数据,[{},{},...]
+
+        Returns: 是否保存成功 True / False
+                 若False,不会将本批数据入到去重库,以便再次入库
+        """
+        try:
+            table_name = "savemongo:" + table
+            self.to_db.declare(queue=table_name)
+            self.to_db.add(table_name, items)
+            datas_size = len(items)
+            log.info("共导出 %s 条数据到 %s" % (datas_size, table_name))
+            return True
+        except Exception as e:
+            log.exception(e)
+            return False