mongo_tools.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. # author : zhaolongyue
  4. #date : 2023-07-03
  5. from pymongo import MongoClient
  6. from bson import ObjectId
  7. class MongoUtil:
  8. @staticmethod
  9. def get_coon(host="192.168.3.167", port=27080, database=None, collection=None, authdb=None, authuser=None, authpass=None):
  10. """
  11. 获取mongo数据库连接
  12. :param host:
  13. :param port:
  14. :param database:
  15. :param collection:
  16. :param authdb:
  17. :param authuser:
  18. :param authpass:
  19. :return:
  20. """
  21. if database is None:
  22. raise RuntimeError('database is None')
  23. if collection is None:
  24. raise RuntimeError('collection is None')
  25. conn = MongoClient(host, port, unicode_decode_error_handler="ignore")
  26. print(conn)
  27. if authdb is not None:
  28. db_auth = conn[authdb]
  29. db_auth.authenticate(authuser, authpass)
  30. db = conn[database]
  31. collection = db[collection]
  32. return collection
  33. class CoonUtil:
  34. @staticmethod
  35. def get_coon(**kwargs):
  36. """
  37. 获取mongo连接
  38. :param kwargs:
  39. :return:
  40. """
  41. coon = MongoUtil.get_coon(host=kwargs["mg_host"], port=kwargs["mg_port"],
  42. database=kwargs["database"], collection=kwargs["collection"])
  43. return coon
  44. class MongoSentence:
  45. @staticmethod
  46. def count(coon, nosql=None):
  47. """
  48. count数据量
  49. :param coon:
  50. :param nosql:
  51. :return:
  52. """
  53. if nosql is None:
  54. return coon.find({}).count()
  55. else:
  56. return coon.count(nosql)
  57. @staticmethod
  58. def find_all(coon, columns=None):
  59. """
  60. 无查询条件返回指定字段的全量数据
  61. :param coon:
  62. :param columns:
  63. :return:
  64. """
  65. # data = DataFrame(list(self.collection.find({})))
  66. # data.drop(["_id"],axis=customer_program,inplace=True)
  67. # return data
  68. vlist = []
  69. if columns is None:
  70. vlist = coon.find({"item" :"5f0bcb65fc58d361fb9027f6"})
  71. else:
  72. cols = {}
  73. for c in columns:
  74. cols[c] = 1
  75. vlist = coon.find({"item" :"5f0bcb65fc58d361fb9027f6"}, cols).batch_size(1000)
  76. return vlist
  77. @staticmethod
  78. def find_by_Nosql(coon, nosql={}, columns=None):
  79. vlist = []
  80. # print(nosql)
  81. if columns is None:
  82. vlist = coon.find(nosql)
  83. else:
  84. cols = {}
  85. for c in columns:
  86. cols[c] = 1
  87. vlist = coon.collection.find(nosql, cols)
  88. return vlist
  89. @staticmethod
  90. def update_ir_ent_name_by_id(coon, oid, obj):
  91. coon.update_one({"_id": ObjectId(oid)}, {"$set": {"IR_ENTNAME": obj}})
  92. @staticmethod
  93. def update_by_id(coon, oid, obj):
  94. coon.update_one({"_id": ObjectId(oid)}, {"$set": obj})
  95. @staticmethod
  96. def find_one_by_company_name(coon, company):
  97. return coon.find({"company_name": company}).count()
  98. @staticmethod
  99. def save(coon, obj):
  100. """
  101. 插入数据
  102. :param coon:
  103. :param obj:
  104. :return:
  105. """
  106. coon.save(obj)
  107. @staticmethod
  108. def insert_many(coon, bulk):
  109. """
  110. 批量插入
  111. :param coon:
  112. :param bulk:
  113. :return:
  114. """
  115. coon.insert_many(bulk)
  116. @staticmethod
  117. def delcol_by_id(coon, id, column):
  118. """
  119. 删除数据
  120. :param coon:
  121. :param id:
  122. :param column:
  123. :return:
  124. """
  125. coon.collection.update_one({"_id": id}, {"$unset": {column: ""}})
  126. @staticmethod
  127. def find_one_by_id(coon, nosql, column):
  128. return coon.collection.find_one(nosql, column)
  129. # 这个是删表操作
  130. @staticmethod
  131. def clear(coon):
  132. """
  133. 删除表
  134. :param coon:
  135. :return:
  136. """
  137. coon.collection.drop()
  138. class Data_get():
  139. @staticmethod
  140. #连接数据库数据表
  141. def get_con(host="192.168.3.167", port=27080, database=None, collection=None, authdb=None, authuser=None,
  142. authpass=None):
  143. """
  144. 获取mongo数据库连接
  145. :param host:
  146. :param port:
  147. :param database:
  148. :param collection:
  149. :param authdb:
  150. :param authuser:
  151. :param authpass:
  152. :return:
  153. """
  154. if database is None:
  155. raise RuntimeError('database is None')
  156. if collection is None:
  157. raise RuntimeError('collection is None')
  158. con = MongoClient(host, port, unicode_decode_error_handler="ignore")
  159. # print(con)
  160. if authdb is not None:
  161. db_auth = con[authdb]
  162. db_auth.authenticate(authuser, authpass)
  163. db = con[database]
  164. collection = db[collection]
  165. return collection
  166. @staticmethod
  167. #随机获取id
  168. def get_id_sample(con):
  169. id_list = []
  170. for item in con.aggregate([{"$match":{"repeat":0}},{'$sample': {'size': 100}}, {"$project": {"_id": 1}}]):
  171. id_list.append(str(item["_id"]))
  172. return id_list
  173. @staticmethod
  174. #获取id
  175. def get_id_mongo(con):
  176. id_list = []
  177. @staticmethod
  178. #根据ids,从数据库获取数据
  179. def data_ids_mongo(ids,con,save_con):
  180. id_list = ids
  181. for id in id_list:
  182. query = {"_id": ObjectId(id)}
  183. list_item = list(con.find(query))
  184. if list_item:
  185. Data_save.save(save_con,list_item[0])
  186. class Data_save():
  187. @staticmethod
  188. def save_con(host="192.168.3.167", port=27080, database=None, collection=None, authdb=None, authuser=None,
  189. authpass=None):
  190. """
  191. 获取mongo数据库连接
  192. :param host:
  193. :param port:
  194. :param database:
  195. :param collection:
  196. :param authdb:
  197. :param authuser:
  198. :param authpass:
  199. :return:
  200. """
  201. if database is None:
  202. raise RuntimeError('database is None')
  203. if collection is None:
  204. raise RuntimeError('collection is None')
  205. con = MongoClient(host, port, unicode_decode_error_handler="ignore")
  206. # print(con)
  207. if authdb is not None:
  208. db_auth = con[authdb]
  209. db_auth.authenticate(authuser, authpass)
  210. db = con[database]
  211. collection = db[collection]
  212. return collection
  213. @staticmethod
  214. def save(con, obj):
  215. """
  216. 插入数据
  217. :param coon:
  218. :param obj:
  219. :return:
  220. """
  221. con.save(obj)
  222. @staticmethod
  223. def insert_many(con, bulk):
  224. """
  225. 批量插入
  226. :param con:
  227. :param bulk:
  228. :return:
  229. """
  230. con.insert_many(bulk)