rabbitmq_pipeline.py 1.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2023-09-23
  4. ---------
  5. @summary: rabbitmq数据通道
  6. ---------
  7. @author: Dzr
  8. """
  9. from typing import Dict, List
  10. from feapder.db.rabbitMq import RabbitMQ
  11. from feapder.pipelines import BasePipeline
  12. from feapder.utils.tools import log
  13. class RabbitMqPipeline(BasePipeline):
  14. def __init__(self):
  15. self._to_db = None
  16. @property
  17. def to_db(self):
  18. if not self._to_db:
  19. self._to_db = RabbitMQ()
  20. return self._to_db
  21. def save_items(self, table, items: List[Dict]) -> bool:
  22. """
  23. 保存数据
  24. Args:
  25. table: 表名
  26. items: 数据,[{},{},...]
  27. Returns: 是否保存成功 True / False
  28. 若False,不会将本批数据入到去重库,以便再次入库
  29. """
  30. try:
  31. self.to_db.declare(queue=table)
  32. self.to_db.add(table, items)
  33. datas_size = len(items)
  34. log.info("共导出 %s 条数据到 %s" % (datas_size, table))
  35. return True
  36. except Exception as e:
  37. log.exception(e)
  38. return False
  39. def close(self):
  40. self.to_db.close()