# 导入所需库 import pandas as pd from pymongo import MongoClient from openpyxl import load_workbook # MongoDB连接配置 host = '192.168.3.206' # MongoDB主机地址 port = 27080 # MongoDB端口 dbname = 'data_quality' # 数据库名称 collection_name = 'bidding_20231228' # 集合名称 # 创建MongoDB连接 client = MongoClient(host, port) db = client[dbname] collection = db[collection_name] # 从MongoDB读取数据 data = pd.DataFrame(list(collection.find())) # 定义字段中英文映射 column_name_mapping = { "area_qa": "省份", "bidamount_qa": "中标金额", "budget_qa": "预算", "buyer_qa": "采购单位", "multipackage_qa": "分包", "projectcode_qa": "项目编号", "projectname_qa": "项目名称", "title_qa": "标题", "winner_qa": "中标单位", "score": "标讯总分数", "bidopentime_qa": "开标时间", "publishtime_qa": "发布时间", "toptype_qa": "信息一级分类", "subtype_qa": "信息二级分类" } # 将字段名称更改为中文 data.rename(columns=column_name_mapping, inplace=True) # 打印语句来检查 '一级分类' 和 '二级分类' 字段的值 print("一级分类字段值分布:") print(data['信息一级分类'].value_counts(dropna=False)) print("\n二级分类字段值分布:") print(data['信息二级分类'].value_counts(dropna=False)) # 关闭MongoDB连接 client.close() # analyze_column 函数,处理 NaN 值 def analyze_column(dataframe, column_name): if column_name not in dataframe.columns: # 字段不存在时,认为所有记录都是正确的 total = len(dataframe) correct = total error = 0 else: # 对于存在的字段,NaN 和空字典 {} 视为正确,其他视为错误 total = len(dataframe[column_name]) correct = dataframe[column_name].apply(lambda x: pd.isna(x) or x == {}).sum() error = total - correct accuracy = correct / total if total > 0 else 0 error_rate = error / total if total > 0 else 0 # 收集错误原因 error_reasons = dataframe[column_name].apply( lambda x: x if x != {} and not pd.isna(x) else None).dropna().value_counts() return total, correct, error, accuracy, error_rate, error_reasons # 重新格式化错误原因的数据结构 def reformat_error_reasons_safe(error_reasons_series): # 初始化一个空字典,用于存储重新格式化的错误原因 reformatted_reasons = {} # 遍历错误原因字典及其对应的次数 for error_dict, count in error_reasons_series.items(): if isinstance(error_dict, dict): # 如果是字典类型的错误原因 for error_code, reason in error_dict.items(): # 检查原因字符串是否包含逗号 if ',' in reason: parts = reason.split(',') formatted_reason = parts[1].strip() else: formatted_reason = reason.strip() # 如果格式化后的原因非空,则构建键值对并更新字典 if formatted_reason: key = (formatted_reason,) if key not in reformatted_reasons: reformatted_reasons[key] = count else: reformatted_reasons[key] += count elif isinstance(error_dict, list): # 如果是列表类型的错误原因 key = (tuple(error_dict),) if error_dict else None if key not in reformatted_reasons: reformatted_reasons[key] = count else: reformatted_reasons[key] += count else: # 其他类型的错误原因 key = (error_dict,) if error_dict else None if key not in reformatted_reasons: reformatted_reasons[key] = count else: reformatted_reasons[key] += count # 构建最终格式化后的结果字典,去除空键和空字符串键 formatted_results = { str(key[0]): value for key, value in reformatted_reasons.items() if key and key[0] != '' } return formatted_results # 对每个字段进行分析 fields_to_analyze = ["省份", "中标金额", "预算", "采购单位", "分包", "项目编号", "项目名称", "标题", "中标单位", "开标时间", "发布时间", "信息一级分类", "信息二级分类"] expanded_analysis_results = [] for col in fields_to_analyze: total, correct, error, accuracy, error_rate, error_reasons = analyze_column(data, col) chinese_name = column_name_mapping.get(col, col) reformatted_error_reasons = reformat_error_reasons_safe(error_reasons) for reason, count in reformatted_error_reasons.items(): reason = str(reason).replace('(', '').replace(',)', '').replace("'", '') expanded_analysis_results.append({ '字段': chinese_name, '总量': total, '正确数量': correct, '错误数量': error, '正确率': f'{accuracy:.2%}', '错误率': f'{error_rate:.2%}', '错误原因': reason, '错误次数': count }) # 创建DataFrame并导出为Excel expanded_analysis_results_df = pd.DataFrame(expanded_analysis_results) # "标讯总分数" 字段的分布 if "标讯总分数" in data.columns: # 转换为浮点数 data['标讯总分数'] = data['标讯总分数'].astype(float) score_counts = data['标讯总分数'].value_counts().sort_index() total_scores = len(data['标讯总分数']) score_percentages = (score_counts / total_scores) * 100 score_distribution_df = pd.DataFrame({ '标讯总分数': score_counts.index, '数量': score_counts.values, '百分比': score_percentages.values }) # 百分比格式化为字符串,并附加百分号 score_distribution_df['百分比'] = score_distribution_df['百分比'].apply(lambda x: f'{x:.2f}%') # "purchasinglist" 下的 "score" 字段的分布 if 'purchasinglist' in data.columns: # 提取 "score" 并转换为浮点数 purchasinglist_scores = data['purchasinglist'].map( lambda x: float(x[0]['score']) if isinstance(x, list) and x and isinstance(x[0], dict) and 'score' in x[ 0] else 0 ) purchasinglist_score_counts = purchasinglist_scores.value_counts().sort_index() purchasinglist_total_scores = purchasinglist_scores.notnull().sum() purchasinglist_score_percentages = (purchasinglist_score_counts / purchasinglist_total_scores) * 100 purchasinglist_score_distribution_df = pd.DataFrame({ '标的物分数': purchasinglist_score_counts.index, '数量': purchasinglist_score_counts.values, '百分比': purchasinglist_score_percentages.values }) # 百分比格式化为字符串,并附加百分号 purchasinglist_score_distribution_df['百分比'] = purchasinglist_score_distribution_df['百分比'].apply( lambda x: f'{x:.2f}%') # 对错误次数进行倒序排序 expanded_analysis_results_df = expanded_analysis_results_df.sort_values(by='错误次数', ascending=False) # 使用 pd.ExcelWriter 进行写入操作 with pd.ExcelWriter('临时文件.xlsx', engine='openpyxl') as writer: # 新建一个工作表 "分数分析结果" writer.sheets['分数分析结果'] = writer.book.create_sheet('分数分析结果') if "标讯总分数" in data.columns: # 添加总量列 score_distribution_df['总量'] = total_scores # 对分数进行倒序排序 score_distribution_df = score_distribution_df.sort_values(by='标讯总分数', ascending=False) score_distribution_df.to_excel(writer, sheet_name='分数分析结果', index=False) # 新建一列写入 "purchasinglist" 下的 "score" 分布 if 'purchasinglist' in data.columns and purchasinglist_scores.notnull().any(): # 注意这里的startcol参数,它应该基于您的实际数据列数来设置 purchasinglist_score_distribution_df = purchasinglist_score_distribution_df.sort_values(by='标的物分数', ascending=False) purchasinglist_score_distribution_df.to_excel(writer, sheet_name='分数分析结果', startcol=len(score_distribution_df.columns) + 2, index=False) # 添加总量列 purchasinglist_score_distribution_df['总量'] = purchasinglist_total_scores purchasinglist_score_distribution_df.to_excel(writer, sheet_name='分数分析结果', startcol=len(score_distribution_df.columns) + 2, index=False) expanded_analysis_results_df.to_excel(writer, sheet_name='字段分析结果', index=False) # 假设您的分析结果已经保存在一个临时文件中 temp_analysis_file = '临时文件.xlsx' # 临时文件的路径 # 加载您想要合并结果到的Excel文件 modified_file_path = 'pin.xlsx' #拼接文件路径 wb = load_workbook(modified_file_path) # 加载包含分析结果的临时Excel文件 temp_wb = load_workbook(temp_analysis_file) # 将临时文件中的工作表复制到修改过的文件中 for sheet_name in temp_wb.sheetnames: source = temp_wb[sheet_name] target = wb.create_sheet(sheet_name) for row in source.iter_rows(min_row=1, max_col=source.max_column, max_row=source.max_row, values_only=True): target.append(row) # 保存最终的合并文件 final_merged_file_path = '质量分析报告.xlsx' # 最终合并文件的路径 wb.save(final_merged_file_path)