from pymongo import MongoClient


def sample_data(N, filter_condition=None):
    """Randomly set ``flag: 1`` on about ``N`` documents, stratified by ``site``.

    Documents in ``data_quality.missing_fid_attachments`` are grouped by their
    ``site`` field. Sites are processed in descending order of document count,
    and each receives a share of the ``N`` samples proportional to its count.
    Every site gets at least one sample while budget remains, and no site gets
    more than the remaining budget. Within each site the documents are picked
    with MongoDB's ``$sample`` stage and then updated with ``{"$set": {"flag": 1}}``.

    Args:
        N: Total number of documents to flag.
        filter_condition: Optional extra MongoDB filter, e.g. ``{"tag_1": 1}``.
            It restricts which documents are counted and sampled. ``None``
            (the default) considers every document in the collection.

    Prints the per-site allocation and, at the end, the number of documents
    whose ``flag`` was actually modified (``UpdateResult.modified_count``).
    """
    filter_condition = dict(filter_condition or {})
    client = MongoClient('172.20.45.129', 27002, unicode_decode_error_handler="ignore")
    # Alternative target (cleaning DB):
    # MongoClient('mongodb://127.0.0.1:27087/', unicode_decode_error_handler="ignore",
    #             directConnection=True).jyqyfw
    try:
        coll_user = client.data_quality["missing_fid_attachments"]

        # Get every site and its document count, largest site first.
        # Documents without a ``site`` field are grouped under ``None``.
        count_pipeline = [
            {"$group": {"_id": "$site", "count": {"$sum": 1}}},
            {"$sort": {"count": -1}},
        ]
        if filter_condition:
            count_pipeline.insert(0, {"$match": filter_condition})
        site_counts = {
            item["_id"]: item["count"] for item in coll_user.aggregate(count_pipeline)
        }
        total_docs = sum(site_counts.values())

        marked_count = 0
        cumulative_docs = 0
        for site, count in site_counts.items():
            remaining = N - marked_count
            if remaining <= 0:
                break

            # Compute this site's sample count against the cumulative target
            # rather than rounding each site independently. Rounding error and
            # any deficit (site smaller than its quota, documents already
            # flagged) then carry over to later sites instead of being lost,
            # so the total reaches N whenever enough documents exist.
            cumulative_docs += count
            target = round(N * cumulative_docs / total_docs)
            num = min(max(1, target - marked_count), remaining)

            print(f"Processing site: {site} - Allocating {num} samples")

            # Random sampling within this site only; "site" is set last so a
            # caller-supplied filter cannot override the stratification key.
            pipeline = [
                {"$match": {**filter_condition, "site": site}},
                {"$sample": {"size": num}},
                {"$project": {"_id": 1}},
            ]
            sampled_ids = [doc["_id"] for doc in coll_user.aggregate(pipeline)]
            if not sampled_ids:
                continue

            update_result = coll_user.update_many(
                {"_id": {"$in": sampled_ids}},
                {"$set": {"flag": 1}},
            )
            marked_count += update_result.modified_count
    finally:
        client.close()

    print(f"Total marked documents: {marked_count}")


if __name__ == "__main__":
    sample_data(8000)