# sample_data_export_new.py (1.8 KB)
  1. from pymongo import MongoClient
  def sample_data(N):
      """Flag a site-proportional random sample of about ``N`` documents.

      Counts the documents of each ``site`` in the
      ``data_quality.missing_fid_attachments`` collection, gives every site a
      share of ``N`` proportional to its document count, draws that many
      documents at random with ``$sample`` and sets ``flag: 1`` on them.

      Progress and the final total are printed to stdout.

      Args:
          N: Target number of documents to flag. When ``N <= 0`` no document
              is sampled or updated.

      Returns:
          None.
      """
      # Use the client as a context manager so its connections are released
      # even if one of the queries raises.
      with MongoClient(
          '172.20.45.129', 27002, unicode_decode_error_handler="ignore"
      ) as client:
          coll_user = client.data_quality["missing_fid_attachments"]

          # Document count per site, largest site first (dicts keep the
          # order in which the sorted aggregation yields the groups).
          site_counts = {
              item["_id"]: item["count"]
              for item in coll_user.aggregate([
                  {"$group": {"_id": "$site", "count": {"$sum": 1}}},
                  {"$sort": {"count": -1}},
              ])
          }
          total_docs = sum(site_counts.values())

          remaining = N
          marked_count = 0
          for site, count in site_counts.items():
              if remaining <= 0:
                  break

              # Proportional share of N, at least one document per site,
              # capped by what is left of the overall budget.
              num = min(max(1, round(N * count / total_docs)), remaining)
              print(f"Processing site: {site} - Allocating {num} samples")

              # Random sampling restricted to the current site; only the ids
              # are needed for the follow-up update.
              pipeline = [
                  {"$match": {"site": site}},
                  {"$sample": {"size": num}},
                  {"$project": {"_id": 1}},
              ]
              sampled_ids = [doc["_id"] for doc in coll_user.aggregate(pipeline)]
              if not sampled_ids:
                  continue

              update_result = coll_user.update_many(
                  {"_id": {"$in": sampled_ids}},
                  {"$set": {"flag": 1}},
              )
              # The budget is reduced by the number of documents the update
              # reports as modified, not by the number of ids sampled.
              # NOTE(review): sampling does not exclude documents that already
              # carry flag=1 -- confirm this is intended for repeated runs.
              marked_count += update_result.modified_count
              remaining -= update_result.modified_count

      print(f"Total marked documents: {marked_count}")
  # Script entry: flag a site-proportional sample of 8000 documents.
  # NOTE(review): this call also runs whenever the module is imported; consider
  # an `if __name__ == "__main__":` guard if import-time execution is unintended.
  sample_data(8000)