#!/usr/bin/env python
# -*- coding:utf-8 -*-
# author : zhaolongyue
# date : 2023-07-03
- from urllib.parse import quote_plus
- from pymongo import MongoClient
- from bson import ObjectId
class MongoUtil:
    """Factory helpers that open a MongoDB client and hand back a collection."""

    @staticmethod
    def _build_uri(host, port=None, authuser=None, authpass=None):
        """
        Assemble the ``mongodb://`` URI used by ``get_coon``.

        :param host: host name, or a ``host:port`` string
        :param port: appended to ``host`` only when ``host`` contains no ``:``
        :param authuser: user name; when ``None`` the URI carries no credentials
        :param authpass: password; required when ``authuser`` is given
        :return: the URI string, always ending in ``/``
        :raises TypeError: if ``authuser`` is given but ``authpass`` is ``None``
        """
        # The default host already embeds its port, so an explicit port is only
        # applied when the host string does not carry one.
        if port is not None and ":" not in host:
            host = f"{host}:{port}"
        if authuser is None:
            # quote_plus(None) raises, so credentials are left out entirely
            # instead of crashing on the default arguments.
            return f"mongodb://{host}/"
        if authpass is None:
            raise TypeError('authpass is None')
        # Credentials are percent-encoded because they are embedded in a URI.
        username = quote_plus(authuser)
        password = quote_plus(authpass)
        return f"mongodb://{username}:{password}@{host}/"

    @staticmethod
    def _client_options(authdb=None, authuser=None, authpass=None):
        """
        Build the keyword arguments ``get_coon_test`` passes to ``MongoClient``.

        :param authdb: database to authenticate against; ``None`` means no
            credentials are sent
        :param authuser: user name, used only when ``authdb`` is given
        :param authpass: password, used only when ``authdb`` is given
        :return: dict of ``MongoClient`` keyword arguments
        """
        options = {"unicode_decode_error_handler": "ignore"}
        if authdb is not None:
            options.update(username=authuser, password=authpass, authSource=authdb)
        return options

    @staticmethod
    def get_coon(host="192.168.3.167:27088", port=None, database=None, collection=None,
                 authdb=None, authuser=None, authpass=None):
        """
        Get a MongoDB collection through a direct connection built from a URI.

        :param host: host name or ``host:port`` string
        :param port: port, applied only when ``host`` does not already contain one
        :param database: name of the database (required)
        :param collection: name of the collection (required)
        :param authdb: database holding the user's credentials; passed as
            ``authSource`` when credentials are supplied
        :param authuser: user name; ``None`` connects without credentials
        :param authpass: password for ``authuser``
        :return: the ``client[database][collection]`` handle
        :raises RuntimeError: if ``database`` or ``collection`` is ``None``
        :raises TypeError: if ``authuser`` is given without ``authpass``
        """
        if database is None:
            raise RuntimeError('database is None')
        if collection is None:
            raise RuntimeError('collection is None')
        uri = MongoUtil._build_uri(host, port, authuser, authpass)
        options = {"unicode_decode_error_handler": "ignore", "directConnection": True}
        if authuser is not None and authdb is not None:
            options["authSource"] = authdb
        client = MongoClient(uri, **options)
        return client[database][collection]

    @staticmethod
    def get_coon_test(host="192.168.3.167", port=27080, database=None, collection=None,
                      authdb=None, authuser=None, authpass=None):
        """
        Get a MongoDB collection from a separate host and port.

        :param host: host name or IP address
        :param port: port number
        :param database: name of the database (required)
        :param collection: name of the collection (required)
        :param authdb: database to authenticate against; ``None`` skips
            authentication
        :param authuser: user name, used only when ``authdb`` is given
        :param authpass: password, used only when ``authdb`` is given
        :return: the ``client[database][collection]`` handle
        :raises RuntimeError: if ``database`` or ``collection`` is ``None``
        """
        if database is None:
            raise RuntimeError('database is None')
        if collection is None:
            raise RuntimeError('collection is None')
        # Credentials go to the client constructor: Database.authenticate()
        # no longer exists in PyMongo 4.
        options = MongoUtil._client_options(authdb, authuser, authpass)
        client = MongoClient(host, port, **options)
        return client[database][collection]
