|
@@ -0,0 +1,232 @@
|
|
|
# Required libraries
import pandas as pd
from pymongo import MongoClient
from openpyxl import load_workbook

# MongoDB connection settings
host = '192.168.3.206'                # MongoDB host address
port = 27080                          # MongoDB port
dbname = 'data_quality'               # database name
collection_name = 'bidding_20231122'  # collection name

# Open the MongoDB connection and select the target collection
client = MongoClient(host, port)
db = client[dbname]
collection = db[collection_name]

# Pull a bounded sample of documents into a DataFrame
data = pd.DataFrame(list(collection.find().limit(1000)))
|
|
|
+
|
|
|
# Mapping from QA field names (English) to Chinese report labels
column_name_mapping = {
    "area_qa": "省份",
    "bidamount_qa": "中标金额",
    "budget_qa": "预算",
    "buyer_qa": "采购单位",
    "multipackage_qa": "分包",
    "projectcode_qa": "项目编号",
    "projectname_qa": "项目名称",
    "title_qa": "标题",
    "winner_qa": "中标单位",
    "score": "标讯总分数",
    "bidopentime_qa": "开标时间",
    "publishtime_qa": "发布时间",
    "toptype_qa": "信息一级分类",
    "subtype_qa": "信息二级分类"
}

# Rename the QA columns to their Chinese labels (in place)
data.rename(columns=column_name_mapping, inplace=True)

# Preview the value distribution of the two classification fields.
# Guard against the columns being absent from the sampled documents,
# which would otherwise raise a KeyError here.
if '信息一级分类' in data.columns:
    print("一级分类字段值分布:")
    print(data['信息一级分类'].value_counts(dropna=False))
if '信息二级分类' in data.columns:
    print("\n二级分类字段值分布:")
    print(data['信息二级分类'].value_counts(dropna=False))

# Close the MongoDB connection; everything below uses the in-memory DataFrame
client.close()
|
|
|
# analyze_column: per-field quality statistics, NaN-aware
def analyze_column(dataframe, column_name, special=False):
    """Compute quality statistics for one QA column.

    Each cell is expected to hold the QA result for that field: an empty
    dict ``{}`` means the field passed all checks; any other value
    describes the error found.

    Args:
        dataframe: DataFrame holding the QA results.
        column_name: name of the column to analyze.
        special: when True (used for the classification fields), a missing
            value (NaN) also counts as correct; when False only ``{}``
            counts as correct.

    Returns:
        Tuple ``(total, correct, error, accuracy, error_rate,
        error_reasons)`` where ``error_reasons`` is a value -> count
        Series of the offending cell values.
    """
    if column_name not in dataframe.columns:
        # Column absent from the sample: treat every record as correct.
        total = len(dataframe)
        # Explicit dtype avoids the pandas warning for an empty Series.
        return total, total, 0, 1.0, 0.0, pd.Series(dtype=object)

    column = dataframe[column_name]
    total = len(column)
    if special:
        # Special fields: NaN and the empty dict {} are both correct.
        correct = column.apply(lambda x: pd.isna(x) or x == {}).sum()
        error_reasons = column.apply(
            lambda x: x if x != {} and not pd.isna(x) else None).dropna().value_counts()
    else:
        # Regular fields: only the empty dict {} is correct.
        correct = column.apply(lambda x: x == {}).sum()
        error_reasons = column.apply(lambda x: x if x != {} else None).dropna().value_counts()

    error = total - correct
    # Guard the divisions so an empty column does not raise ZeroDivisionError.
    accuracy = correct / total if total else 1.0
    error_rate = error / total if total else 0.0

    return total, correct, error, accuracy, error_rate, error_reasons
|
|
|
+
|
|
|
# Normalize the error-reason counts into a flat {reason: count} dict
def reformat_error_reasons_safe(error_reasons_series):
    """Normalize an error-reason value_counts Series into ``{reason: count}``.

    Handles three shapes of index value:
      * dict  - ``{error_code: reason}``; for each reason, keeps the text
        after the first full-width comma "," (or the whole string when
        no comma is present).
      * list  - the whole list (as a tuple) becomes the reason key.
      * other - the value itself becomes the reason key.

    Empty / falsy reasons are dropped from the result.
    """
    aggregated = {}

    def _accumulate(key, count):
        # Fold counts for the same key together; None marks "discard".
        if key is not None:
            aggregated[key] = aggregated.get(key, 0) + count

    for error_value, count in error_reasons_series.items():
        if isinstance(error_value, dict):
            for _error_code, reason in error_value.items():
                # Keep only the human-readable part after the comma.
                if ',' in reason:
                    formatted_reason = reason.split(',')[1].strip()
                else:
                    formatted_reason = reason.strip()
                if formatted_reason:
                    _accumulate((formatted_reason,), count)
        elif isinstance(error_value, list):
            _accumulate((tuple(error_value),) if error_value else None, count)
        else:
            _accumulate((error_value,) if error_value else None, count)

    # Render each key as a plain string, dropping empty-string reasons.
    return {
        str(key[0]): value
        for key, value in aggregated.items()
        if key and key[0] != ''
    }
|
|
|
+
|
|
|
+
|
|
|
# Analyze every report field (columns were already renamed to Chinese)
fields_to_analyze = ["省份", "中标金额", "预算", "采购单位", "分包", "项目编号", "项目名称", "标题", "中标单位",
                     "开标时间", "发布时间", "信息一级分类", "信息二级分类"]

# The two classification fields use the NaN-tolerant "special" rules
special_fields = ['信息一级分类', '信息二级分类']

