|
@@ -0,0 +1,289 @@
|
|
|
import os

import pandas as pd
import pymongo
import pymysql
from openpyxl import Workbook
from openpyxl.styles import Font, Alignment
|
|
|
+
|
|
|
+# # MySQL 配置信息
|
|
|
+# MYSQL_CONFIG = {
|
|
|
+# "host": "172.20.45.129",
|
|
|
+# "user": "root",
|
|
|
+# "password": "=PDT49#80Z!RVv52_z",
|
|
|
+# "database": "quality",
|
|
|
+# "port": 4000
|
|
|
+# }
|
|
|
+# # 连接 MySQL 并读取数据
|
|
|
+# def fetch_data():
|
|
|
+# conn = pymysql.connect(**MYSQL_CONFIG)
|
|
|
+# query = "SELECT * FROM sample_bid_analysis;"
|
|
|
+# df = pd.read_sql(query, conn)
|
|
|
+# conn.close()
|
|
|
+# return df
|
|
|
# MongoDB connection settings. Every value can be overridden through the
# environment variable named in its lookup; the literals are the previous
# hard-coded values, kept as defaults.
MONGO_CONFIG = {
    "host": os.environ.get("MONGO_HOST", "172.20.45.129"),
    "port": int(os.environ.get("MONGO_PORT", "27002")),
    "db": os.environ.get("MONGO_DB", "data_quality"),
    "col": os.environ.get("MONGO_COLLECTION", "standard_sample_data_new"),
}

# MySQL-protocol connection settings; the dict is unpacked into
# pymysql.connect(), so the key names must stay as they are.
# FIXME(security): the literal password is only a backward-compatible fallback.
# Supply MYSQL_PASSWORD from the environment, then delete the literal and
# rotate the credential.
MYSQL_CONFIG = {
    "host": os.environ.get("MYSQL_HOST", "172.20.45.129"),
    "user": os.environ.get("MYSQL_USER", "root"),
    "password": os.environ.get("MYSQL_PASSWORD", "=PDT49#80Z!RVv52_z"),
    "database": os.environ.get("MYSQL_DATABASE", "quality"),
    "port": int(os.environ.get("MYSQL_PORT", "4000")),
}
|
|
|
+
|
|
|
+# 连接 MongoDB 并读取数据
|
|
|
def fetch_data():
    """Load every document of the configured MongoDB collection into a DataFrame.

    The server address, database and collection are read from ``MONGO_CONFIG``.
    The ``_id`` field is excluded by the query projection.

    Returns:
        pandas.DataFrame: one row per document (no rows when the collection
        is empty).
    """
    client = pymongo.MongoClient(f"mongodb://{MONGO_CONFIG['host']}:{MONGO_CONFIG['port']}")
    try:
        collection = client[MONGO_CONFIG["db"]][MONGO_CONFIG["col"]]
        documents = list(collection.find({}, {"_id": 0}))
    finally:
        # Release the connection even when the query raises.
        client.close()
    return pd.DataFrame(documents)
|
|
|
+
|
|
|
+# 判断 projectname 是否互为包含关系
|
|
|
# Subtypes selected for the "中标类" category.
_RESULT_SUBTYPES = ["成交", "单一", "废标", "合同", "结果变更", "流标", "验收", "中标", "其它"]
# Subtypes excluded from the other ("招标类") category.
_TENDER_EXCLUDED_SUBTYPES = _RESULT_SUBTYPES + ["拟建"]

# Fields compared for every category, and the extra ones compared for "中标类".
_COMMON_FIELDS = ["toptype", "subtype", "area", "city", "buyer",
                  "projectname", "projectcode", "budget"]
_RESULT_ONLY_FIELDS = ["s_winner", "bidamount"]

# Column order of the per-field statistics frame.
_FIELD_STAT_COLUMNS = [
    "field_name", "sample_total", "original_null", "original_exist",
    "extract_error_total", "extract_correct_total", "extract_error_exist",
    "extract_correct_exist", "recognition_rate", "correct_recognition_rate",
    "accuracy_rate", "data_type",
]


def _is_missing(value):
    """Return True for None / NaN / pd.NA / NaT and for the empty string."""
    if isinstance(value, str):
        return value == ""
    # pd.isna() on a list-like returns an array, so only ask it about scalars.
    return bool(pd.api.types.is_scalar(value) and pd.isna(value))


