123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- # -*- coding: utf-8 -*-
- """
- Created on 2023-09-23
- ---------
- @summary: rabbitmq数据通道
- ---------
- @author: Dzr
- """
- from typing import Dict, List
- from feapder.db.rabbitMq import RabbitMQ
- from feapder.pipelines import BasePipeline
- from feapder.utils.tools import log
- class RabbitMqPipeline(BasePipeline):
- def __init__(self):
- self._to_db = None
- @property
- def to_db(self):
- if not self._to_db:
- self._to_db = RabbitMQ()
- return self._to_db
- def save_items(self, table, items: List[Dict]) -> bool:
- """
- 保存数据
- Args:
- table: 表名
- items: 数据,[{},{},...]
- Returns: 是否保存成功 True / False
- 若False,不会将本批数据入到去重库,以便再次入库
- """
- try:
- self.to_db.add_batch(table, items)
- datas_size = len(items)
- log.info("共导出 %s 条数据到 %s" % (datas_size, table))
- return True
- except Exception as e:
- log.exception(e)
- return False
- def update_items(self, table, items: List[Dict], **kwargs) -> bool:
- """
- 更新数据
- Args:
- table: 表名
- items: 数据,[{},{},...]
- Returns: 是否更新成功 True / False
- 若False,不会将本批数据入到去重库,以便再次入库
- """
- items = [{'amq_update': items}]
- return self.save_items(table, items)
- def close(self):
- self.to_db.close()
|