result_export.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. # 导入所需库
  2. import pandas as pd
  3. from pymongo import MongoClient
  4. from openpyxl import load_workbook
  5. # MongoDB连接配置
  6. host = '192.168.3.206' # MongoDB主机地址
  7. port = 27080 # MongoDB端口
  8. dbname = 'data_quality' # 数据库名称
  9. collection_name = 'bidding_20231228' # 集合名称
  10. # 创建MongoDB连接
  11. client = MongoClient(host, port)
  12. db = client[dbname]
  13. collection = db[collection_name]
  14. # 从MongoDB读取数据
  15. data = pd.DataFrame(list(collection.find()))
  16. # 定义字段中英文映射
  17. column_name_mapping = {
  18. "area_qa": "省份",
  19. "bidamount_qa": "中标金额",
  20. "budget_qa": "预算",
  21. "buyer_qa": "采购单位",
  22. "multipackage_qa": "分包",
  23. "projectcode_qa": "项目编号",
  24. "projectname_qa": "项目名称",
  25. "title_qa": "标题",
  26. "winner_qa": "中标单位",
  27. "score": "标讯总分数",
  28. "bidopentime_qa": "开标时间",
  29. "publishtime_qa": "发布时间",
  30. "toptype_qa": "信息一级分类",
  31. "subtype_qa": "信息二级分类"
  32. }
  33. # 将字段名称更改为中文
  34. data.rename(columns=column_name_mapping, inplace=True)
  35. # 打印语句来检查 '一级分类' 和 '二级分类' 字段的值
  36. print("一级分类字段值分布:")
  37. print(data['信息一级分类'].value_counts(dropna=False))
  38. print("\n二级分类字段值分布:")
  39. print(data['信息二级分类'].value_counts(dropna=False))
  40. # 关闭MongoDB连接
  41. client.close()
  42. # analyze_column 函数,处理 NaN 值
  43. def analyze_column(dataframe, column_name):
  44. if column_name not in dataframe.columns:
  45. # 字段不存在时,认为所有记录都是正确的
  46. total = len(dataframe)
  47. correct = total
  48. error = 0
  49. else:
  50. # 对于存在的字段,NaN 和空字典 {} 视为正确,其他视为错误
  51. total = len(dataframe[column_name])
  52. correct = dataframe[column_name].apply(lambda x: pd.isna(x) or x == {}).sum()
  53. error = total - correct
  54. accuracy = correct / total if total > 0 else 0
  55. error_rate = error / total if total > 0 else 0
  56. # 收集错误原因
  57. error_reasons = dataframe[column_name].apply(
  58. lambda x: x if x != {} and not pd.isna(x) else None).dropna().value_counts()
  59. return total, correct, error, accuracy, error_rate, error_reasons
  60. # 重新格式化错误原因的数据结构
  61. def reformat_error_reasons_safe(error_reasons_series):
  62. # 初始化一个空字典,用于存储重新格式化的错误原因
  63. reformatted_reasons = {}
  64. # 遍历错误原因字典及其对应的次数
  65. for error_dict, count in error_reasons_series.items():
  66. if isinstance(error_dict, dict): # 如果是字典类型的错误原因
  67. for error_code, reason in error_dict.items():
  68. # 检查原因字符串是否包含逗号
  69. if ',' in reason:
  70. parts = reason.split(',')
  71. formatted_reason = parts[1].strip()
  72. else:
  73. formatted_reason = reason.strip()
  74. # 如果格式化后的原因非空,则构建键值对并更新字典
  75. if formatted_reason:
  76. key = (formatted_reason,)
  77. if key not in reformatted_reasons:
  78. reformatted_reasons[key] = count
  79. else:
  80. reformatted_reasons[key] += count
  81. elif isinstance(error_dict, list): # 如果是列表类型的错误原因
  82. key = (tuple(error_dict),) if error_dict else None
  83. if key not in reformatted_reasons:
  84. reformatted_reasons[key] = count
  85. else:
  86. reformatted_reasons[key] += count
  87. else: # 其他类型的错误原因
  88. key = (error_dict,) if error_dict else None
  89. if key not in reformatted_reasons:
  90. reformatted_reasons[key] = count
  91. else:
  92. reformatted_reasons[key] += count
  93. # 构建最终格式化后的结果字典,去除空键和空字符串键
  94. formatted_results = {
  95. str(key[0]): value for key, value in reformatted_reasons.items() if key and key[0] != ''
  96. }
  97. return formatted_results
  98. # 对每个字段进行分析
  99. fields_to_analyze = ["省份", "中标金额", "预算", "采购单位", "分包", "项目编号", "项目名称", "标题", "中标单位",
  100. "开标时间", "发布时间", "信息一级分类", "信息二级分类"]
  101. expanded_analysis_results = []
  102. for col in fields_to_analyze:
  103. total, correct, error, accuracy, error_rate, error_reasons = analyze_column(data, col)
  104. chinese_name = column_name_mapping.get(col, col)
  105. reformatted_error_reasons = reformat_error_reasons_safe(error_reasons)
  106. for reason, count in reformatted_error_reasons.items():
  107. reason = str(reason).replace('(', '').replace(',)', '').replace("'", '')
  108. expanded_analysis_results.append({
  109. '字段': chinese_name,
  110. '总量': total,
  111. '正确数量': correct,
  112. '错误数量': error,
  113. '正确率': f'{accuracy:.2%}',
  114. '错误率': f'{error_rate:.2%}',
  115. '错误原因': reason,
  116. '错误次数': count
  117. })
  118. # 创建DataFrame并导出为Excel
  119. expanded_analysis_results_df = pd.DataFrame(expanded_analysis_results)
  120. # "标讯总分数" 字段的分布
  121. if "标讯总分数" in data.columns:
  122. # 转换为浮点数
  123. data['标讯总分数'] = data['标讯总分数'].astype(float)
  124. score_counts = data['标讯总分数'].value_counts().sort_index()
  125. total_scores = len(data['标讯总分数'])
  126. score_percentages = (score_counts / total_scores) * 100
  127. score_distribution_df = pd.DataFrame({
  128. '标讯总分数': score_counts.index,
  129. '数量': score_counts.values,
  130. '百分比': score_percentages.values
  131. })
  132. # 百分比格式化为字符串,并附加百分号
  133. score_distribution_df['百分比'] = score_distribution_df['百分比'].apply(lambda x: f'{x:.2f}%')
  134. # "purchasinglist" 下的 "score" 字段的分布
  135. if 'purchasinglist' in data.columns:
  136. # 提取 "score" 并转换为浮点数
  137. purchasinglist_scores = data['purchasinglist'].map(
  138. lambda x: float(x[0]['score']) if isinstance(x, list) and x and isinstance(x[0], dict) and 'score' in x[
  139. 0] else 0
  140. )
  141. purchasinglist_score_counts = purchasinglist_scores.value_counts().sort_index()
  142. purchasinglist_total_scores = purchasinglist_scores.notnull().sum()
  143. purchasinglist_score_percentages = (purchasinglist_score_counts / purchasinglist_total_scores) * 100
  144. purchasinglist_score_distribution_df = pd.DataFrame({
  145. '标的物分数': purchasinglist_score_counts.index,
  146. '数量': purchasinglist_score_counts.values,
  147. '百分比': purchasinglist_score_percentages.values
  148. })
  149. # 百分比格式化为字符串,并附加百分号
  150. purchasinglist_score_distribution_df['百分比'] = purchasinglist_score_distribution_df['百分比'].apply(
  151. lambda x: f'{x:.2f}%')
  152. # 对错误次数进行倒序排序
  153. expanded_analysis_results_df = expanded_analysis_results_df.sort_values(by='错误次数', ascending=False)
  154. # 使用 pd.ExcelWriter 进行写入操作
  155. with pd.ExcelWriter('临时文件.xlsx', engine='openpyxl') as writer:
  156. # 新建一个工作表 "分数分析结果"
  157. writer.sheets['分数分析结果'] = writer.book.create_sheet('分数分析结果')
  158. if "标讯总分数" in data.columns:
  159. # 添加总量列
  160. score_distribution_df['总量'] = total_scores
  161. # 对分数进行倒序排序
  162. score_distribution_df = score_distribution_df.sort_values(by='标讯总分数', ascending=False)
  163. score_distribution_df.to_excel(writer, sheet_name='分数分析结果', index=False)
  164. # 新建一列写入 "purchasinglist" 下的 "score" 分布
  165. if 'purchasinglist' in data.columns and purchasinglist_scores.notnull().any():
  166. # 注意这里的startcol参数,它应该基于您的实际数据列数来设置
  167. purchasinglist_score_distribution_df = purchasinglist_score_distribution_df.sort_values(by='标的物分数',
  168. ascending=False)
  169. purchasinglist_score_distribution_df.to_excel(writer, sheet_name='分数分析结果',
  170. startcol=len(score_distribution_df.columns) + 2, index=False)
  171. # 添加总量列
  172. purchasinglist_score_distribution_df['总量'] = purchasinglist_total_scores
  173. purchasinglist_score_distribution_df.to_excel(writer, sheet_name='分数分析结果',
  174. startcol=len(score_distribution_df.columns) + 2, index=False)
  175. expanded_analysis_results_df.to_excel(writer, sheet_name='字段分析结果', index=False)
  176. # 假设您的分析结果已经保存在一个临时文件中
  177. temp_analysis_file = '临时文件.xlsx' # 临时文件的路径
  178. # 加载您想要合并结果到的Excel文件
  179. modified_file_path = 'pin.xlsx' #拼接文件路径
  180. wb = load_workbook(modified_file_path)
  181. # 加载包含分析结果的临时Excel文件
  182. temp_wb = load_workbook(temp_analysis_file)
  183. # 将临时文件中的工作表复制到修改过的文件中
  184. for sheet_name in temp_wb.sheetnames:
  185. source = temp_wb[sheet_name]
  186. target = wb.create_sheet(sheet_name)
  187. for row in source.iter_rows(min_row=1, max_col=source.max_column, max_row=source.max_row, values_only=True):
  188. target.append(row)
  189. # 保存最终的合并文件
  190. final_merged_file_path = '质量分析报告.xlsx' # 最终合并文件的路径
  191. wb.save(final_merged_file_path)