from itertools import islice
from urllib.parse import quote_plus  # optional: only needed if credentials contain special characters

from pymongo import MongoClient

# Alternative, authenticated source configuration (currently unused):
# MongodbConfigSource = {
#     "ip_port": "127.0.0.1:27088",
#     "user": "viewdata",
#     "password": "viewdata",
#     "db": "qfw",
#     "col": "zktest_0422_fenbao"
# }
# To use it, build a connection URI (quote_plus-encode credentials with special characters):
#     user = quote_plus(MongodbConfigSource["user"])
#     password = quote_plus(MongodbConfigSource["password"])
#     mongo_uri = f"mongodb://{user}:{password}@{MongodbConfigSource['ip_port']}/?authSource=admin"
#     client = MongoClient(mongo_uri, unicode_decode_error_handler="ignore")


def _top_sites(coll_user, filter_condition, count_all, coverage=0.95):
    """Return {site: count} for the largest sites among documents matching the filter.

    Sites are visited in descending document-count order and added while the
    cumulative count seen so far is at most ``coverage`` of ``count_all``; the
    site that crosses the threshold is still included.
    """
    pipeline = [
        # Count only the documents that are eligible for sampling, so the
        # per-site counts are consistent with count_all.
        {"$match": filter_condition},
        {"$group": {"_id": "$site", "count": {"$sum": 1}}},
        {"$sort": {"count": -1}},
    ]
    site_list = {}
    covered = 0
    for item in coll_user.aggregate(pipeline):
        # covered only grows, so once past the threshold no later site qualifies.
        if covered / count_all > coverage:
            break
        covered += item["count"]
        site_list[item["_id"]] = item["count"]
    return site_list


def _flag_evenly_spaced(coll_user, query, num, step):
    """Set ``flag: 1`` on every ``step``-th document matching ``query``.

    Documents are ordered by ``_id`` descending; at most ``num`` documents
    (positions 0, step, 2*step, ...) are processed. Returns how many documents
    were processed (including ones that were already flagged).
    """
    # One bounded cursor that fetches only _id, instead of one skip() query per
    # sample; limit() keeps the server from reading past the last needed position.
    cursor = coll_user.find(query, {"_id": 1}).sort("_id", -1).limit(num * step)
    processed = 0
    for info in islice(cursor, 0, None, step):
        print(f"Updating document with _id: {info['_id']}")
        # Mark the document as sampled.
        update_result = coll_user.update_one({"_id": info["_id"]}, {"$set": {"flag": 1}})
        if update_result.modified_count == 0:
            # e.g. the document was already flagged by an earlier run.
            print("No document updated for _id:", info["_id"])
        else:
            print("Document updated successfully for _id:", info["_id"])
        processed += 1
    return processed


def sample_data(N, host="172.20.45.129", port=27002,
                db_name="data_quality", col_name="zktest_0422_fenbao"):
    """Flag (``flag: 1``) a site-stratified sample of up to N documents.

    Only documents with ``toptype == "招标"`` or ``subtype`` in
    ["中标", "成交", "合同", "验收"] are considered. The largest sites that
    together cover about 95% of those documents each receive a share of N
    proportional to their size (at least 2, never more than the site holds).
    Within a site, documents are taken at an even stride in ``_id``-descending
    order.

    Args:
        N: Target number of documents to flag.
        host, port: MongoDB server address.
        db_name, col_name: Database and collection to sample from.

    Side effects:
        Writes ``flag: 1`` into the sampled documents and prints progress.
    """
    filter_condition = {
        "$or": [
            {"toptype": "招标"},
            {"subtype": {"$in": ["中标", "成交", "合同", "验收"]}},
        ]
    }

    client = MongoClient(host, port, unicode_decode_error_handler="ignore")
    try:
        coll_user = client[db_name][col_name]

        # Total number of documents eligible for sampling.
        count_all = coll_user.count_documents(filter_condition)
        print("Filtered Document Count:", count_all)

        marked_count = 0
        if count_all > 0 and N > 0:
            site_list = _top_sites(coll_user, filter_condition, count_all)
            covered_total = sum(site_list.values())

            for site, site_count in site_list.items():
                remaining = N - marked_count
                if remaining <= 0:
                    break  # target reached
                # This site's share of the covered documents.
                target_ratio = site_count / covered_total
                # At least 2 per site, but never beyond the remaining quota or
                # the number of documents the site actually has (otherwise the
                # stride below would be 0 and the same document would repeat).
                num = min(max(int(target_ratio * N), 2), remaining, site_count)
                print(f"{site} - Count: {site_count}, Num: {num}, Ratio: {target_ratio}")

                step = max(site_count // num, 1)
                # Restrict to this site; previously the site was not part of the query.
                query = {**filter_condition, "site": site}
                marked_count += _flag_evenly_spaced(coll_user, query, num, step)

        print(f"Total marked documents: {marked_count}")
    finally:
        client.close()


if __name__ == "__main__":
    sample_data(500)