1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- # -*- coding: utf-8 -*-
- """
- Created on 2021-04-18 14:12:21
- ---------
- @summary: 导出数据
- ---------
- @author: 马国鹏
- @email: 305021384@qq.com
- """
- from typing import Dict, List, Tuple
- import time
- from feapder.db.mongodb import MongoDB
- from feapder.dedup import Dedup
- from feapder.pipelines import BasePipeline
- from feapder.utils.log import log
- from untils.tools import *
- from crawlab import save_item
class MongoPipeline(BasePipeline):
    """Pipeline that exports batches of scraped items to MongoDB.

    Besides writing to the database, ``save_items`` records each item's
    ``href`` in a Bloom-filter ``Dedup`` store and, for the ``mgp_list``
    table, reports the number of newly inserted rows through crawlab's
    ``save_item``.
    """

    def __init__(self):
        # The MongoDB client is created lazily by the ``to_db`` property.
        self._to_db = None

    @property
    def to_db(self):
        """Return the MongoDB client, creating it on first access."""
        if self._to_db is None:
            self._to_db = MongoDB()
        return self._to_db

    def save_items(self, table, items: List[Dict]) -> bool:
        """Save a batch of items.

        Args:
            table: Name of the target table (collection).
            items: The data to save, ``[{}, {}, ...]``.

        Returns:
            True if the batch was saved, False if any step raised.
            Per the original documentation, a False result means the
            framework will not add this batch to its dedup store, so it
            can be exported again.
        """
        try:
            log.debug("exporting to table %s" % table)
            add_count = self.to_db.add_batch(coll_name=table, datas=items)

            # Record every href in the dedup filter. The filter is built once
            # per batch and fed all keys in a single call instead of
            # constructing a new Dedup object for each item.
            hrefs = [item.get("href") for item in items]
            if hrefs:
                Dedup(Dedup.BloomFilter).add(hrefs)

            datas_size = len(items)
            log.info(
                "共导出 %s 条数据到 %s, 新增 %s条, 重复 %s 条"
                % (datas_size, table, add_count, datas_size - add_count)
            )

            # Only the list table reports its insert count to crawlab.
            if table == "mgp_list":
                save_item({"site": "新增/回填", "title": add_count})
            return True
        except Exception as e:
            # Pipeline contract: signal failure via the return value rather
            # than propagating, after logging the full traceback.
            log.exception(e)
            return False

    def update_items(self, table, items: List[Dict], update_keys: Tuple = ()) -> bool:
        """Update a batch of items.

        Args:
            table: Name of the target table (collection).
            items: The data to write, ``[{}, {}, ...]``.
            update_keys: Fields to update, e.g. ``("title", "publish_time")``.
                When empty, all fields of the first item are used.

        Returns:
            True if the batch was written, False if any step raised.
            Per the original documentation, a False result means the
            framework will not add this batch to its dedup store, so it
            can be exported again.
        """
        try:
            # Fall back to the first item's fields when no keys were given;
            # guard the lookup so an empty batch does not raise IndexError.
            update_columns = update_keys or (list(items[0].keys()) if items else [])
            add_count = self.to_db.add_batch(
                coll_name=table,
                datas=items,
                update_columns=update_columns,
            )

            datas_size = len(items)
            update_count = datas_size - add_count
            msg = "共导出 %s 条数据到 %s, 新增 %s 条, 更新 %s 条" % (
                datas_size,
                table,
                add_count,
                update_count,
            )
            if update_keys:
                msg += " 更新字段为 {}".format(update_keys)
            log.info(msg)
            return True
        except Exception as e:
            # Same contract as save_items: log and report failure.
            log.exception(e)
            return False
|