def is_contained(str1, str2):
    """Tell whether one value's text contains the other's.

    Args:
        str1: first value (normally a string).
        str2: second value (normally a string).

    Returns:
        bool: ``False`` when either value is missing (None, NaN, pd.NA or
        ``""``); otherwise ``True`` when either text is a substring of the
        other. Non-string values are compared through ``str()`` instead of
        raising ``TypeError``.
    """
    if _is_missing(str1) or _is_missing(str2):
        return False
    left, right = str(str1), str(str2)
    return left in right or right in left


def _values_equal(original, extracted):
    """Plain equality, used for every field except ``projectname``."""
    return original == extracted


def _compare_field(df, field):
    """Compare ``field`` with ``<field>_ai`` row by row.

    Returns two parallel lists of bools: whether the original value is
    missing, and whether the extracted value counts as correct.
    """
    matcher = is_contained if field == "projectname" else _values_equal
    missing, matched = [], []
    for original, extracted in zip(df[field], df[f"{field}_ai"]):
        original_missing = _is_missing(original)
        extracted_missing = _is_missing(extracted)
        missing.append(original_missing)
        if original_missing or extracted_missing:
            # Both missing counts as correct; exactly one missing is an error.
            matched.append(original_missing and extracted_missing)
        else:
            matched.append(bool(matcher(original, extracted)))
    return missing, matched


def _percent(numerator, denominator):
    """Format ``numerator / denominator`` as a percentage; ``0.00%`` if empty."""
    return f"{numerator / denominator:.2%}" if denominator else "0.00%"


def calculate_metrics_and_accuracy(df, category):
    """Compute per-field and whole-row extraction statistics for one category.

    Rows are selected by ``subtype``: for ``"中标类"`` the rows whose subtype is
    one of the result subtypes; for any other category the rows whose subtype
    is neither a result subtype nor ``"拟建"``. Each field is compared with its
    ``<field>_ai`` counterpart. A pair is correct when both values are missing,
    or both are present and equal (``projectname``: one contains the other).
    Columns absent from ``df`` are treated as entirely missing.

    Args:
        df: sample data with the original columns and their ``*_ai`` columns.
        category: ``"中标类"`` or anything else (treated as "招标类").

    Returns:
        tuple: ``(df_fields, df_overall, overall_data)``
            * ``df_fields`` - one row per field with the columns
              ``field_name, sample_total, original_null, original_exist,
              extract_error_total, extract_correct_total, extract_error_exist,
              extract_correct_exist, recognition_rate,
              correct_recognition_rate, accuracy_rate, data_type``.
            * ``df_overall`` - a two-column ("指标", "数值") summary frame.
            * ``overall_data`` - the same summary as a dict with the keys
              ``total_data_count, correct_rows_count, row_accuracy,
              data_type``.
    """
    is_result_category = category == "中标类"
    fields = _COMMON_FIELDS + (_RESULT_ONLY_FIELDS if is_result_category else [])

    # Guarantee every column we read exists, so an empty collection or a
    # missing *_ai column yields statistics instead of a KeyError.
    required = fields + [f"{name}_ai" for name in fields]
    absent = [name for name in required if name not in df.columns]
    if absent:
        df = df.assign(**{name: None for name in absent})

    if is_result_category:
        selected = df["subtype"].isin(_RESULT_SUBTYPES)
    else:
        selected = ~df["subtype"].isin(_TENDER_EXCLUDED_SUBTYPES)
    df = df[selected]

    total_count = len(df)
    row_ok = [True] * total_count  # a row stays correct only if every field matches
    records = []

    for field in fields:
        missing, matched = _compare_field(df, field)
        row_ok = [ok and hit for ok, hit in zip(row_ok, matched)]

        null_count = sum(missing)                   # original has no value
        valid_count = total_count - null_count      # original has a value
        extract_correct_count = sum(matched)        # correct, incl. both-missing
        extract_error_count = total_count - extract_correct_count
        # Errors among rows whose original value exists.
        extract_error_no_null = sum(
            1 for gone, hit in zip(missing, matched) if not gone and not hit
        )
        extract_correct_no_null = valid_count - extract_error_no_null

        records.append({
            "field_name": field,
            "sample_total": total_count,
            "original_null": null_count,
            "original_exist": valid_count,
            "extract_error_total": extract_error_count,
            "extract_correct_total": extract_correct_count,
            "extract_error_exist": extract_error_no_null,
            "extract_correct_exist": extract_correct_no_null,
            "recognition_rate": _percent(valid_count, total_count),
            "correct_recognition_rate": _percent(extract_correct_count, total_count),
            "accuracy_rate": _percent(extract_correct_no_null, valid_count),
            "data_type": category,
        })

    correct_rows = sum(row_ok)
    row_accuracy = _percent(correct_rows, total_count)

    df_fields = pd.DataFrame(records, columns=_FIELD_STAT_COLUMNS)
    df_overall = pd.DataFrame(
        [["数据总量", total_count],
         ["整行都正确的数量", correct_rows],
         ["单行正确率", row_accuracy]],
        columns=["指标", "数值"],
    )
    overall_data = {
        "total_data_count": total_count,
        "correct_rows_count": correct_rows,
        "row_accuracy": row_accuracy,
        "data_type": category,
    }
    return df_fields, df_overall, overall_data
