rabbitmq_pipeline.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2023-09-23
  4. ---------
  5. @summary: rabbitmq数据通道
  6. ---------
  7. @author: Dzr
  8. """
  9. from typing import Dict, List
  10. from feapder.db.rabbitMq import RabbitMQ
  11. from feapder.pipelines import BasePipeline
  12. from feapder.utils.tools import log
  13. class RabbitMqPipeline(BasePipeline):
  14. def __init__(self):
  15. self._to_db = None
  16. @property
  17. def to_db(self):
  18. if not self._to_db:
  19. self._to_db = RabbitMQ()
  20. return self._to_db
  21. def save_items(self, table, items: List[Dict]) -> bool:
  22. """
  23. 保存数据
  24. Args:
  25. table: 表名
  26. items: 数据,[{},{},...]
  27. Returns: 是否保存成功 True / False
  28. 若False,不会将本批数据入到去重库,以便再次入库
  29. """
  30. try:
  31. self.to_db.declare(queue=table)
  32. self.to_db.add(table, items)
  33. datas_size = len(items)
  34. log.info("共导出 %s 条数据到 %s" % (datas_size, table))
  35. return True
  36. except Exception as e:
  37. log.exception(e)
  38. return False
  39. def update_items(self, table, items: List[Dict], **kwargs) -> bool:
  40. """
  41. 更新数据
  42. Args:
  43. table: 表名
  44. items: 数据,[{},{},...]
  45. Returns: 是否更新成功 True / False
  46. 若False,不会将本批数据入到去重库,以便再次入库
  47. """
  48. items = [{'amq_update': items}]
  49. return self.save_items(table, items)
  50. def close(self):
  51. self.to_db.close()