sample_data_export_online.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. from pymongo import MongoClient
  2. from urllib.parse import quote_plus
  3. def sample_data(N):
  4. username = "liumiaomiao"
  5. password = "Lmm@80923"
  6. host = "127.0.0.1" # 例如: localhost 或 192.168.1.100
  7. port = "27088" # 默认MongoDB端口
  8. escaped_username = quote_plus(username)
  9. escaped_password = quote_plus(password)
  10. # 构建连接URI
  11. mongo_uri = f"mongodb://{escaped_username}:{escaped_password}@{host}:{port}/"
  12. # 连接MongoDB
  13. db = MongoClient(mongo_uri, unicode_decode_error_handler="ignore", directConnection=True).qfw # bidding库
  14. # db = MongoClient('172.20.45.129', 27002, unicode_decode_error_handler="ignore").data_quality
  15. # db = MongoClient('mongodb://127.0.0.1:27088/', unicode_decode_error_handler="ignore",directConnection=True).qfw # 清洗库
  16. coll_user = db["bidding_master_20250530"]
  17. # filter_condition = {
  18. # "$or": [
  19. # {"tag": 1},
  20. # {"tag": 2}
  21. # ]
  22. # }
  23. filter_condition = {
  24. "$and": [
  25. {"old_id": {"$exists": True}},
  26. {"$expr": {"$ne": ["$_id", "$old_id"]}},
  27. {"$expr": {"$ne": ["$prefer_score", "$old_prefer_score"]}}
  28. ]
  29. }
  30. # 获取所有站点及其文档数
  31. site_list = {}
  32. site_count = coll_user.aggregate([
  33. {"$match": filter_condition},
  34. {"$group": {"_id": "$site", "count": {"$sum": 1}}},
  35. {"$sort": {"count": -1}}
  36. ])
  37. for item in site_count:
  38. site_list[item["_id"]] = item["count"]
  39. total_docs = sum(site_list.values())
  40. remaining = N
  41. marked_count = 0
  42. for site, count in site_list.items():
  43. if remaining <= 0:
  44. break
  45. # 计算该站点应分配的样本数
  46. num = max(1, round(N * count / total_docs))
  47. num = min(num, remaining)
  48. print(f"Processing site: {site} - Allocating {num} samples")
  49. # 使用随机抽样
  50. pipeline = [
  51. {"$match": {"site": site, **filter_condition}},
  52. {"$match": {"site": site}},
  53. {"$sample": {"size": num}},
  54. {"$project": {"_id": 1}}
  55. ]
  56. sampled_ids = [doc["_id"] for doc in coll_user.aggregate(pipeline)]
  57. if not sampled_ids:
  58. continue
  59. update_result = coll_user.update_many(
  60. {"_id": {"$in": sampled_ids}},
  61. {"$set": {"mark": 1}}
  62. )
  63. marked_count += update_result.modified_count
  64. remaining -= update_result.modified_count
  65. print(f"Total marked documents: {marked_count}")
  66. sample_data(100)