123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- # coding:utf-8
- from pymongo import MongoClient
- from pymongo.errors import AutoReconnect
- from bson import ObjectId
- import time
- import urllib.parse as parse
- class MongoDBInterface:
- def __init__(self, config):
- self.__host = config.get("ip_port", "")
- self.__user = config.get("user", "")
- self.__password = config.get("password", "")
- self.__database = config.get("db", "")
- self.connect()
- def connect(self):
- """
- 连接数据库
- :return:
- """
- # 特殊符号转义
- self.__user = parse.quote_plus(self.__user)
- self.__password = parse.quote_plus(self.__password)
- # 连接数据库
- try:
- if self.__user:
- self.client = MongoClient(
- "mongodb://{}:{}@{}".format(self.__user, self.__password, self.__host),
- unicode_decode_error_handler='ignore', directConnection=True)
- else:
- self.client = MongoClient("mongodb://{}".format(self.__host), unicode_decode_error_handler='ignore',
- directConnection=True)
- self.db = self.client[self.__database]
- except AutoReconnect as e:
- self.connect()
- def disconnect(self):
- self.client.close()
- def find_by_id(self, collection_name, m_id):
- try:
- collection = self.db[collection_name]
- if isinstance(m_id, str):
- m_id = ObjectId(m_id)
- documents = collection.find_one({"_id": m_id})
- return documents
- except AutoReconnect:
- time.sleep(5)
- print("Connection to MongoDB lost. Reconnecting...")
- self.connect() # 自动重连
- return self.find_by_id(collection_name, m_id)
- def update_by_id(self, collection_name, m_id, fields: dict):
- try:
- collection = self.db[collection_name]
- if isinstance(m_id, str):
- m_id = ObjectId(m_id)
- documents = collection.update_one({"_id": m_id}, {"$set": fields})
- return documents
- except AutoReconnect:
- print("Connection to MongoDB lost. Reconnecting...")
- self.connect() # 自动重连
- return self.update_by_id(collection_name, m_id, fields)
- def find_relus(self, collection_name, hash_id):
- collection = self.db[collection_name]
- row = collection.find_one({"rules_id": hash_id})
- return row
- def find_rule_by_company(self, collection_name, company, version):
- collection = self.db[collection_name]
- row = collection.find_one({"company_name": company, "version": version})
- if not row:
- return None
- return row.get("rules_id")
- def insert2db(self, collection_name, data: dict):
- collection = self.db[collection_name]
- collection.insert_one(data)
|