12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- # -*- coding: utf-8 -*-
- """
- Created on 2021-04-18 14:12:21
- ---------
- @summary: 导出数据
- ---------
- @author: Mkdir700
- @email: mkdir700@gmail.com
- """
- from typing import Dict, List, Tuple
- from feapder.db.mongodb import MongoDB
- from feapder.pipelines import BasePipeline
- from feapder.utils.log import log
- import feapder.utils.tools as tools
class MongoPipeline(BasePipeline):
    """Pipeline that exports batches of items to MongoDB.

    The MongoDB client is created lazily, on first access of ``to_db``,
    so constructing the pipeline itself performs no database work.
    """

    def __init__(self):
        # Filled in by the ``to_db`` property on first use.
        self._to_db = None

    @property
    def to_db(self):
        """Return the cached ``MongoDB`` client, creating it on first access."""
        if self._to_db is None:
            self._to_db = MongoDB()
        return self._to_db

    def save_items(self, table, items: List[Dict]) -> bool:
        """Insert a batch of items into a collection.

        Args:
            table: Collection (table) name.
            items: Rows to export, as ``[{...}, {...}, ...]``.

        Returns:
            True if the batch was written, False if any exception was
            raised. On False the batch is not recorded in the dedup store,
            so it can be exported again later.
        """
        try:
            # The value returned by add_batch is reported as the number of
            # newly inserted rows; the remainder is reported as duplicates.
            add_count = self.to_db.add_batch(coll_name=table, datas=items)
            datas_size = len(items)
            log.info(
                "共导出 %s 条数据到 %s, 新增 %s条, 重复 %s 条"
                % (datas_size, table, add_count, datas_size - add_count)
            )
            return True
        except Exception as e:
            # Deliberate best-effort: log the failure and signal it through
            # the return value instead of propagating.
            log.exception(e)
            return False

    def update_items(self, table, items: List[Dict], update_keys: Tuple = ()) -> bool:
        """Insert a batch of items, updating the given fields of existing rows.

        Args:
            table: Collection (table) name.
            items: Rows to export, as ``[{...}, {...}, ...]``.
            update_keys: Fields to update, e.g. ``("title", "publish_time")``.
                When empty or None, every key of the first item is used.

        Returns:
            True if the batch was written, False if any exception was
            raised. On False the batch is not recorded in the dedup store,
            so it can be exported again later.
        """
        try:
            # The default used to be the ``typing.Tuple`` object itself
            # (``update_keys=Tuple``), which is truthy, so this fallback to
            # "all keys of the first item" could never trigger.
            add_count = self.to_db.add_batch(
                coll_name=table,
                datas=items,
                update_columns=update_keys or list(items[0].keys()),
            )
            datas_size = len(items)
            update_count = datas_size - add_count
            msg = "共导出 %s 条数据到 %s, 新增 %s 条, 更新 %s 条" % (
                datas_size,
                table,
                add_count,
                update_count,
            )
            # Only mention the updated fields when the caller chose them.
            if update_keys:
                msg += " 更新字段为 {}".format(update_keys)
            log.info(msg)
            return True
        except Exception as e:
            # Deliberate best-effort: log the failure and signal it through
            # the return value instead of propagating.
            log.exception(e)
            return False
class TaskPipeline(MongoPipeline):
    """MongoPipeline extended with a read helper for querying rows."""

    def find_items(self, table, condition=None, limit=10):
        """Query rows from a collection.

        @param str table: collection (table) name
        @param dict condition: query condition
        @param limit: number of rows to query
        @return: whatever ``to_db.find`` returns for these arguments
        """
        client = self.to_db
        found = client.find(table, condition, limit)
        return found
|