|
|
|
+
|
|
|
+
|
|
|
+# # 计算整体正确率
|
|
|
+# def calculate_overall_accuracy(df, fields):
|
|
|
+# """ 计算整行正确的数量及单行正确率 """
|
|
|
+# total_count = len(df) # 样本总量
|
|
|
+#
|
|
|
+# # 判断每行所有字段是否都正确(projectname 需使用互为包含逻辑)
|
|
|
+# def is_row_correct(row):
|
|
|
+# for field in fields:
|
|
|
+# if pd.isna(row[field]) and pd.isna(row[f"{field}_ai"]): # 如果原值和 AI 值都为空,算正确
|
|
|
+# continue
|
|
|
+# if field == "projectname":
|
|
|
+# if not is_contained(row["projectname"], row["projectname_ai"]): # projectname 互为包含
|
|
|
+# return False
|
|
|
+# else:
|
|
|
+# if row[field] != row.get(f"{field}_ai", row[field]): # 其他字段直接对比
|
|
|
+# return False
|
|
|
+# return True
|
|
|
+#
|
|
|
+# correct_rows = df.apply(is_row_correct, axis=1).sum() # 统计整行正确的数量
|
|
|
+# single_row_accuracy = correct_rows / total_count if total_count else 0 # 计算单行正确率
|
|
|
+#
|
|
|
+# return pd.DataFrame([["数据总量", total_count],
|
|
|
+# ["整行都正确的数量", correct_rows],
|
|
|
+# ["单行正确率", f"{single_row_accuracy:.2%}"]],
|
|
|
+# columns=["指标", "数值"])
|
|
|
+
|
|
|
+# 导出 Excel
|
|
|
def export_to_excel(df_bid_fields, df_bid_overall, df_tender_fields, df_tender_overall,
                    file_path="数据分析结果.xlsx"):
    """Write the four result frames to one Excel workbook and format it.

    Each frame goes to its own sheet. Columns that use the statistic key names
    (``field_name``, ``sample_total``, ...) are written under their Chinese
    report headers; all other columns are written unchanged. Afterwards every
    column is widened to its longest cell text plus two, and the header row is
    made bold and centred.

    Args:
        df_bid_fields: per-field statistics for "中标类".
        df_bid_overall: whole-row summary for "中标类".
        df_tender_fields: per-field statistics for "招标类".
        df_tender_overall: whole-row summary for "招标类".
        file_path: destination workbook path.
    """
    report_headers = {
        "field_name": "字段",
        "sample_total": "样本数据总量",
        "original_null": "原文无值",
        "original_exist": "原文有值的数量",
        "extract_error_total": "抽取错误的数量(含原文无)",
        "extract_correct_total": "抽取正确的数量(含原文无)",
        "extract_error_exist": "抽取错误的数量(不含原文无)",
        "extract_correct_exist": "抽取有值且正确数量(不含原文无)",
        "recognition_rate": "识别率",
        "correct_recognition_rate": "识别正确率",
        "accuracy_rate": "正确率(原文存在情况下)",
    }
    sheets = [
        ("字段统计-中标类", df_bid_fields),
        ("整体正确率-中标类", df_bid_overall),
        ("字段统计-招标类", df_tender_fields),
        ("整体正确率-招标类", df_tender_overall),
    ]

    with pd.ExcelWriter(file_path, engine="openpyxl") as writer:
        for sheet_name, frame in sheets:
            # rename() ignores mapping keys that are not columns of the frame.
            frame.rename(columns=report_headers).to_excel(
                writer, sheet_name=sheet_name, index=False
            )

        # Formatting has to happen before the writer is closed and saved.
        workbook = writer.book
        for sheet_name in workbook.sheetnames:
            ws = workbook[sheet_name]
            for column_cells in ws.columns:
                # Measure every non-empty cell, including falsy values like 0.
                longest = max(
                    (len(str(cell.value)) for cell in column_cells if cell.value is not None),
                    default=0,
                )
                ws.column_dimensions[column_cells[0].column_letter].width = longest + 2

            # Bold, centred header row.
            for cell in ws[1]:
                cell.font = Font(bold=True)
                cell.alignment = Alignment(horizontal="center", vertical="center")

    print(f"Excel 文件已保存:{file_path}")
