mongo_pipeline_old.py 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2021-04-18 14:12:21
  4. ---------
  5. @summary: 导出数据
  6. ---------
  7. @author: 马国鹏
  8. @email: 305021384@qq.com
  9. """
  10. from typing import Dict, List, Tuple
  11. import time
  12. from feapder.db.mongodb import MongoDB
  13. from feapder.dedup import Dedup
  14. from feapder.pipelines import BasePipeline
  15. from feapder.utils.log import log
  16. from untils.tools import *
  17. # from crawlab import save_item
  18. class MongoPipeline(BasePipeline):
  19. def __init__(self):
  20. self._to_db = None
  21. @property
  22. def to_db(self):
  23. if not self._to_db:
  24. self._to_db = MongoDB()
  25. print("创建新连接?")
  26. return self._to_db
  27. def save_items(self, table, items: List[Dict]) -> bool:
  28. """
  29. 保存数据
  30. Args:
  31. table: 表名
  32. items: 数据,[{},{},...]
  33. Returns: 是否保存成功 True / False
  34. 若False,不会将本批数据入到去重库,以便再次入库
  35. """
  36. try:
  37. add_count = self.to_db.add_batch(coll_name=table, datas=items)
  38. for item in items:
  39. dedup = Dedup(Dedup.BloomFilter)
  40. dedup.add([item.get("href")])
  41. # save_item({'count':item.get("href")})
  42. datas_size = len(items)
  43. log.info(
  44. "共导出 %s 条数据到 %s, 新增 %s条, 重复 %s 条"
  45. % (datas_size, table, add_count, datas_size - add_count)
  46. )
  47. # wechat_warning(f"{site} 数据导报\n共插入 {datas_size} 条数据到 {table}")
  48. # for i in range(add_count):
  49. # if table == "mgp_list":
  50. # save_item({"site": "失败回填", "title": add_count})
  51. return True
  52. except Exception as e:
  53. log.exception(e)
  54. return False
  55. def update_items(self, table, items: List[Dict], update_keys=Tuple) -> bool:
  56. """
  57. 更新数据
  58. Args:
  59. table: 表名
  60. items: 数据,[{},{},...]
  61. update_keys: 更新的字段, 如 ("title", "publish_time")
  62. Returns: 是否更新成功 True / False
  63. 若False,不会将本批数据入到去重库,以便再次入库
  64. """
  65. try:
  66. # self.to_db.find()
  67. add_count = self.to_db.add_batch(
  68. coll_name=table,
  69. datas=items,
  70. update_columns=update_keys or list(items[0].keys()),
  71. )
  72. datas_size = len(items)
  73. update_count = datas_size - add_count
  74. msg = "共导出 %s 条数据到 %s, 新增 %s 条, 更新 %s 条" % (
  75. datas_size,
  76. table,
  77. add_count,
  78. update_count,
  79. )
  80. if update_keys:
  81. msg += " 更新字段为 {}".format(update_keys)
  82. log.info(msg)
  83. return True
  84. except Exception as e:
  85. log.exception(e)
  86. return False