mongo_tools.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. # author : zhaolongyue
  4. #date : 2023-07-03
  5. from urllib.parse import quote_plus
  6. from pymongo import MongoClient
  7. from bson import ObjectId
  8. class MongoUtil:
  9. @staticmethod
  10. def get_coon(host="192.168.3.167:27088", database=None, collection=None, authdb=None, authuser=None, authpass=None):
  11. """
  12. 获取mongo数据库连接
  13. :param host:
  14. :param port:
  15. :param database:
  16. :param collection:
  17. :param authdb:
  18. :param authuser:
  19. :param authpass:
  20. :return:
  21. """
  22. if database is None:
  23. raise RuntimeError('database is None')
  24. if collection is None:
  25. raise RuntimeError('collection is None')
  26. username = quote_plus(authuser)
  27. password = quote_plus(authpass)
  28. # conn = MongoClient(host, port, database,collection,username,password,unicode_decode_error_handler="ignore",directConnection=True)
  29. conn = MongoClient(f'mongodb://{username}:{password}@{host}/',
  30. unicode_decode_error_handler="ignore", directConnection=True)[database][collection]
  31. return conn
  32. class CoonUtil:
  33. @staticmethod
  34. def get_coon(**kwargs):
  35. """
  36. 获取mongo连接
  37. :param kwargs:
  38. :return:
  39. """
  40. coon = MongoUtil.get_coon(host=kwargs["mg_host"], port=kwargs["mg_port"],
  41. database=kwargs["database"], collection=kwargs["collection"])
  42. return coon
  43. class MongoSentence:
  44. @staticmethod
  45. def count(coon, nosql=None):
  46. """
  47. count数据量
  48. :param coon:
  49. :param nosql:
  50. :return:
  51. """
  52. if nosql is None:
  53. return coon.find({}).count()
  54. else:
  55. return coon.count_documents(nosql)
  56. @staticmethod
  57. def find_all(coon, columns=None):
  58. """
  59. 无查询条件返回指定字段的全量数据
  60. :param coon:
  61. :param columns:
  62. :return:
  63. """
  64. # data = DataFrame(list(self.collection.find({})))
  65. # data.drop(["_id"],axis=customer_program,inplace=True)
  66. # return data
  67. vlist = []
  68. if columns is None:
  69. vlist = coon.find({"item" :"5f0bcb65fc58d361fb9027f6"})
  70. else:
  71. cols = {}
  72. for c in columns:
  73. cols[c] = 1
  74. vlist = coon.find({"item" :"5f0bcb65fc58d361fb9027f6"}, cols).batch_size(1000)
  75. return vlist
  76. @staticmethod
  77. def find_by_Nosql(coon, nosql={}, columns=None):
  78. vlist = []
  79. # print(nosql)
  80. if columns is None:
  81. vlist = coon.find(nosql)
  82. else:
  83. cols = {}
  84. for c in columns:
  85. cols[c] = 1
  86. vlist = coon.collection.find(nosql, cols)
  87. return vlist
  88. @staticmethod
  89. def update_ir_ent_name_by_id(coon, oid, obj):
  90. coon.update_one({"_id": ObjectId(oid)}, {"$set": {"IR_ENTNAME": obj}})
  91. @staticmethod
  92. def update_by_id(coon, oid, obj):
  93. coon.update_one({"_id": ObjectId(oid)}, {"$set": obj})
  94. @staticmethod
  95. def find_one_by_company_name(coon, company):
  96. return coon.find({"company_name": company}).count()
  97. @staticmethod
  98. def save(coon, obj):
  99. """
  100. 插入数据
  101. :param coon:
  102. :param obj:
  103. :return:
  104. """
  105. coon.save(obj)
  106. @staticmethod
  107. def insert_many(coon, bulk):
  108. """
  109. 批量插入
  110. :param coon:
  111. :param bulk:
  112. :return:
  113. """
  114. coon.insert_many(bulk)
  115. @staticmethod
  116. def delcol_by_id(coon, id, column):
  117. """
  118. 删除数据
  119. :param coon:
  120. :param id:
  121. :param column:
  122. :return:
  123. """
  124. coon.collection.update_one({"_id": id}, {"$unset": {column: ""}})
  125. @staticmethod
  126. def find_one_by_id(coon, nosql, column):
  127. return coon.collection.find_one(nosql, column)
  128. # 这个是删表操作
  129. @staticmethod
  130. def clear(coon):
  131. """
  132. 删除表
  133. :param coon:
  134. :return:
  135. """
  136. coon.collection.drop()
  137. class Data_get():
  138. @staticmethod
  139. #连接数据库数据表
  140. def get_con(host="192.168.3.167", port=27080, database=None, collection=None, authdb=None, authuser=None,
  141. authpass=None):
  142. """
  143. 获取mongo数据库连接
  144. :param host:
  145. :param port:
  146. :param database:
  147. :param collection:
  148. :param authdb:
  149. :param authuser:
  150. :param authpass:
  151. :return:
  152. """
  153. if database is None:
  154. raise RuntimeError('database is None')
  155. if collection is None:
  156. raise RuntimeError('collection is None')
  157. con = MongoClient(host, port, unicode_decode_error_handler="ignore")
  158. # print(con)
  159. if authdb is not None:
  160. db_auth = con[authdb]
  161. db_auth.authenticate(authuser, authpass)
  162. db = con[database]
  163. collection = db[collection]
  164. return collection
  165. @staticmethod
  166. #随机获取id
  167. def get_id_sample(con):
  168. id_list = []
  169. for item in con.aggregate([{"$match":{"repeat":0}},{'$sample': {'size': 100}}, {"$project": {"_id": 1}}]):
  170. id_list.append(str(item["_id"]))
  171. return id_list
  172. @staticmethod
  173. #获取id
  174. def get_id_mongo(con):
  175. id_list = []
  176. @staticmethod
  177. #根据ids,从数据库获取数据
  178. def data_ids_mongo(ids,con,save_con):
  179. id_list = ids
  180. for id in id_list:
  181. query = {"_id": ObjectId(id)}
  182. list_item = list(con.find(query))
  183. if list_item:
  184. Data_save.save(save_con,list_item[0])
  185. class Data_save():
  186. @staticmethod
  187. def save_con(host="192.168.3.167", port=27080, database=None, collection=None, authdb=None, authuser=None,
  188. authpass=None):
  189. """
  190. 获取mongo数据库连接
  191. :param host:
  192. :param port:
  193. :param database:
  194. :param collection:
  195. :param authdb:
  196. :param authuser:
  197. :param authpass:
  198. :return:
  199. """
  200. if database is None:
  201. raise RuntimeError('database is None')
  202. if collection is None:
  203. raise RuntimeError('collection is None')
  204. con = MongoClient(host, port, unicode_decode_error_handler="ignore")
  205. # print(con)
  206. if authdb is not None:
  207. db_auth = con[authdb]
  208. db_auth.authenticate(authuser, authpass)
  209. db = con[database]
  210. collection = db[collection]
  211. return collection
  212. @staticmethod
  213. def save(con, obj):
  214. """
  215. 插入数据
  216. :param coon:
  217. :param obj:
  218. :return:
  219. """
  220. con.save(obj)
  221. @staticmethod
  222. def insert_one(con,obj):
  223. """
  224. 插入数据
  225. :param coon:
  226. :param obj:
  227. :return:
  228. """
  229. con.insert_one(obj)
  230. @staticmethod
  231. def insert_many(con, bulk):
  232. """
  233. 批量插入
  234. :param con:
  235. :param bulk:
  236. :return:
  237. """
  238. con.insert_many(bulk)