class CoonUtil:
    """Adapter that opens a collection from configuration-style keyword arguments."""

    @staticmethod
    def get_coon(**kwargs):
        """
        Get a MongoDB collection via ``MongoUtil.get_coon_test``.

        :param kwargs: must contain ``mg_host``, ``mg_port``, ``database`` and
            ``collection``; any other entries are ignored
        :return: the value returned by ``MongoUtil.get_coon_test``
        :raises KeyError: if one of the four required entries is missing
        """
        # (parameter of get_coon_test, key expected in kwargs)
        key_map = (
            ("host", "mg_host"),
            ("port", "mg_port"),
            ("database", "database"),
            ("collection", "collection"),
        )
        settings = {param: kwargs[key] for param, key in key_map}
        return MongoUtil.get_coon_test(**settings)
class MongoSentence:
    """Static query and update helpers that operate on a collection handle.

    Every method takes ``coon``, the collection object returned by the
    connection helpers, and calls pymongo collection methods on it directly.
    """

    # Filter that find_all has always applied when the caller gives none.
    # NOTE(review): this contradicts the "no query condition" wording of the
    # original docstring and looks like a leftover; kept as the default so
    # existing callers see the same results. Pass nosql={} for every document.
    _FIND_ALL_DEFAULT_FILTER = {"item": "5f0bcb65fc58d361fb9027f6"}

    @staticmethod
    def _projection(columns):
        """Build an inclusion projection ``{name: 1}`` from field names."""
        return {name: 1 for name in columns}

    @staticmethod
    def count(coon, nosql=None):
        """
        Count documents.

        :param coon: collection handle
        :param nosql: filter document; ``None`` counts every document
        :return: number of matching documents
        """
        # Cursor.count() was removed in PyMongo 4; count_documents covers
        # both the filtered and the unfiltered case.
        return coon.count_documents({} if nosql is None else nosql)

    @staticmethod
    def find_all(coon, columns=None, nosql=None):
        """
        Find documents, optionally restricted to the given fields.

        :param coon: collection handle
        :param columns: iterable of field names to include; ``None`` returns
            whole documents
        :param nosql: filter document; ``None`` applies the historical
            hard-coded ``item`` filter, ``{}`` matches every document
        :return: the cursor returned by ``find`` (with a batch size of 1000
            when ``columns`` is given)
        """
        if nosql is None:
            # Copy so the shared class-level default can never be mutated.
            nosql = dict(MongoSentence._FIND_ALL_DEFAULT_FILTER)
        if columns is None:
            return coon.find(nosql)
        return coon.find(nosql, MongoSentence._projection(columns)).batch_size(1000)

    @staticmethod
    def find_by_Nosql(coon, nosql=None, columns=None):
        """
        Find documents matching a filter, optionally restricted to some fields.

        :param coon: collection handle
        :param nosql: filter document; ``None`` matches every document
        :param columns: iterable of field names to include; ``None`` returns
            whole documents
        :return: the cursor returned by ``find``
        """
        query = {} if nosql is None else nosql
        if columns is None:
            return coon.find(query)
        # Query the collection itself: ``coon.collection`` would address the
        # sub-collection "<name>.collection" on a pymongo Collection.
        return coon.find(query, MongoSentence._projection(columns))

    @staticmethod
    def update_ir_ent_name_by_id(coon, oid, obj):
        """
        Set the ``IR_ENTNAME`` field of one document.

        :param coon: collection handle
        :param oid: document id, converted with ``ObjectId``
        :param obj: new value for ``IR_ENTNAME``
        :return: None
        """
        coon.update_one({"_id": ObjectId(oid)}, {"$set": {"IR_ENTNAME": obj}})

    @staticmethod
    def update_by_id(coon, oid, obj):
        """
        Apply a ``$set`` update to one document.

        :param coon: collection handle
        :param oid: document id, converted with ``ObjectId``
        :param obj: mapping of field names to new values
        :return: None
        """
        coon.update_one({"_id": ObjectId(oid)}, {"$set": obj})

    @staticmethod
    def find_one_by_company_name(coon, company):
        """
        Count the documents whose ``company_name`` equals ``company``.

        :param coon: collection handle
        :param company: company name to match
        :return: number of matching documents (a count, despite the name)
        """
        # Cursor.count() was removed in PyMongo 4.
        return coon.count_documents({"company_name": company})

    @staticmethod
    def save(coon, obj):
        """
        Insert a document, or replace the stored one that has the same ``_id``.

        :param coon: collection handle
        :param obj: document to store
        :return: None
        """
        # Collection.save() was removed in PyMongo 4; this is its documented
        # replacement: upsert by _id when present, plain insert otherwise.
        if "_id" in obj:
            coon.replace_one({"_id": obj["_id"]}, obj, upsert=True)
        else:
            coon.insert_one(obj)

    @staticmethod
    def insert_many(coon, bulk):
        """
        Insert several documents at once.

        :param coon: collection handle
        :param bulk: iterable of documents
        :return: None
        """
        coon.insert_many(bulk)

    @staticmethod
    def delcol_by_id(coon, id, column):
        """
        Remove one field from one document.

        :param coon: collection handle
        :param id: value matched against ``_id`` as given (not converted to
            ``ObjectId``)
        :param column: name of the field to unset
        :return: None
        """
        coon.update_one({"_id": id}, {"$unset": {column: ""}})

    @staticmethod
    def find_one_by_id(coon, nosql, column):
        """
        Return the first document matching a filter.

        :param coon: collection handle
        :param nosql: filter document
        :param column: projection passed to ``find_one``
        :return: the matching document, or ``None`` when nothing matches
        """
        return coon.find_one(nosql, column)

    @staticmethod
    def clear(coon):
        """
        Drop the whole collection.

        :param coon: collection handle
        :return: None
        """
        # Drops ``coon`` itself; the previous ``coon.collection.drop()``
        # targeted the sub-collection "<name>.collection" instead.
        coon.drop()
