mongo_pipeline.py 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2021-04-18 14:12:21
  4. ---------
  5. @summary: Export scraped items by pushing them onto Redis lists keyed "savemongo:<table>"
  6. ---------
  7. @author: 马国鹏
  8. @email: 305021384@qq.com
  9. """
  10. from typing import Dict, List, Tuple
  11. import time
  12. # from feapder.db.mongodb import MongoDB
  13. from feapder.db.redisdb import RedisDB
  14. from feapder.dedup import Dedup
  15. from feapder.pipelines import BasePipeline
  16. from feapder.utils.log import log
  17. from untils.tools import *
  18. # from crawlab import save_item
  19. class MongoPipeline(BasePipeline):
  20. def __init__(self):
  21. self._to_db = None
  22. @property
  23. def to_db(self):
  24. if not self._to_db:
  25. self._to_db = RedisDB()
  26. return self._to_db
  27. def save_items(self, table, items: List[Dict]) -> bool:
  28. """
  29. 保存数据
  30. Args:
  31. table: 表名
  32. items: 数据,[{},{},...]
  33. Returns: 是否保存成功 True / False
  34. 若False,不会将本批数据入到去重库,以便再次入库
  35. """
  36. try:
  37. add_count = self.to_db.lpush(table="savemongo:"+table, values=items)
  38. # add_count = self.to_db.lpop(table="savemongo:"+table, values=items)
  39. datas_size = len(items)
  40. log.info(
  41. "共导出 %s 条数据到 %s, 新增 %s条, 重复 %s 条"
  42. % (datas_size, table, len(items), datas_size - len(items))
  43. )
  44. return True
  45. except Exception as e:
  46. log.exception(e)
  47. return False