3
0

mongo_pipeline.py 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2021-04-18 14:12:21
  4. ---------
  5. @summary: 导出数据
  6. ---------
  7. @author: 马国鹏
  8. @email: 305021384@qq.com
  9. """
  10. from typing import Dict, List, Tuple
  11. import time
  12. from feapder.db.redisdb import RedisDB
  13. from feapder.dedup import Dedup
  14. from feapder.pipelines import BasePipeline
  15. from feapder.utils.log import log
  16. from untils.tools import *
  17. class RedisPipeline(BasePipeline):
  18. '''数据存储管道-redis版'''
  19. def __init__(self):
  20. self._to_db = None
  21. @property
  22. def to_db(self):
  23. if not self._to_db:
  24. self._to_db = RedisDB()
  25. print("创建新连接?")
  26. return self._to_db
  27. def save_items(self, table, items: List[Dict]) -> bool:
  28. """
  29. 保存数据
  30. Args:
  31. table: 表名
  32. items: 数据,[{},{},...]
  33. Returns: 是否保存成功 True / False
  34. 若False,不会将本批数据入到去重库,以便再次入库
  35. """
  36. try:
  37. add_count = self.to_db.lpush(table="savemongo:"+table, values=items)
  38. print(add_count)
  39. datas_size = len(items)
  40. log.info(
  41. "共导出 %s 条数据到 %s, 新增 %s条, 重复 %s 条"
  42. % (datas_size, table, len(items), datas_size - len(items))
  43. )
  44. return True
  45. except Exception as e:
  46. log.exception(e)
  47. return False