sample_data_export.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. from pymongo import MongoClient
  2. from urllib.parse import quote_plus # 可选,若密码中有特殊字符
  3. # MongodbConfigSource = {
  4. # "ip_port": "127.0.0.1:27088",
  5. # "user": "viewdata",
  6. # "password": "viewdata",
  7. # "db": "qfw",
  8. # "col": "zktest_0422_fenbao"
  9. # }
  10. def sample_data(N):
  11. # 连接MongoDB数据库
  12. db = MongoClient('172.20.45.129', 27002, unicode_decode_error_handler="ignore").data_quality
  13. coll_user = db["zktest_0422_fenbao"]
  14. # 构建连接字符串(含特殊字符建议用 quote_plus 编码)
  15. # user = quote_plus(MongodbConfigSource['user'])
  16. # password = quote_plus(MongodbConfigSource['password'])
  17. # mongo_uri = f"mongodb://{user}:{password}@{MongodbConfigSource['ip_port']}/?authSource=admin"
  18. # client = MongoClient(mongo_uri, unicode_decode_error_handler="ignore")
  19. # # 获取数据库与集合
  20. # db = client[MongodbConfigSource["db"]]
  21. # coll_user = db[MongodbConfigSource["col"]]
  22. # 统计符合筛选条件的总数据量
  23. filter_condition = {
  24. "$or": [
  25. {"toptype": "招标"},
  26. {"subtype": {"$in": ["中标", "成交", "合同", "验收"]}}
  27. ]
  28. }
  29. count_all = coll_user.count_documents(filter_condition)
  30. print("Filtered Document Count:", count_all)
  31. # 统计总的数据量
  32. # count_all = coll_user.estimated_document_count()
  33. # # count_all = coll_user.count_documents({"tag": 1})
  34. # print("Total Document Count:", count_all)
  35. # 把符合条件的站点名称存起来
  36. site_list = {}
  37. n = 0
  38. site_count = coll_user.aggregate([
  39. {"$group": {"_id": "$site", "count": {"$sum": 1}}},
  40. {"$sort": {"count": -1}}])
  41. for item in site_count:
  42. if (n / count_all) <= 0.95:
  43. n += item["count"]
  44. site_list[item["_id"]] = item["count"]
  45. # 计算每个站点相对于N的目标抽取数量的总和
  46. total_ratio = sum([min(site_list[key] / count_all, 1) for key in site_list])
  47. # 初始化已标记的文档数量
  48. marked_count = 0
  49. # 选取每个站点数据量
  50. for key in site_list:
  51. if marked_count >= N:
  52. break # 如果已经达到或超过目标数量,停止处理
  53. # 计算每个站点的目标比例
  54. target_ratio = min(site_list[key] / count_all, 1) / total_ratio
  55. # 计算每个站点应该抽取的文档数量,确保至少为1
  56. num = max(int(target_ratio * N), 2)
  57. # 如果加上这个站点的数量会超过总目标,调整数量
  58. num = min(num, N - marked_count)
  59. print(f"{key} - Count: {site_list[key]}, Num: {num}, Ratio: {target_ratio}")
  60. # 计算每次抽取的间隔
  61. jiange = int(site_list[key] / num)
  62. query = {
  63. "$or": [
  64. {"toptype": "招标"},
  65. {"subtype": {"$in": ["中标", "成交", "合同", "验收"]}}
  66. ]
  67. }
  68. # 从每个站点等间隔地取数据
  69. for i in range(num):
  70. if marked_count >= N:
  71. break # 再次检查是否已达到目标数量
  72. for info in coll_user.find(query).sort("_id", -1).skip(i*jiange).limit(1):
  73. print(f"Updating document with _id: {info['_id']}")
  74. # 更新文档,设置标记
  75. update_result = coll_user.update_one({"_id": info["_id"]}, {"$set": {"flag": 1}})
  76. if update_result.modified_count == 0:
  77. print("No document updated for _id:", info["_id"])
  78. else:
  79. print("Document updated successfully for _id:", info["_id"])
  80. marked_count += 1
  81. if marked_count >= N:
  82. break # 再次检查是否已达到目标数量
  83. print(f"Total marked documents: {marked_count}")
  84. sample_data(500)