3
0

rabbitmq_pipeline.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2023-09-23
  4. ---------
  5. @summary: rabbitmq数据通道
  6. ---------
  7. @author: Dzr
  8. """
  9. from typing import Dict, List
  10. from feapder.db.rabbitMq import RabbitMQ
  11. from feapder.pipelines import BasePipeline
  12. from feapder.utils.tools import log
  13. class RabbitMqPipeline(BasePipeline):
  14. def __init__(self):
  15. self._to_db = None
  16. @property
  17. def to_db(self):
  18. if not self._to_db:
  19. self._to_db = RabbitMQ()
  20. return self._to_db
  21. def save_items(self, table, items: List[Dict]) -> bool:
  22. """
  23. 保存数据
  24. Args:
  25. table: 表名
  26. items: 数据,[{},{},...]
  27. Returns: 是否保存成功 True / False
  28. 若False,不会将本批数据入到去重库,以便再次入库
  29. """
  30. try:
  31. self.to_db.add_batch(table, items)
  32. datas_size = len(items)
  33. log.info("共导出 %s 条数据到 %s" % (datas_size, table))
  34. return True
  35. except Exception as e:
  36. log.exception(e)
  37. return False
  38. def update_items(self, table, items: List[Dict], **kwargs) -> bool:
  39. """
  40. 更新数据
  41. Args:
  42. table: 表名
  43. items: 数据,[{},{},...]
  44. Returns: 是否更新成功 True / False
  45. 若False,不会将本批数据入到去重库,以便再次入库
  46. """
  47. items = [{'amq_update': items}]
  48. return self.save_items(table, items)
  49. def close(self):
  50. self.to_db.close()