rabbitmq_pipeline.py 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2023-09-23
  4. ---------
  5. @summary: rabbitmq数据通道
  6. ---------
  7. @author: Dzr
  8. """
  9. from typing import Dict, List
  10. from feapder.db.rabbitMq import RabbitMQ
  11. from feapder.pipelines import BasePipeline
  12. from feapder.utils.tools import log
  13. class RabbitMqPipeline(BasePipeline):
  14. def __init__(self):
  15. self._to_db = None
  16. @property
  17. def to_db(self):
  18. if not self._to_db:
  19. self._to_db = RabbitMQ()
  20. return self._to_db
  21. def save_items(self, table, items: List[Dict]) -> bool:
  22. """
  23. 保存数据
  24. Args:
  25. table: 表名
  26. items: 数据,[{},{},...]
  27. Returns: 是否保存成功 True / False
  28. 若False,不会将本批数据入到去重库,以便再次入库
  29. """
  30. try:
  31. table_name = "savemongo:" + table
  32. self.to_db.declare(queue=table_name)
  33. self.to_db.add(table_name, items)
  34. datas_size = len(items)
  35. log.info("共导出 %s 条数据到 %s" % (datas_size, table_name))
  36. return True
  37. except Exception as e:
  38. log.exception(e)
  39. return False
  40. def close(self):
  41. self.to_db.close()