|
|
|
+
|
|
|
def save_to_database(df_fields, df_overall):
    """Insert the statistics into MySQL inside a single transaction.

    One row is inserted into ``sample_data_analysis`` per row of ``df_fields``
    and one into ``data_quality_analysis`` per row of ``df_overall``. On any
    exception the transaction is rolled back, a message is printed and the
    exception is re-raised. The cursor and connection are always closed.

    Args:
        df_fields: frame providing the columns listed in ``field_columns``.
        df_overall: frame providing the columns listed in ``overall_columns``.
    """
    field_columns = (
        "field_name", "sample_total", "original_null", "original_exist",
        "extract_error_total", "extract_correct_total", "extract_error_exist",
        "extract_correct_exist", "recognition_rate", "correct_recognition_rate",
        "accuracy_rate", "data_type",
    )
    overall_columns = ("total_data_count", "correct_rows_count", "row_accuracy", "data_type")

    field_sql = """
        INSERT INTO sample_data_analysis (
            field_name, sample_total, original_null, original_exist,
            extract_error_total, extract_correct_total, extract_error_exist,
            extract_correct_exist, recognition_rate, correct_recognition_rate,
            accuracy_rate, data_type
        ) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
    """
    overall_sql = """
        INSERT INTO data_quality_analysis
        (total_data_count, correct_rows_count, row_accuracy, data_type)
        VALUES (%s,%s,%s,%s)
    """

    connection = pymysql.connect(**MYSQL_CONFIG)
    db_cursor = connection.cursor()
    try:
        # Per-field statistics.
        for _, record in df_fields.iterrows():
            db_cursor.execute(field_sql, tuple(record[name] for name in field_columns))

        # Whole-row summaries.
        for _, record in df_overall.iterrows():
            db_cursor.execute(overall_sql, tuple(record[name] for name in overall_columns))

        connection.commit()
        print(f"成功插入 {len(df_fields)} 条字段记录和 {len(df_overall)} 条整体记录")
    except Exception as e:
        connection.rollback()
        print(f"数据库操作失败: {str(e)}")
        raise  # re-raise so the failure stays visible to the caller
    finally:
        db_cursor.close()
        connection.close()
|
|
|
+
|
|
|
+# 主函数
|
|
|
def main():
    """Run the pipeline: load samples, compute statistics, export, persist.

    ``calculate_metrics_and_accuracy`` returns three values per category: the
    per-field frame, the display summary frame and the summary dict. The
    display frames go to Excel; the summary dicts, which carry the
    ``total_data_count`` / ``correct_rows_count`` / ``row_accuracy`` /
    ``data_type`` keys that ``save_to_database`` reads, are what gets stored.
    """
    df = fetch_data()
    bid_fields, bid_overall, bid_summary = calculate_metrics_and_accuracy(df, "中标类")
    tender_fields, tender_overall, tender_summary = calculate_metrics_and_accuracy(df, "招标类")

    export_to_excel(bid_fields, bid_overall, tender_fields, tender_overall)

    # Combine both categories for storage.
    all_fields = pd.concat([bid_fields, tender_fields], ignore_index=True)
    all_overall = pd.DataFrame([bid_summary, tender_summary])
    save_to_database(all_fields, all_overall)


if __name__ == "__main__":
    main()
|