class Data_get():
    """Helpers that read document ids and documents from a collection."""

    @staticmethod
    def _client_options(authdb=None, authuser=None, authpass=None):
        """
        Build the keyword arguments ``get_con`` passes to ``MongoClient``.

        :param authdb: database to authenticate against; ``None`` means no
            credentials are sent
        :param authuser: user name, used only when ``authdb`` is given
        :param authpass: password, used only when ``authdb`` is given
        :return: dict of ``MongoClient`` keyword arguments
        """
        options = {"unicode_decode_error_handler": "ignore"}
        if authdb is not None:
            options.update(username=authuser, password=authpass, authSource=authdb)
        return options

    @staticmethod
    def get_con(host="192.168.3.167", port=27080, database=None, collection=None, authdb=None, authuser=None,
                authpass=None):
        """
        Connect to a database and return one of its collections.

        :param host: host name or IP address
        :param port: port number
        :param database: name of the database (required)
        :param collection: name of the collection (required)
        :param authdb: database to authenticate against; ``None`` skips
            authentication
        :param authuser: user name, used only when ``authdb`` is given
        :param authpass: password, used only when ``authdb`` is given
        :return: the ``client[database][collection]`` handle
        :raises RuntimeError: if ``database`` or ``collection`` is ``None``
        """
        if database is None:
            raise RuntimeError('database is None')
        if collection is None:
            raise RuntimeError('collection is None')
        # Credentials go to the client constructor: Database.authenticate()
        # no longer exists in PyMongo 4.
        client = MongoClient(host, port, **Data_get._client_options(authdb, authuser, authpass))
        return client[database][collection]

    @staticmethod
    def get_id_sample(con, size=100):
        """
        Randomly sample ids of documents whose ``repeat`` field equals 0.

        :param con: collection handle
        :param size: number of documents requested from ``$sample``
        :return: list of ``_id`` values converted to ``str``
        """
        pipeline = [
            {"$match": {"repeat": 0}},
            {'$sample': {'size': size}},
            {"$project": {"_id": 1}},
        ]
        return [str(item["_id"]) for item in con.aggregate(pipeline)]

    @staticmethod
    def get_id_mongo(con):
        """
        Placeholder for fetching ids; not implemented.

        :param con: collection handle (currently unused)
        :return: None
        """
        # NOTE(review): the original body only created an empty list and
        # returned nothing. The intended query is not visible in this file,
        # so the behaviour (returning None) is left as it was.
        return None

    @staticmethod
    def data_ids_mongo(ids, con, save_con):
        """
        Copy the documents with the given ids from one collection to another.

        :param ids: iterable of ids, each converted with ``ObjectId``
        :param con: collection handle to read from
        :param save_con: collection handle passed to ``Data_save.save``
        :return: None
        """
        for oid in ids:
            # _id is unique, so one lookup replaces building a list from a
            # whole cursor just to take its first element.
            document = con.find_one({"_id": ObjectId(oid)})
            if document:
                Data_save.save(save_con, document)
