mongo_tools.py 7.8 KB


  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. # author : zhaolongyue
  4. #date : 2023-07-03
  5. from urllib.parse import quote_plus
  6. from pymongo import MongoClient
  7. from bson import ObjectId
  8. class MongoUtil:
  9. @staticmethod
  10. def get_coon(host="192.168.3.167:27088", port=None,database=None, collection=None, authdb=None, authuser=None, authpass=None):
  11. """
  12. 获取mongo数据库连接
  13. :param host:
  14. :param port:
  15. :param database:
  16. :param collection:
  17. :param authdb:
  18. :param authuser:
  19. :param authpass:
  20. :return:
  21. """
  22. if database is None:
  23. raise RuntimeError('database is None')
  24. if collection is None:
  25. raise RuntimeError('collection is None')
  26. username = quote_plus(authuser)
  27. password = quote_plus(authpass)
  28. # conn = MongoClient(host, port, database,collection,username,password,unicode_decode_error_handler="ignore",directConnection=True)
  29. conn = MongoClient(f'mongodb://{username}:{password}@{host}/',
  30. unicode_decode_error_handler="ignore", directConnection=True)[database][collection]
  31. return conn
  32. @staticmethod
  33. def get_coon_test(host="192.168.3.167", port=27080, database=None, collection=None, authdb=None, authuser=None, authpass=None):
  34. """
  35. 获取mongo数据库连接
  36. :param host:
  37. :param port:
  38. :param database:
  39. :param collection:
  40. :param authdb:
  41. :param authuser:
  42. :param authpass:
  43. :return:
  44. """
  45. if database is None:
  46. raise RuntimeError('database is None')
  47. if collection is None:
  48. raise RuntimeError('collection is None')
  49. conn = MongoClient(host, port, unicode_decode_error_handler="ignore")
  50. print(conn)
  51. if authdb is not None:
  52. db_auth = conn[authdb]
  53. db_auth.authenticate(authuser, authpass)
  54. db = conn[database]
  55. collection = db[collection]
  56. return collection
  57. class CoonUtil:
  58. @staticmethod
  59. def get_coon(**kwargs):
  60. """
  61. 获取mongo连接
  62. :param kwargs:
  63. :return:
  64. """
  65. coon = MongoUtil.get_coon_test(host=kwargs["mg_host"], port=kwargs["mg_port"],database=kwargs["database"], collection=kwargs["collection"])
  66. return coon
  67. class MongoSentence:
  68. @staticmethod
  69. def count(coon, nosql=None):
  70. """
  71. count数据量
  72. :param coon:
  73. :param nosql:
  74. :return:
  75. """
  76. if nosql is None:
  77. return coon.find({}).count()
  78. else:
  79. return coon.count_documents(nosql)
  80. @staticmethod
  81. def find_all(coon, columns=None):
  82. """
  83. 无查询条件返回指定字段的全量数据
  84. :param coon:
  85. :param columns:
  86. :return:
  87. """
  88. # data = DataFrame(list(self.collection.find({})))
  89. # data.drop(["_id"],axis=customer_program,inplace=True)
  90. # return data
  91. vlist = []
  92. if columns is None:
  93. vlist = coon.find({"item" :"5f0bcb65fc58d361fb9027f6"})
  94. else:
  95. cols = {}
  96. for c in columns:
  97. cols[c] = 1
  98. vlist = coon.find({"item" :"5f0bcb65fc58d361fb9027f6"}, cols).batch_size(1000)
  99. return vlist
  100. @staticmethod
  101. def find_by_Nosql(coon, nosql={}, columns=None):
  102. vlist = []
  103. # print(nosql)
  104. if columns is None:
  105. vlist = coon.find(nosql)
  106. else:
  107. cols = {}
  108. for c in columns:
  109. cols[c] = 1
  110. vlist = coon.collection.find(nosql, cols)
  111. return vlist
  112. @staticmethod
  113. def update_ir_ent_name_by_id(coon, oid, obj):
  114. coon.update_one({"_id": ObjectId(oid)}, {"$set": {"IR_ENTNAME": obj}})
  115. @staticmethod
  116. def update_by_id(coon, oid, obj):
  117. coon.update_one({"_id": ObjectId(oid)}, {"$set": obj})
  118. @staticmethod
  119. def find_one_by_company_name(coon, company):
  120. return coon.find({"company_name": company}).count()
  121. @staticmethod
  122. def save(coon, obj):
  123. """
  124. 插入数据
  125. :param coon:
  126. :param obj:
  127. :return:
  128. """
  129. coon.save(obj)
  130. @staticmethod
  131. def insert_many(coon, bulk):
  132. """
  133. 批量插入
  134. :param coon:
  135. :param bulk:
  136. :return:
  137. """
  138. coon.insert_many(bulk)
  139. @staticmethod
  140. def delcol_by_id(coon, id, column):
  141. """
  142. 删除数据
  143. :param coon:
  144. :param id:
  145. :param column:
  146. :return:
  147. """
  148. coon.collection.update_one({"_id": id}, {"$unset": {column: ""}})
  149. @staticmethod
  150. def find_one_by_id(coon, nosql, column):
  151. return coon.collection.find_one(nosql, column)
  152. # 这个是删表操作
  153. @staticmethod
  154. def clear(coon):
  155. """
  156. 删除表
  157. :param coon:
  158. :return:
  159. """
  160. coon.collection.drop()
  161. class Data_get():
  162. @staticmethod
  163. #连接数据库数据表
  164. def get_con(host="192.168.3.167", port=27080, database=None, collection=None, authdb=None, authuser=None,
  165. authpass=None):
  166. """
  167. 获取mongo数据库连接
  168. :param host:
  169. :param port:
  170. :param database:
  171. :param collection:
  172. :param authdb:
  173. :param authuser:
  174. :param authpass:
  175. :return:
  176. """
  177. if database is None:
  178. raise RuntimeError('database is None')
  179. if collection is None:
  180. raise RuntimeError('collection is None')
  181. con = MongoClient(host, port, unicode_decode_error_handler="ignore")
  182. # print(con)
  183. if authdb is not None:
  184. db_auth = con[authdb]
  185. db_auth.authenticate(authuser, authpass)
  186. db = con[database]
  187. collection = db[collection]
  188. return collection
  189. @staticmethod
  190. #随机获取id
  191. def get_id_sample(con):
  192. id_list = []
  193. for item in con.aggregate([{"$match":{"repeat":0}},{'$sample': {'size': 100}}, {"$project": {"_id": 1}}]):
  194. id_list.append(str(item["_id"]))
  195. return id_list
  196. @staticmethod
  197. #获取id
  198. def get_id_mongo(con):
  199. id_list = []
  200. @staticmethod
  201. #根据ids,从数据库获取数据
  202. def data_ids_mongo(ids,con,save_con):
  203. id_list = ids
  204. for id in id_list:
  205. query = {"_id": ObjectId(id)}
  206. list_item = list(con.find(query))
  207. if list_item:
  208. Data_save.save(save_con,list_item[0])
  209. class Data_save():
  210. @staticmethod
  211. def save_con(host="192.168.3.167", port=27080, database=None, collection=None, authdb=None, authuser=None,
  212. authpass=None):
  213. """
  214. 获取mongo数据库连接
  215. :param host:
  216. :param port:
  217. :param database:
  218. :param collection:
  219. :param authdb:
  220. :param authuser:
  221. :param authpass:
  222. :return:
  223. """
  224. if database is None:
  225. raise RuntimeError('database is None')
  226. if collection is None:
  227. raise RuntimeError('collection is None')
  228. con = MongoClient(host, port, unicode_decode_error_handler="ignore")
  229. # print(con)
  230. if authdb is not None:
  231. db_auth = con[authdb]
  232. db_auth.authenticate(authuser, authpass)
  233. db = con[database]
  234. collection = db[collection]
  235. return collection
  236. @staticmethod
  237. def save(con, obj):
  238. """
  239. 插入数据
  240. :param coon:
  241. :param obj:
  242. :return:
  243. """
  244. con.save(obj)
  245. @staticmethod
  246. def insert_one(con,obj):
  247. """
  248. 插入数据
  249. :param coon:
  250. :param obj:
  251. :return:
  252. """
  253. con.insert_one(obj)
  254. @staticmethod
  255. def insert_many(con, bulk):
  256. """
  257. 批量插入
  258. :param con:
  259. :param bulk:
  260. :return:
  261. """
  262. con.insert_many(bulk)