mongo_pipeline.py 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2021-04-18 14:12:21
  4. ---------
  5. @summary: 导出数据
  6. ---------
  7. @author: 马国鹏
  8. @email: 305021384@qq.com
  9. """
  10. from typing import Dict, List, Tuple
  11. import time
  12. from feapder.db.mongodb import MongoDB
  13. from feapder.dedup import Dedup
  14. from feapder.pipelines import BasePipeline
  15. from feapder.utils.log import log
  16. from untils.tools import *
  17. from crawlab import save_item
  18. class MongoPipeline(BasePipeline):
  19. def __init__(self):
  20. self._to_db = None
  21. @property
  22. def to_db(self):
  23. if not self._to_db:
  24. self._to_db = MongoDB()
  25. return self._to_db
  26. def save_items(self, table, items: List[Dict]) -> bool:
  27. """
  28. 保存数据
  29. Args:
  30. table: 表名
  31. items: 数据,[{},{},...]
  32. Returns: 是否保存成功 True / False
  33. 若False,不会将本批数据入到去重库,以便再次入库
  34. """
  35. try:
  36. print(table)
  37. add_count = self.to_db.add_batch(coll_name=table, datas=items)
  38. for item in items:
  39. dedup = Dedup(Dedup.BloomFilter)
  40. dedup.add([item.get("href")])
  41. # save_item({'count':item.get("href")})
  42. datas_size = len(items)
  43. log.info(
  44. "共导出 %s 条数据到 %s, 新增 %s条, 重复 %s 条"
  45. % (datas_size, table, add_count, datas_size - add_count)
  46. )
  47. if table == "mgp_list":
  48. save_item({"site": "新增/回填", "title": add_count})
  49. return True
  50. except Exception as e:
  51. log.exception(e)
  52. return False
  53. def update_items(self, table, items: List[Dict], update_keys=Tuple) -> bool:
  54. """
  55. 更新数据
  56. Args:
  57. table: 表名
  58. items: 数据,[{},{},...]
  59. update_keys: 更新的字段, 如 ("title", "publish_time")
  60. Returns: 是否更新成功 True / False
  61. 若False,不会将本批数据入到去重库,以便再次入库
  62. """
  63. try:
  64. add_count = self.to_db.add_batch(
  65. coll_name=table,
  66. datas=items,
  67. update_columns=update_keys or list(items[0].keys()),
  68. )
  69. datas_size = len(items)
  70. update_count = datas_size - add_count
  71. msg = "共导出 %s 条数据到 %s, 新增 %s 条, 更新 %s 条" % (
  72. datas_size,
  73. table,
  74. add_count,
  75. update_count,
  76. )
  77. if update_keys:
  78. msg += " 更新字段为 {}".format(update_keys)
  79. log.info(msg)
  80. return True
  81. except Exception as e:
  82. log.exception(e)
  83. return False