class Data_save():
    """Helpers that write documents to a collection."""

    @staticmethod
    def _client_options(authdb=None, authuser=None, authpass=None):
        """
        Build the keyword arguments ``save_con`` passes to ``MongoClient``.

        :param authdb: database to authenticate against; ``None`` means no
            credentials are sent
        :param authuser: user name, used only when ``authdb`` is given
        :param authpass: password, used only when ``authdb`` is given
        :return: dict of ``MongoClient`` keyword arguments
        """
        options = {"unicode_decode_error_handler": "ignore"}
        if authdb is not None:
            options.update(username=authuser, password=authpass, authSource=authdb)
        return options

    @staticmethod
    def save_con(host="192.168.3.167", port=27080, database=None, collection=None, authdb=None, authuser=None,
                 authpass=None):
        """
        Connect to a database and return the collection to write to.

        :param host: host name or IP address
        :param port: port number
        :param database: name of the database (required)
        :param collection: name of the collection (required)
        :param authdb: database to authenticate against; ``None`` skips
            authentication
        :param authuser: user name, used only when ``authdb`` is given
        :param authpass: password, used only when ``authdb`` is given
        :return: the ``client[database][collection]`` handle
        :raises RuntimeError: if ``database`` or ``collection`` is ``None``
        """
        if database is None:
            raise RuntimeError('database is None')
        if collection is None:
            raise RuntimeError('collection is None')
        # Credentials go to the client constructor: Database.authenticate()
        # no longer exists in PyMongo 4.
        client = MongoClient(host, port, **Data_save._client_options(authdb, authuser, authpass))
        return client[database][collection]

    @staticmethod
    def save(con, obj):
        """
        Insert a document, or replace the stored one that has the same ``_id``.

        :param con: collection handle
        :param obj: document to store
        :return: None
        """
        # Collection.save() was removed in PyMongo 4; this is its documented
        # replacement: upsert by _id when present, plain insert otherwise.
        if "_id" in obj:
            con.replace_one({"_id": obj["_id"]}, obj, upsert=True)
        else:
            con.insert_one(obj)

    @staticmethod
    def insert_one(con, obj):
        """
        Insert a single document.

        :param con: collection handle
        :param obj: document to insert
        :return: None
        """
        con.insert_one(obj)

    @staticmethod
    def insert_many(con, bulk):
        """
        Insert several documents at once.

        :param con: collection handle
        :param bulk: iterable of documents
        :return: None
        """
        con.insert_many(bulk)
|