mongo_pipeline.py 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2021-04-18 14:12:21
  4. ---------
  5. @summary: 导出数据
  6. ---------
  7. @author: Mkdir700
  8. @email: mkdir700@gmail.com
  9. """
  10. from typing import Dict, List, Tuple
  11. from feapder.db.mongodb import MongoDB
  12. from feapder.pipelines import BasePipeline
  13. from feapder.utils.log import log
  14. import feapder.utils.tools as tools
  15. class MongoPipeline(BasePipeline):
  16. def __init__(self):
  17. self._to_db = None
  18. @property
  19. def to_db(self):
  20. if not self._to_db:
  21. self._to_db = MongoDB()
  22. return self._to_db
  23. def save_items(self, table, items: List[Dict]) -> bool:
  24. """
  25. 保存数据
  26. Args:
  27. table: 表名
  28. items: 数据,[{},{},...]
  29. Returns: 是否保存成功 True / False
  30. 若False,不会将本批数据入到去重库,以便再次入库
  31. """
  32. try:
  33. add_count = self.to_db.add_batch(coll_name=table, datas=items)
  34. datas_size = len(items)
  35. log.info(
  36. "共导出 %s 条数据到 %s, 新增 %s条, 重复 %s 条"
  37. % (datas_size, table, add_count, datas_size - add_count)
  38. )
  39. return True
  40. except Exception as e:
  41. log.exception(e)
  42. return False
  43. def update_items(self, table, items: List[Dict], update_keys=Tuple) -> bool:
  44. """
  45. 更新数据
  46. Args:
  47. table: 表名
  48. items: 数据,[{},{},...]
  49. update_keys: 更新的字段, 如 ("title", "publish_time")
  50. Returns: 是否更新成功 True / False
  51. 若False,不会将本批数据入到去重库,以便再次入库
  52. """
  53. try:
  54. add_count = self.to_db.add_batch(
  55. coll_name=table,
  56. datas=items,
  57. update_columns=update_keys or list(items[0].keys()),
  58. )
  59. datas_size = len(items)
  60. update_count = datas_size - add_count
  61. msg = "共导出 %s 条数据到 %s, 新增 %s 条, 更新 %s 条" % (
  62. datas_size,
  63. table,
  64. add_count,
  65. update_count,
  66. )
  67. if update_keys:
  68. msg += " 更新字段为 {}".format(update_keys)
  69. log.info(msg)
  70. return True
  71. except Exception as e:
  72. log.exception(e)
  73. return False
  74. class TaskPipeline(MongoPipeline):
  75. def find_items(self, table, condition=None, limit=10):
  76. """
  77. 数据查询
  78. @param str table: 表名
  79. @param dict condition: 查询条件
  80. @param limit: 查询数量
  81. """
  82. return self.to_db.find(table, condition, limit)