# -*- coding: utf-8 -*- """ Created on 2023-09-23 --------- @summary: rabbitmq数据通道 --------- @author: Dzr """ from typing import Dict, List from feapder.db.rabbitMq import RabbitMQ from feapder.pipelines import BasePipeline from feapder.utils.tools import log class RabbitMqPipeline(BasePipeline): def __init__(self): self._to_db = None @property def to_db(self): if not self._to_db: self._to_db = RabbitMQ() return self._to_db def save_items(self, table, items: List[Dict]) -> bool: """ 保存数据 Args: table: 表名 items: 数据,[{},{},...] Returns: 是否保存成功 True / False 若False,不会将本批数据入到去重库,以便再次入库 """ try: self.to_db.add_batch(table, items) datas_size = len(items) log.info("共导出 %s 条数据到 %s" % (datas_size, table)) return True except Exception as e: log.exception(e) return False def update_items(self, table, items: List[Dict], **kwargs) -> bool: """ 更新数据 Args: table: 表名 items: 数据,[{},{},...] Returns: 是否更新成功 True / False 若False,不会将本批数据入到去重库,以便再次入库 """ items = [{'amq_update': items}] return self.save_items(table, items) def close(self): self.to_db.close()