expanded_analysis_results = []

for col in fields_to_analyze:
    stats = analyze_column(data, col, col in special_fields)
    total, correct, error, accuracy, error_rate, error_reasons = stats

    # Columns are already Chinese, so this lookup falls back to col itself
    chinese_name = column_name_mapping.get(col, col)

    for reason, count in reformat_error_reasons_safe(error_reasons).items():
        # Strip tuple artifacts (parentheses and quotes) from the reason text
        cleaned_reason = str(reason).replace('(', '').replace(',)', '').replace("'", '')
        expanded_analysis_results.append({
            '字段': chinese_name,
            '总量': total,
            '正确数量': correct,
            '错误数量': error,
            '正确率': f'{accuracy:.2%}',
            '错误率': f'{error_rate:.2%}',
            '错误原因': cleaned_reason,
            '错误次数': count,
        })
|
|
|
+
|
|
|
# Collect the per-field analysis into a DataFrame for export
expanded_analysis_results_df = pd.DataFrame(expanded_analysis_results)

# Distribution of the overall bid score ("标讯总分数")
if "标讯总分数" in data.columns:
    # Coerce to float so the scores sort numerically
    data['标讯总分数'] = data['标讯总分数'].astype(float)
    score_counts = data['标讯总分数'].value_counts().sort_index()
    total_scores = len(data['标讯总分数'])
    score_percentages = score_counts / total_scores * 100
    score_distribution_df = pd.DataFrame({
        '标讯总分数': score_counts.index,
        '数量': score_counts.values,
        '百分比': score_percentages.values,
    })
    # Render percentages as strings with a trailing percent sign
    score_distribution_df['百分比'] = score_distribution_df['百分比'].map(lambda p: f'{p:.2f}%')
|
|
|
+
|
|
|
# Distribution of the "score" field nested under "purchasinglist"
if 'purchasinglist' in data.columns:
    def _first_item_score(cell):
        # The score lives on the first element of the list, when present;
        # any other shape contributes a score of 0.
        if isinstance(cell, list) and cell and isinstance(cell[0], dict) and 'score' in cell[0]:
            return float(cell[0]['score'])
        return 0

    purchasinglist_scores = data['purchasinglist'].map(_first_item_score)

    purchasinglist_score_counts = purchasinglist_scores.value_counts().sort_index()
    purchasinglist_total_scores = purchasinglist_scores.notnull().sum()
    purchasinglist_score_percentages = purchasinglist_score_counts / purchasinglist_total_scores * 100
    purchasinglist_score_distribution_df = pd.DataFrame({
        '标的物分数': purchasinglist_score_counts.index,
        '数量': purchasinglist_score_counts.values,
        '百分比': purchasinglist_score_percentages.values,
    })
    # Render percentages as strings with a trailing percent sign
    purchasinglist_score_distribution_df['百分比'] = purchasinglist_score_distribution_df['百分比'].map(
        lambda p: f'{p:.2f}%')
|
|
|
+
|
|
|
# Sort the field analysis by error count, descending
expanded_analysis_results_df = expanded_analysis_results_df.sort_values(by='错误次数', ascending=False)

# Write everything into a temporary workbook
with pd.ExcelWriter('临时文件.xlsx', engine='openpyxl') as writer:
    # Pre-create the score sheet so both score tables land on it
    writer.sheets['分数分析结果'] = writer.book.create_sheet('分数分析结果')

    # Column offset for the second score table; 0 when the first table is
    # absent (the original code referenced score_distribution_df here even
    # when it was never created, raising NameError).
    second_table_startcol = 0
    if "标讯总分数" in data.columns:
        # Add the sample-size column, sort by score descending, then write
        score_distribution_df['总量'] = total_scores
        score_distribution_df = score_distribution_df.sort_values(by='标讯总分数', ascending=False)
        score_distribution_df.to_excel(writer, sheet_name='分数分析结果', index=False)
        second_table_startcol = len(score_distribution_df.columns) + 2

    if 'purchasinglist' in data.columns and purchasinglist_scores.notnull().any():
        purchasinglist_score_distribution_df = purchasinglist_score_distribution_df.sort_values(
            by='标的物分数', ascending=False)
        # Add the sample-size column BEFORE the single write: the original
        # wrote this table twice at the same startcol (with and without the
        # total column), so the first write was dead and has been removed.
        purchasinglist_score_distribution_df['总量'] = purchasinglist_total_scores
        purchasinglist_score_distribution_df.to_excel(writer, sheet_name='分数分析结果',
                                                      startcol=second_table_startcol, index=False)

    expanded_analysis_results_df.to_excel(writer, sheet_name='字段分析结果', index=False)
|
|
|
+
|
|
|
# Merge the temporary analysis workbook into the existing report workbook
temp_analysis_file = '临时文件.xlsx'  # path of the temporary analysis file
modified_file_path = 'pin.xlsx'  # path of the workbook to merge into

wb = load_workbook(modified_file_path)
temp_wb = load_workbook(temp_analysis_file)

# Copy every sheet from the temporary workbook (cell values only, no styles)
for sheet_name in temp_wb.sheetnames:
    source_sheet = temp_wb[sheet_name]
    target_sheet = wb.create_sheet(sheet_name)
    for row_values in source_sheet.iter_rows(min_row=1, max_col=source_sheet.max_column,
                                             max_row=source_sheet.max_row, values_only=True):
        target_sheet.append(row_values)

# Persist the merged report
final_merged_file_path = '数据质量分析报告.xlsx'  # path of the final merged file
wb.save(final_merged_file_path)
|