# result_export_ai.py — export AI data-quality (QA) analysis results to an Excel report
  1. from datetime import datetime
  2. import pandas as pd
  3. import numpy as np
  4. from pymongo import MongoClient
  5. from openpyxl import load_workbook
  6. #高质量字段以及错误原因
  7. # 将这个函数定义放在你的脚本顶部或其他函数定义附近
  8. def convert_numpy_int(obj):
  9. if isinstance(obj, np.int64):
  10. return int(obj)
  11. elif isinstance(obj, dict):
  12. return {key: convert_numpy_int(value) for key, value in obj.items()}
  13. elif isinstance(obj, list):
  14. return [convert_numpy_int(elem) for elem in obj]
  15. else:
  16. return obj
  17. # MongoDB连接配置
  18. host = '192.168.3.149' # MongoDB主机地址
  19. port = 27180 # MongoDB端口
  20. dbname = 'data_quality' # 数据库名称
  21. collection_name = 'bidding_20241219_ai' # 集合名称
  22. # 创建MongoDB连接
  23. client = MongoClient(host, port)
  24. db = client[dbname]
  25. collection = db[collection_name]
  26. # 定义字段中英文映射
  27. column_name_mapping = {
  28. "area_qa": "省份",
  29. "bidamount_qa": "中标金额",
  30. "budget_qa": "预算",
  31. "buyer_qa": "采购单位",
  32. "multipackage_qa": "分包",
  33. "projectcode_qa": "项目编号",
  34. "projectname_qa": "项目名称",
  35. "title_qa": "标题",
  36. "winner_qa": "中标单位",
  37. "score": "标讯总分数",
  38. "bidopentime_qa": "开标时间",
  39. "publishtime_qa": "发布时间",
  40. "toptype_qa": "信息一级分类",
  41. "subtype_qa": "信息二级分类"
  42. }
  43. # 从MongoDB读取数据
  44. data = pd.DataFrame(list(collection.find({},{k:1 for k,v in column_name_mapping.items()})))
  45. # 选择字段名以 '_qa' 结尾的列
  46. qa_columns = [col for col in data.columns if col.endswith('_qa')]
  47. # 仅保留 '_qa' 结尾的字段,并进行列名映射
  48. data = data[qa_columns]
  49. data.rename(columns=column_name_mapping, inplace=True)
  50. # 输出当前的数据列名
  51. print("当前的列名:")
  52. print(data.columns)
  53. # analyze_column 函数,处理 NaN 值
  54. def analyze_column(dataframe, column_name):
  55. if column_name not in dataframe.columns:
  56. total = len(dataframe)
  57. correct = total
  58. error = 0
  59. else:
  60. total = len(dataframe[column_name])
  61. correct = dataframe[column_name].apply(lambda x: pd.isna(x) or x == {}).sum()
  62. error = total - correct
  63. accuracy = correct / total if total > 0 else 0
  64. error_rate = error / total if total > 0 else 0
  65. # 收集错误原因
  66. error_reasons = dataframe[column_name].apply(
  67. lambda x: x if x != {} and not pd.isna(x) else None).dropna().value_counts()
  68. return total, correct, error, accuracy, error_rate, error_reasons
  69. # 重新格式化错误原因的数据结构
  70. def reformat_error_reasons_safe(error_reasons_series):
  71. reformatted_reasons = {}
  72. for error_dict, count in error_reasons_series.items():
  73. if isinstance(error_dict, dict): # 如果是字典类型的错误原因
  74. for error_code, reason in error_dict.items():
  75. if ',' in reason:
  76. parts = reason.split(',')
  77. formatted_reason = parts[1].strip()
  78. else:
  79. formatted_reason = reason.strip()
  80. if formatted_reason:
  81. key = (formatted_reason,)
  82. if key not in reformatted_reasons:
  83. reformatted_reasons[key] = count
  84. else:
  85. reformatted_reasons[key] += count
  86. elif isinstance(error_dict, list): # 如果是列表类型的错误原因
  87. key = (tuple(error_dict),) if error_dict else None
  88. if key not in reformatted_reasons:
  89. reformatted_reasons[key] = count
  90. else:
  91. reformatted_reasons[key] += count
  92. else: # 其他类型的错误原因
  93. key = (error_dict,) if error_dict else None
  94. if key not in reformatted_reasons:
  95. reformatted_reasons[key] = count
  96. else:
  97. reformatted_reasons[key] += count
  98. formatted_results = {
  99. str(key[0]): value for key, value in reformatted_reasons.items() if key and key[0] != ''
  100. }
  101. return formatted_results
  102. # 对每个字段进行分析
  103. fields_to_analyze = data.columns # 直接使用已选定的 '_qa' 字段
  104. expanded_analysis_results = []
  105. for col in fields_to_analyze:
  106. total, correct, error, accuracy, error_rate, error_reasons = analyze_column(data, col)
  107. reformatted_error_reasons = reformat_error_reasons_safe(error_reasons)
  108. for reason, count in reformatted_error_reasons.items():
  109. reason = str(reason).replace('(', '').replace(',)', '').replace("'", '')
  110. if error > 0:
  111. single_reason_error_rate = count / error
  112. else:
  113. single_reason_error_rate = 0 # 防止除以零的情况
  114. expanded_analysis_results.append({
  115. '字段': col,
  116. '总量': total,
  117. '正确数量': correct,
  118. '错误数量': error,
  119. '正确率': f'{accuracy:.2%}',
  120. '错误率': f'{error_rate:.2%}',
  121. '错误原因': reason,
  122. '错误次数': count,
  123. '单个原因错误率': f'{single_reason_error_rate:.2%}'
  124. })
  125. # 创建DataFrame并进行写入操作
  126. expanded_analysis_results_df = pd.DataFrame(expanded_analysis_results)
  127. # 使用 pd.ExcelWriter 进行写入操作
  128. with pd.ExcelWriter('临时文件.xlsx', engine='openpyxl') as writer:
  129. # 将分析结果写入Excel
  130. expanded_analysis_results_df.to_excel(writer, sheet_name='字段分析结果', index=False)
  131. # 假设您的分析结果已经保存在一个临时文件中
  132. temp_analysis_file = '临时文件.xlsx' # 临时文件的路径
  133. # 加载您想要合并结果到的Excel文件
  134. modified_file_path = 'pin.xlsx' # 拼接文件路径
  135. wb = load_workbook(modified_file_path)
  136. # 加载包含分析结果的临时Excel文件
  137. temp_wb = load_workbook(temp_analysis_file)
  138. # 将临时文件中的工作表复制到修改过的文件中
  139. for sheet_name in temp_wb.sheetnames:
  140. source = temp_wb[sheet_name]
  141. target = wb.create_sheet(sheet_name)
  142. for row in source.iter_rows(min_row=1, max_col=source.max_column, max_row=source.max_row, values_only=True):
  143. target.append(row)
  144. # 保存最终的合并文件
  145. final_merged_file_path = '质量分析报告.xlsx' # 最终合并文件的路径
  146. wb.save(final_merged_file_path)
  147. # 关闭MongoDB连接
  148. client.close()