from pymongo import MongoClient from urllib.parse import quote_plus def sample_data(N): username = "liumiaomiao" password = "Lmm@80923" host = "127.0.0.1" # 例如: localhost 或 192.168.1.100 port = "27088" # 默认MongoDB端口 escaped_username = quote_plus(username) escaped_password = quote_plus(password) # 构建连接URI mongo_uri = f"mongodb://{escaped_username}:{escaped_password}@{host}:{port}/" # 连接MongoDB db = MongoClient(mongo_uri, unicode_decode_error_handler="ignore", directConnection=True).qfw # bidding库 # db = MongoClient('172.20.45.129', 27002, unicode_decode_error_handler="ignore").data_quality # db = MongoClient('mongodb://127.0.0.1:27088/', unicode_decode_error_handler="ignore",directConnection=True).qfw # 清洗库 coll_user = db["bidding_master_20250530"] # filter_condition = { # "$or": [ # {"tag": 1}, # {"tag": 2} # ] # } filter_condition = { "$and": [ {"old_id": {"$exists": True}}, {"$expr": {"$ne": ["$_id", "$old_id"]}}, {"$expr": {"$ne": ["$prefer_score", "$old_prefer_score"]}} ] } # 获取所有站点及其文档数 site_list = {} site_count = coll_user.aggregate([ {"$match": filter_condition}, {"$group": {"_id": "$site", "count": {"$sum": 1}}}, {"$sort": {"count": -1}} ]) for item in site_count: site_list[item["_id"]] = item["count"] total_docs = sum(site_list.values()) remaining = N marked_count = 0 for site, count in site_list.items(): if remaining <= 0: break # 计算该站点应分配的样本数 num = max(1, round(N * count / total_docs)) num = min(num, remaining) print(f"Processing site: {site} - Allocating {num} samples") # 使用随机抽样 pipeline = [ {"$match": {"site": site, **filter_condition}}, {"$match": {"site": site}}, {"$sample": {"size": num}}, {"$project": {"_id": 1}} ] sampled_ids = [doc["_id"] for doc in coll_user.aggregate(pipeline)] if not sampled_ids: continue update_result = coll_user.update_many( {"_id": {"$in": sampled_ids}}, {"$set": {"mark": 1}} ) marked_count += update_result.modified_count remaining -= update_result.modified_count print(f"Total marked documents: {marked_count}") sample_data(100)