# coding:utf-8
from pymongo import MongoClient
from pymongo.errors import AutoReconnect
from bson import ObjectId
import time
import urllib.parse as parse


class MongoDBInterface:
    """Small convenience wrapper around a pymongo client bound to one database.

    The constructor reads its settings from a mapping with the keys
    ``ip_port``, ``user``, ``password`` and ``db`` (each defaulting to an
    empty string) and connects immediately.
    """

    # Seconds to wait between reconnection attempts after AutoReconnect.
    _RECONNECT_DELAY = 5

    def __init__(self, config):
        """Store the connection settings from *config* and open the client.

        :param config: mapping providing ``ip_port``, ``user``, ``password``
            and ``db``; missing keys fall back to ``""``.
        """
        self.__host = config.get("ip_port", "")
        self.__user = config.get("user", "")
        self.__password = config.get("password", "")
        self.__database = config.get("db", "")
        self.connect()

    def connect(self):
        """Create the MongoClient and select the configured database.

        Sets ``self.client`` and ``self.db``. If constructing the client
        raises ``AutoReconnect`` the attempt is repeated after a short pause
        until it succeeds.

        :return: None
        """
        # Escape special characters for use inside the URI. The escaped
        # values are kept in locals on purpose: writing them back to the
        # instance would escape them a second time on every reconnect and
        # corrupt credentials that contain characters such as '@' or '%'.
        user = parse.quote_plus(self.__user)
        password = parse.quote_plus(self.__password)

        if user:
            uri = "mongodb://{}:{}@{}".format(user, password, self.__host)
        else:
            uri = "mongodb://{}".format(self.__host)

        # Retry in a loop rather than recursively so that a long outage
        # cannot exhaust the interpreter's recursion limit.
        while True:
            try:
                self.client = MongoClient(
                    uri,
                    unicode_decode_error_handler='ignore',
                    directConnection=True,
                )
                self.db = self.client[self.__database]
                return
            except AutoReconnect:
                time.sleep(self._RECONNECT_DELAY)

    def disconnect(self):
        """Close the underlying MongoClient."""
        self.client.close()

    def _run_with_reconnect(self, operation):
        """Call *operation* until it completes without ``AutoReconnect``.

        After each ``AutoReconnect`` a message is printed, the call waits
        ``_RECONNECT_DELAY`` seconds, reconnects and tries again. Any other
        exception propagates unchanged.

        :param operation: zero-argument callable; it must look up
            ``self.db`` itself so that it uses the fresh handle after a
            reconnect.
        :return: whatever *operation* returns.
        """
        while True:
            try:
                return operation()
            except AutoReconnect:
                print("Connection to MongoDB lost. Reconnecting...")
                time.sleep(self._RECONNECT_DELAY)
                self.connect()  # automatic reconnect

    @staticmethod
    def _as_object_id(m_id):
        """Return *m_id* as an ObjectId when it is given as a string."""
        return ObjectId(m_id) if isinstance(m_id, str) else m_id

    def find_by_id(self, collection_name, m_id):
        """Fetch the document whose ``_id`` equals *m_id*.

        :param collection_name: name of the collection to query.
        :param m_id: the ``_id`` value; a ``str`` is converted to ObjectId.
        :return: the result of ``find_one`` for that ``_id``.
        """
        m_id = self._as_object_id(m_id)
        return self._run_with_reconnect(
            lambda: self.db[collection_name].find_one({"_id": m_id})
        )

    def update_by_id(self, collection_name, m_id, fields: dict):
        """Apply ``$set`` with *fields* to the document with ``_id`` *m_id*.

        :param collection_name: name of the collection to update.
        :param m_id: the ``_id`` value; a ``str`` is converted to ObjectId.
        :param fields: field/value pairs passed to ``$set``.
        :return: the result of ``update_one``.
        """
        m_id = self._as_object_id(m_id)
        return self._run_with_reconnect(
            lambda: self.db[collection_name].update_one(
                {"_id": m_id}, {"$set": fields}
            )
        )

    def find_relus(self, collection_name, hash_id):
        """Return the document whose ``rules_id`` equals *hash_id*.

        :param collection_name: name of the collection to query.
        :param hash_id: value matched against the ``rules_id`` field.
        :return: the result of ``find_one`` for that filter.
        """
        return self.db[collection_name].find_one({"rules_id": hash_id})

    def find_rule_by_company(self, collection_name, company, version):
        """Look up the ``rules_id`` stored for a company name and version.

        :param collection_name: name of the collection to query.
        :param company: value matched against ``company_name``.
        :param version: value matched against ``version``.
        :return: the document's ``rules_id`` value, or ``None`` when no
            document matches (or the field is absent).
        """
        row = self.db[collection_name].find_one(
            {"company_name": company, "version": version}
        )
        if not row:
            return None
        return row.get("rules_id")

    def find_rule_by_title(self, collection_name, title):
        """Return the document whose ``title`` equals *title*.

        :param collection_name: name of the collection to query.
        :param title: value matched against the ``title`` field.
        :return: the matching document, or ``None`` when nothing matches.
        """
        row = self.db[collection_name].find_one({"title": title})
        return row if row else None

    def insert2db(self, collection_name, data: dict):
        """Insert *data* as a single document into *collection_name*.

        :param collection_name: name of the target collection.
        :param data: the document to insert.
        :return: None
        """
        self.db[collection_name].insert_one(data)