redis_pipeline.py 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2021-04-18 14:12:21
  4. ---------
  5. @summary: 导出数据(写入Redis,不直接保存在MongoDB)
  6. ---------
  7. @author: 马国鹏
  8. @email: 305021384@qq.com
  9. """
  10. from typing import Dict, List
  11. from feapder.db.redisdb import RedisDB
  12. from feapder.pipelines import BasePipeline
  13. from feapder.utils.tools import log
  class RedisPipeline(BasePipeline):
      """Export pipeline that hands each batch of items to ``RedisDB.lpush``.

      The ``RedisDB`` client is not built in ``__init__``; it is created the
      first time the ``to_db`` property is read and then reused.
      """

      def __init__(self):
          # Cached RedisDB client; remains None until ``to_db`` is first used.
          self._to_db = None

      @property
      def to_db(self):
          """Return the cached ``RedisDB`` client, building it on first access."""
          if self._to_db:
              return self._to_db
          self._to_db = RedisDB()
          return self._to_db

      def save_items(self, table, items: List[Dict]) -> bool:
          """
          Save one batch of data.

          Args:
              table: table name, forwarded to ``RedisDB.lpush`` as ``table``
              items: the data, [{},{},...], forwarded as ``values``

          Returns: whether the save succeeded, True / False.
              If False, this batch is not added to the dedup store, so it
              can be written again later.
          """
          # NOTE(review): the dicts are passed to lpush unchanged; confirm that
          # RedisDB.lpush (or the underlying client) accepts dict values.
          try:
              self.to_db.lpush(table=table, values=items)
              exported_msg = "共导出 %s 条数据到 %s" % (len(items), table)
              log.info(exported_msg)
              return True
          except Exception as err:
              # Any failure (push, counting, or logging) is logged with its
              # traceback and reported to the caller as an unsuccessful save.
              log.exception(err)
              return False