# -*- coding: utf-8 -*-
"""
Created on 2021-04-18 14:12:21
---------
@summary: Export data
---------
@author: 马国鹏
@email:  305021384@qq.com
"""
from typing import Dict, List, Tuple
import time

from feapder.db.mongodb import MongoDB
from feapder.dedup import Dedup
from feapder.pipelines import BasePipeline
from feapder.utils.log import log
# NOTE(review): the star import is kept last, as in the original file, because
# it may (re)bind names used below; prefer explicit imports once the needed
# names are known.
from untils.tools import *

# from crawlab import save_item


class MongoPipeline(BasePipeline):
    """Pipeline that writes item batches to MongoDB.

    The MongoDB handle is created lazily on first access of ``to_db`` and
    reused for every later call on the same pipeline instance.
    """

    def __init__(self):
        # Lazily-created MongoDB handle; see the ``to_db`` property.
        self._to_db = None

    @property
    def to_db(self):
        """Return the MongoDB handle, creating it on first access."""
        # Compare against None explicitly so the lazy initialisation does not
        # depend on how the handle object evaluates in a boolean context.
        if self._to_db is None:
            self._to_db = MongoDB()
            print("创建新连接?")
        return self._to_db

    def save_items(self, table, items: List[Dict]) -> bool:
        """
        Save a batch of items.

        After the batch insert, the ``href`` value of every item is added to
        a BloomFilter-backed ``Dedup`` store.

        Args:
            table: collection (table) name
            items: the data, [{},{},...]

        Returns: whether the save succeeded, True / False.
                 If False, this batch is not recorded in the dedup store, so
                 that it can be saved again later.

        """
        try:
            add_count = self.to_db.add_batch(coll_name=table, datas=items)

            # Build the dedup filter once per batch (it was previously rebuilt
            # for every single item) and register all hrefs in one call.
            # Nothing is created for an empty batch, matching the original.
            if items:
                dedup = Dedup(Dedup.BloomFilter)
                dedup.add([item.get("href") for item in items])

            datas_size = len(items)
            log.info(
                "共导出 %s 条数据到 %s,  新增 %s条, 重复 %s 条"
                % (datas_size, table, add_count, datas_size - add_count)
            )
            return True
        except Exception as e:
            # Pipeline boundary: log the failure and report it via the return
            # value instead of propagating.
            log.exception(e)
            return False

    def update_items(self, table, items: List[Dict], update_keys: Tuple = ()) -> bool:
        """
        Update a batch of items.

        Args:
            table: collection (table) name
            items: the data, [{},{},...]
            update_keys: fields to update, e.g. ("title", "publish_time").
                When empty (the default), the keys of the first item are used.

        Returns: whether the update succeeded, True / False.
                 If False, this batch is not recorded in the dedup store, so
                 that it can be saved again later.

        """
        try:
            # Fall back to the first item's keys when no explicit field list
            # is given. The default used to be the ``typing.Tuple`` object
            # itself, which is truthy, so this fallback could never trigger.
            # An empty ``items`` list raises IndexError here, which is
            # reported as a failed batch by the handler below.
            update_columns = update_keys or list(items[0].keys())
            add_count = self.to_db.add_batch(
                coll_name=table,
                datas=items,
                update_columns=update_columns,
            )
            datas_size = len(items)
            # Everything that was not newly inserted is counted as updated.
            update_count = datas_size - add_count

            msg = "共导出 %s 条数据到 %s,  新增 %s 条, 更新 %s 条" % (
                datas_size,
                table,
                add_count,
                update_count,
            )
            if update_keys:
                msg += " 更新字段为 {}".format(update_keys)
            log.info(msg)

            return True
        except Exception as e:
            # Pipeline boundary: log the failure and report it via the return
            # value instead of propagating.
            log.exception(e)
            return False