mongo_pipeline.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2021-04-18 14:12:21
  4. ---------
  5. @summary: 导出数据
  6. ---------
  7. @author: 马国鹏
  8. @email: 305021384@qq.com
  9. """
  10. from typing import Dict, List, Tuple
  11. import time
  12. from feapder.db.mongodb import MongoDB
  13. from feapder.dedup import Dedup
  14. from feapder.pipelines import BasePipeline
  15. from feapder.utils.log import log
  16. from untils.tools import *
  17. from crawlab import save_item
  18. class MongoPipeline(BasePipeline):
  19. def __init__(self):
  20. self._to_db = None
  21. @property
  22. def to_db(self):
  23. if not self._to_db:
  24. self._to_db = MongoDB()
  25. return self._to_db
  26. def save_items(self, table, items: List[Dict]) -> bool:
  27. """
  28. 保存数据
  29. Args:
  30. table: 表名
  31. items: 数据,[{},{},...]
  32. Returns: 是否保存成功 True / False
  33. 若False,不会将本批数据入到去重库,以便再次入库
  34. """
  35. try:
  36. add_count = self.to_db.add_batch(coll_name=table, datas=items)
  37. for item in items:
  38. dedup = Dedup(Dedup.BloomFilter)
  39. dedup.add([item.get("href")])
  40. # save_item({'count':item.get("href")})
  41. datas_size = len(items)
  42. log.info(
  43. "共导出 %s 条数据到 %s, 新增 %s条, 重复 %s 条"
  44. % (datas_size, table, add_count, datas_size - add_count)
  45. )
  46. # wechat_warning(f"{site} 数据导报\n共插入 {datas_size} 条数据到 {table}")
  47. # for i in range(add_count):
  48. if table == "mgp_list":
  49. save_item({"site": "失败回填", "title": add_count})
  50. return True
  51. except Exception as e:
  52. log.exception(e)
  53. return False
  54. def update_items(self, table, items: List[Dict], update_keys=Tuple) -> bool:
  55. """
  56. 更新数据
  57. Args:
  58. table: 表名
  59. items: 数据,[{},{},...]
  60. update_keys: 更新的字段, 如 ("title", "publish_time")
  61. Returns: 是否更新成功 True / False
  62. 若False,不会将本批数据入到去重库,以便再次入库
  63. """
  64. try:
  65. add_count = self.to_db.add_batch(
  66. coll_name=table,
  67. datas=items,
  68. update_columns=update_keys or list(items[0].keys()),
  69. )
  70. datas_size = len(items)
  71. update_count = datas_size - add_count
  72. msg = "共导出 %s 条数据到 %s, 新增 %s 条, 更新 %s 条" % (
  73. datas_size,
  74. table,
  75. add_count,
  76. update_count,
  77. )
  78. if update_keys:
  79. msg += " 更新字段为 {}".format(update_keys)
  80. log.info(msg)
  81. return True
  82. except Exception as e:
  83. log.exception(e)
  84. return False