#!/usr/bin/env python # -*- coding:utf-8 -*- # author : zhaolongyue #date : 2023-07-03 from urllib.parse import quote_plus from pymongo import MongoClient from bson import ObjectId class MongoUtil: @staticmethod def get_coon(host="192.168.3.167:27088", database=None, collection=None, authdb=None, authuser=None, authpass=None): """ 获取mongo数据库连接 :param host: :param port: :param database: :param collection: :param authdb: :param authuser: :param authpass: :return: """ if database is None: raise RuntimeError('database is None') if collection is None: raise RuntimeError('collection is None') username = quote_plus(authuser) password = quote_plus(authpass) # conn = MongoClient(host, port, database,collection,username,password,unicode_decode_error_handler="ignore",directConnection=True) conn = MongoClient(f'mongodb://{username}:{password}@{host}/', unicode_decode_error_handler="ignore", directConnection=True)[database][collection] return conn class CoonUtil: @staticmethod def get_coon(**kwargs): """ 获取mongo连接 :param kwargs: :return: """ coon = MongoUtil.get_coon(host=kwargs["mg_host"], port=kwargs["mg_port"], database=kwargs["database"], collection=kwargs["collection"]) return coon class MongoSentence: @staticmethod def count(coon, nosql=None): """ count数据量 :param coon: :param nosql: :return: """ if nosql is None: return coon.find({}).count() else: return coon.count_documents(nosql) @staticmethod def find_all(coon, columns=None): """ 无查询条件返回指定字段的全量数据 :param coon: :param columns: :return: """ # data = DataFrame(list(self.collection.find({}))) # data.drop(["_id"],axis=customer_program,inplace=True) # return data vlist = [] if columns is None: vlist = coon.find({"item" :"5f0bcb65fc58d361fb9027f6"}) else: cols = {} for c in columns: cols[c] = 1 vlist = coon.find({"item" :"5f0bcb65fc58d361fb9027f6"}, cols).batch_size(1000) return vlist @staticmethod def find_by_Nosql(coon, nosql={}, columns=None): vlist = [] # print(nosql) if columns is None: vlist = coon.find(nosql) else: cols = {} for c in columns: cols[c] = 1 vlist = coon.collection.find(nosql, cols) return vlist @staticmethod def update_ir_ent_name_by_id(coon, oid, obj): coon.update_one({"_id": ObjectId(oid)}, {"$set": {"IR_ENTNAME": obj}}) @staticmethod def update_by_id(coon, oid, obj): coon.update_one({"_id": ObjectId(oid)}, {"$set": obj}) @staticmethod def find_one_by_company_name(coon, company): return coon.find({"company_name": company}).count() @staticmethod def save(coon, obj): """ 插入数据 :param coon: :param obj: :return: """ coon.save(obj) @staticmethod def insert_many(coon, bulk): """ 批量插入 :param coon: :param bulk: :return: """ coon.insert_many(bulk) @staticmethod def delcol_by_id(coon, id, column): """ 删除数据 :param coon: :param id: :param column: :return: """ coon.collection.update_one({"_id": id}, {"$unset": {column: ""}}) @staticmethod def find_one_by_id(coon, nosql, column): return coon.collection.find_one(nosql, column) # 这个是删表操作 @staticmethod def clear(coon): """ 删除表 :param coon: :return: """ coon.collection.drop() class Data_get(): @staticmethod #连接数据库数据表 def get_con(host="192.168.3.167", port=27080, database=None, collection=None, authdb=None, authuser=None, authpass=None): """ 获取mongo数据库连接 :param host: :param port: :param database: :param collection: :param authdb: :param authuser: :param authpass: :return: """ if database is None: raise RuntimeError('database is None') if collection is None: raise RuntimeError('collection is None') con = MongoClient(host, port, unicode_decode_error_handler="ignore") # print(con) if authdb is not None: db_auth = con[authdb] db_auth.authenticate(authuser, authpass) db = con[database] collection = db[collection] return collection @staticmethod #随机获取id def get_id_sample(con): id_list = [] for item in con.aggregate([{"$match":{"repeat":0}},{'$sample': {'size': 100}}, {"$project": {"_id": 1}}]): id_list.append(str(item["_id"])) return id_list @staticmethod #获取id def get_id_mongo(con): id_list = [] @staticmethod #根据ids,从数据库获取数据 def data_ids_mongo(ids,con,save_con): id_list = ids for id in id_list: query = {"_id": ObjectId(id)} list_item = list(con.find(query)) if list_item: Data_save.save(save_con,list_item[0]) class Data_save(): @staticmethod def save_con(host="192.168.3.167", port=27080, database=None, collection=None, authdb=None, authuser=None, authpass=None): """ 获取mongo数据库连接 :param host: :param port: :param database: :param collection: :param authdb: :param authuser: :param authpass: :return: """ if database is None: raise RuntimeError('database is None') if collection is None: raise RuntimeError('collection is None') con = MongoClient(host, port, unicode_decode_error_handler="ignore") # print(con) if authdb is not None: db_auth = con[authdb] db_auth.authenticate(authuser, authpass) db = con[database] collection = db[collection] return collection @staticmethod def save(con, obj): """ 插入数据 :param coon: :param obj: :return: """ con.save(obj) @staticmethod def insert_one(con,obj): """ 插入数据 :param coon: :param obj: :return: """ con.insert_one(obj) @staticmethod def insert_many(con, bulk): """ 批量插入 :param con: :param bulk: :return: """ con.insert_many(bulk)