# coding:utf-8 """ mongodb 数据库连接文件 """ from pymongo import MongoClient import urllib.parse as parse from loguru import logger from pymongo.errors import CursorNotFound class MongoConnect(object): def __init__(self, config): self.__host = config.get("host", "") self.__user = config.get("user", "") self.__password = config.get("password", "") self.__database = config.get("db", "") self.__col = config.get("col", "") self.__charset = config.get("charset", "") self.client, self.col = self.connect() def connect(self): """ 连接数据库 :return: """ # 特殊符号转义 self.__user = parse.quote_plus(self.__user) self.__password = parse.quote_plus(self.__password) # 连接数据库 if self.__user: client = MongoClient( "mongodb://{}:{}@{}".format(self.__user, self.__password, self.__host), unicode_decode_error_handler='ignore') else: client = MongoClient( "mongodb://{}".format(self.__host), unicode_decode_error_handler='ignore') col = client[self.__database][self.__col] return client, col def get_by_mid(self, m_id, fields): info = {} for i in range(2): try: info = self.col.find_one({"_id": m_id}, fields) break except CursorNotFound as e: logger.warning(e) self.client, self.col = self.connect() return info def insert(self, row): info = {} for i in range(2): try: info = self.col.insert_one(row) break except CursorNotFound as e: logger.warning(e) self.client, self.col = self.connect() return info def update(self, m_id, row): info = {} for i in range(2): try: info = self.col.update_one({"_id": m_id}, {"$set": row}) break except CursorNotFound as e: logger.warning(e) self.client, self.col = self.connect() return info