1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- # -*- coding: utf-8 -*-
- """
- Created on 2018-07-29 22:48:30
- ---------
- @summary: 导出数据
- ---------
- @author: Boris
- @email: boris_liu@foxmail.com
- """
- from typing import Dict, List, Tuple
- import feapder.utils.tools as tools
- from feapder.db.mysqldb import MysqlDB
- from feapder.pipelines import BasePipeline
- from feapder.utils.log import log
- class MysqlPipeline(BasePipeline):
- def __init__(self):
- self._to_db = None
- @property
- def to_db(self):
- if not self._to_db:
- self._to_db = MysqlDB()
- return self._to_db
- def save_items(self, table, items: List[Dict]) -> bool:
- """
- 保存数据
- Args:
- table: 表名
- items: 数据,[{},{},...]
- Returns: 是否保存成功 True / False
- 若False,不会将本批数据入到去重库,以便再次入库
- """
- sql, datas = tools.make_batch_sql(table, items)
- add_count = self.to_db.add_batch(sql, datas)
- datas_size = len(datas)
- if add_count:
- log.info(
- "共导出 %s 条数据 到 %s, 重复 %s 条" % (datas_size, table, datas_size - add_count)
- )
- return add_count != None
- def update_items(self, table, items: List[Dict], update_keys=Tuple) -> bool:
- """
- 更新数据
- Args:
- table: 表名
- items: 数据,[{},{},...]
- update_keys: 更新的字段, 如 ("title", "publish_time")
- Returns: 是否更新成功 True / False
- 若False,不会将本批数据入到去重库,以便再次入库
- """
- sql, datas = tools.make_batch_sql(
- table, items, update_columns=update_keys or list(items[0].keys())
- )
- update_count = self.to_db.add_batch(sql, datas)
- if update_count:
- msg = "共更新 %s 条数据 到 %s" % (update_count // 2, table)
- if update_keys:
- msg += " 更新字段为 {}".format(update_keys)
- log.info(msg)
- return update_count != None
|