3
0

mongo_pipeline.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
# -*- coding: utf-8 -*-
"""
Created on 2021-04-18 14:12:21
---------
@summary: 导出数据 (export data to MongoDB)
---------
@author: Mkdir700
@email: mkdir700@gmail.com
"""
from typing import Dict, List, Optional, Tuple

from feapder.db.mongodb import MongoDB
from feapder.pipelines import BasePipeline
from feapder.utils.log import log


  14. class MongoPipeline(BasePipeline):
  15. def __init__(self):
  16. self._to_db = None
  17. @property
  18. def to_db(self):
  19. if not self._to_db:
  20. self._to_db = MongoDB()
  21. return self._to_db
  22. def save_items(self, table, items: List[Dict]) -> bool:
  23. """
  24. 保存数据
  25. Args:
  26. table: 表名
  27. items: 数据,[{},{},...]
  28. Returns: 是否保存成功 True / False
  29. 若False,不会将本批数据入到去重库,以便再次入库
  30. """
  31. try:
  32. add_count = self.to_db.add_batch(coll_name=table, datas=items)
  33. datas_size = len(items)
  34. log.info(
  35. "共导出 %s 条数据到 %s, 新增 %s条, 重复 %s 条"
  36. % (datas_size, table, add_count, datas_size - add_count)
  37. )
  38. return True
  39. except Exception as e:
  40. log.exception(e)
  41. return False
  42. def update_items(self, table, items: List[Dict], update_keys=Tuple) -> bool:
  43. """
  44. 更新数据
  45. Args:
  46. table: 表名
  47. items: 数据,[{},{},...]
  48. update_keys: 更新的字段, 如 ("title", "publish_time")
  49. Returns: 是否更新成功 True / False
  50. 若False,不会将本批数据入到去重库,以便再次入库
  51. """
  52. try:
  53. add_count = self.to_db.add_batch(
  54. coll_name=table,
  55. datas=items,
  56. update_columns=update_keys or list(items[0].keys()),
  57. )
  58. datas_size = len(items)
  59. update_count = datas_size - add_count
  60. msg = "共导出 %s 条数据到 %s, 新增 %s 条, 更新 %s 条" % (
  61. datas_size,
  62. table,
  63. add_count,
  64. update_count,
  65. )
  66. if update_keys:
  67. msg += " 更新字段为 {}".format(update_keys)
  68. log.info(msg)
  69. return True
  70. except Exception as e:
  71. log.exception(e)
  72. return False