
Data-feed interruption monitoring; score field analysis

lizhikun committed 9 months ago
05cf10ea2a

+ 72 - 0
tools/mongo断流监控/mongo_monitor.py

@@ -0,0 +1,72 @@
+# -*- coding: utf-8 -*-
+
+import pymongo
+from datetime import datetime, timedelta
+import pandas as pd
+import os
+
+# MongoDB connection settings
+client = pymongo.MongoClient("mongodb://192.168.3.149:27180/")
+db = client["data_quality"]
+collection = db["bidding_qb1031"]
+
+# Define a one-week time window as Unix timestamps
+end_date = int(datetime.now().timestamp())
+start_date = int((datetime.now() - timedelta(days=7)).timestamp())
+
+# Tag values to monitor
+tags = [
+    "情报_法务",
+    "情报_财务审计",
+    "情报_招标代理",
+    "情报_管理咨询",
+    "情报_保险",
+    "情报_工程设计咨询"
+]
+
+# Query filter; note that comeintime is compared as a timestamp range
+query = {
+    "comeintime": {"$gte": start_date, "$lt": end_date},
+    "tag_topinformation": {"$in": tags}
+}
+
+# Count documents per tag
+results = collection.aggregate([
+    {"$match": query},
+    {"$unwind": "$tag_topinformation"},  # 展开数组元素
+    {"$match": {"tag_topinformation": {"$in": tags}}},  # 再次匹配展开后的标签值
+    {"$group": {"_id": "$tag_topinformation", "count": {"$sum": 1}}}
+])
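+# Each result document is shaped like {"_id": <tag>, "count": <n>}, e.g.
+# {"_id": "情报_法务", "count": 42} (hypothetical count)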
+
+# Initialize every tag's count to 0
+data = {tag: 0 for tag in tags}
+
+# Fill in counts for tags that returned data
+for result in results:
+    data[result["_id"]] = result["count"]
+
+# Build a one-row DataFrame indexed by the current date range
+date_range = f"{datetime.fromtimestamp(start_date).strftime('%Y/%m/%d')}-{datetime.fromtimestamp(end_date).strftime('%Y/%m/%d')}"
+df = pd.DataFrame([data], index=[date_range])
+
+# Excel output path
+output_file = "weekly_data_statistics.xlsx"
+
+# Create the file if it does not exist yet
+if not os.path.exists(output_file):
+    # Write a new workbook with a column per tag
+    df.to_excel(output_file, sheet_name="Weekly Statistics", index_label="日期")
+else:
+    # The file exists: merge the new row into the existing sheet. Writing the
+    # new row directly with if_sheet_exists="overlay" would overwrite the
+    # header and first data row, so read, merge, and rewrite the whole sheet.
+    existing_df = pd.read_excel(output_file, sheet_name="Weekly Statistics", index_col=0)
+    if date_range in existing_df.index:
+        # Replace the row for an already-recorded date range
+        existing_df.loc[date_range] = df.loc[date_range]
+    else:
+        # Append the new date range as a fresh row
+        existing_df = pd.concat([existing_df, df])
+    with pd.ExcelWriter(output_file, mode="a", engine="openpyxl", if_sheet_exists="replace") as writer:
+        existing_df.to_excel(writer, sheet_name="Weekly Statistics", index_label="日期")
+
+print(f"统计结果已保存到 {output_file}")

+ 311 - 0
tools/分数字段结果分析/result_export.py

@@ -0,0 +1,311 @@
+# Required libraries
+from datetime import datetime
+
+import pandas as pd
+from pymongo import MongoClient
+from openpyxl import load_workbook
+import numpy
+
+
+# Keep this helper at the top of the script, near the other function definitions
+def convert_numpy_int(obj):
+    if isinstance(obj, numpy.int64):
+        return int(obj)
+    elif isinstance(obj, dict):
+        return {key: convert_numpy_int(value) for key, value in obj.items()}
+    elif isinstance(obj, list):
+        return [convert_numpy_int(elem) for elem in obj]
+    else:
+        return obj
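+# Usage sketch (hypothetical value): pymongo's BSON encoder rejects numpy
+# integer types, so documents are converted before insert_one, e.g.
+#   convert_numpy_int({"count": numpy.int64(5)})  ->  {"count": 5}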
+
+# MongoDB connection settings
+host = '192.168.3.149'  # MongoDB host
+port = 27180  # MongoDB port
+dbname = 'data_quality'  # database name
+collection_name = 'bidding_20241033'  # collection name
+
+# Open the MongoDB connection
+client = MongoClient(host, port)
+db = client[dbname]
+collection = db[collection_name]
+
+# Load the collection into a DataFrame
+data = pd.DataFrame(list(collection.find()))
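+# Note: this pulls the entire collection into memory; a projection such as
+# collection.find({}, {"contenthtml": 0}) (hypothetical field name) can trim the load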
+
+# Map raw field names to Chinese display names
+column_name_mapping = {
+    "area_qa": "省份",
+    "bidamount_qa": "中标金额",
+    "budget_qa": "预算",
+    "buyer_qa": "采购单位",
+    "multipackage_qa": "分包",
+    "projectcode_qa": "项目编号",
+    "projectname_qa": "项目名称",
+    "title_qa": "标题",
+    "winner_qa": "中标单位",
+    "score": "标讯总分数",
+    "bidopentime_qa": "开标时间",
+    "publishtime_qa": "发布时间",
+    "toptype_qa": "信息一级分类",
+    "subtype_qa": "信息二级分类"
+}
+
+# Rename columns to their Chinese display names
+data.rename(columns=column_name_mapping, inplace=True)
+# Show the resulting column names
+print("Current columns:")
+print(data.columns)
+
+# Columns expected to be present
+expected_columns = ["信息一级分类", "信息二级分类"]
+
+# Check each expected column
+for col in expected_columns:
+    if col in data.columns:
+        print(f"列 '{col}' 存在于数据框架中。")
+        # 列存在时,打印这一列的值分布
+        print(f"{col} 字段值分布:")
+        print(data[col].value_counts(dropna=False))
+    else:
+        print(f"警告:列 '{col}' 不在数据框架中。")
+
+# Ad-hoc checks of the toptype/subtype value distributions (covered by the loop above):
+# print(data['信息一级分类'].value_counts(dropna=False))
+# print(data['信息二级分类'].value_counts(dropna=False))
+# Close the MongoDB connection
+client.close()
+# analyze_column: per-field QA statistics; NaN and the empty dict {} count as correct
+def analyze_column(dataframe, column_name):
+    if column_name not in dataframe.columns:
+        # Missing column: treat every record as correct
+        total = len(dataframe)
+        correct = total
+        error = 0
+        error_reasons = pd.Series(dtype=object)  # no reasons to collect
+    else:
+        # For existing columns, NaN and the empty dict {} count as correct; anything else is an error
+        total = len(dataframe[column_name])
+        correct = dataframe[column_name].apply(lambda x: pd.isna(x) or x == {}).sum()
+        error = total - correct
+        # Collect the non-empty QA annotations as error reasons
+        error_reasons = dataframe[column_name].apply(
+            lambda x: x if x != {} and not pd.isna(x) else None).dropna().value_counts()
+
+    accuracy = correct / total if total > 0 else 0
+    error_rate = error / total if total > 0 else 0
+
+    return total, correct, error, accuracy, error_rate, error_reasons
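+# Example (sketch): analyze_column(data, "预算") returns (total, correct,
+# error, accuracy, error_rate, error_reasons), where error_reasons is the
+# value_counts Series of the non-empty QA annotations in that column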
+
+# Re-shape raw error reasons into a flat {reason: count} dict
+def reformat_error_reasons_safe(error_reasons_series):
+    # Accumulator for the reformatted reasons
+    reformatted_reasons = {}
+
+    # Walk each raw reason and its occurrence count
+    for error_dict, count in error_reasons_series.items():
+        if isinstance(error_dict, dict):  # dict-shaped reason
+            for error_code, reason in error_dict.items():
+                # Keep only the text after the first comma, if there is one
+                if ',' in reason:
+                    formatted_reason = reason.split(',', 1)[1].strip()
+                else:
+                    formatted_reason = reason.strip()
+
+                # Accumulate non-empty reasons
+                if formatted_reason:
+                    key = (formatted_reason,)
+                    if key not in reformatted_reasons:
+                        reformatted_reasons[key] = count
+                    else:
+                        reformatted_reasons[key] += count
+        elif isinstance(error_dict, list):  # list-shaped reason
+            key = (tuple(error_dict),) if error_dict else None
+            if key not in reformatted_reasons:
+                reformatted_reasons[key] = count
+            else:
+                reformatted_reasons[key] += count
+        else:  # any other reason type
+            key = (error_dict,) if error_dict else None
+            if key not in reformatted_reasons:
+                reformatted_reasons[key] = count
+            else:
+                reformatted_reasons[key] += count
+
+    # Drop empty keys and empty-string keys from the final result
+    formatted_results = {
+        str(key[0]): value for key, value in reformatted_reasons.items() if key and key[0] != ''
+    }
+    return formatted_results
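+# Illustrative call (hypothetical counts): a plain string reason falls through
+# the final branch unchanged, e.g.
+#   reformat_error_reasons_safe({"预算缺失": 2})  ->  {"预算缺失": 2}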
+
+
+# Analyze each field
+fields_to_analyze = ["省份", "中标金额", "预算", "采购单位", "分包", "项目编号", "项目名称", "标题", "中标单位",
+                     "开标时间", "发布时间", "信息一级分类", "信息二级分类"]
+expanded_analysis_results = []
+
+for col in fields_to_analyze:
+    if col in data.columns:  # only analyze fields present in the frame
+        total, correct, error, accuracy, error_rate, error_reasons = analyze_column(data, col)
+        reformatted_error_reasons = reformat_error_reasons_safe(error_reasons)
+
+        for reason, count in reformatted_error_reasons.items():
+            reason = str(reason).replace('(', '').replace(',)', '').replace("'", '')
+            if error > 0:
+                single_reason_error_rate = count / error
+            else:
+                single_reason_error_rate = 0  # guard against division by zero
+
+            expanded_analysis_results.append({
+                '字段': col,
+                '总量': total,
+                '正确数量': correct,
+                '错误数量': error,
+                '正确率': f'{accuracy:.2%}',
+                '错误率': f'{error_rate:.2%}',
+                '错误原因': reason,
+                '错误次数': count,
+                '单个原因错误率': f'{single_reason_error_rate:.2%}'
+            })
+    else:
+        print(f"警告:列 '{col}' 不在数据框架中,将跳过此字段。")
+
+# Collect the per-reason rows into a DataFrame
+expanded_analysis_results_df = pd.DataFrame(expanded_analysis_results)
+# "标讯总分数" 字段的分布
+if "标讯总分数" in data.columns:
+    # 转换为浮点数
+    data['标讯总分数'] = data['标讯总分数'].astype(float)
+    score_counts = data['标讯总分数'].value_counts().sort_index()
+    total_scores = len(data['标讯总分数'])
+    score_percentages = (score_counts / total_scores) * 100
+    score_distribution_df = pd.DataFrame({
+        '标讯总分数': score_counts.index,
+        '数量': score_counts.values,
+        '百分比': score_percentages.values
+    })
+    # 确保得分正确转换为浮点数
+    data['标讯总分数'] = data['标讯总分数'].apply(float)
+    # 计算得分为100的数量,确保类型匹配
+    score_100_count = score_counts.get(100) if 100 in score_counts else 0
+    # Re-open a MongoDB connection for the score summary
+    client = MongoClient('192.168.3.149', 27180)  # same host and port as above
+    db = client['data_quality']  # 'data_quality' database
+    score_collection = db['score']  # 'score' collection
+
+    # Document to store in MongoDB
+    data_to_store = {
+        'score': 100,
+        'score_number': score_100_count,
+        'timestamp': datetime.now()  # current timestamp
+    }
+    # Ensure every value is BSON-compatible before insertion
+    data_to_store_converted = convert_numpy_int(data_to_store)
+    # Insert the converted document into the score collection
+    score_collection.insert_one(data_to_store_converted)
+
+    # Format percentages as strings with a trailing percent sign
+    score_distribution_df['百分比'] = score_distribution_df['百分比'].apply(lambda x: f'{x:.2f}%')
+
+
+# "purchasinglist" 下的 "score" 字段的分布
+if 'purchasinglist' in data.columns:
+    # Extract the first entry's "score" as a float (0 when absent)
+    purchasinglist_scores = data['purchasinglist'].map(
+        lambda x: float(x[0]['score']) if isinstance(x, list) and x and isinstance(x[0], dict) and 'score' in x[
+            0] else 0
+    )
+
+    purchasinglist_score_counts = purchasinglist_scores.value_counts().sort_index()
+    purchasinglist_total_scores = purchasinglist_scores.notnull().sum()
+    purchasinglist_score_percentages = (purchasinglist_score_counts / purchasinglist_total_scores) * 100
+    purchasinglist_score_distribution_df = pd.DataFrame({
+        '标的物分数': purchasinglist_score_counts.index,
+        '数量': purchasinglist_score_counts.values,
+        '百分比': purchasinglist_score_percentages.values
+    })
+    # Format percentages as strings with a trailing percent sign
+    purchasinglist_score_distribution_df['百分比'] = purchasinglist_score_distribution_df['百分比'].apply(
+        lambda x: f'{x:.2f}%')
+
+# Sort by error count, descending
+expanded_analysis_results_df = expanded_analysis_results_df.sort_values(by='错误次数', ascending=False)
+
+# MongoDB export settings
+export_host = '192.168.3.149'  # MongoDB host
+export_port = 27180  # MongoDB port
+export_dbname = 'data_quality'  # database name
+export_collection_name = 'export'  # collection for exported results
+
+# Connection used for exporting the analysis results
+export_client = MongoClient(export_host, export_port)
+export_db = export_client[export_dbname]
+export_collection = export_db[export_collection_name]
+# Write the analysis results to MongoDB
+for result in expanded_analysis_results:
+    # Shape of the exported record
+    export_entry = {
+        'error_cause': result['错误原因'],
+        'error_count': result['错误次数'],
+        'timestamp': datetime.now()  # current timestamp
+    }
+    print(export_entry)  # confirm the timestamp was generated
+
+    # Convert numpy types before insertion
+    export_entry = convert_numpy_int(export_entry)
+
+    # Insert into the export collection
+    export_collection.insert_one(export_entry)
+# Close the export connection
+export_client.close()
+
+
+# Write the analysis sheets with pd.ExcelWriter
+with pd.ExcelWriter('临时文件.xlsx', engine='openpyxl') as writer:
+    # to_excel creates sheets on demand, so no sheet needs to be pre-created
+    if "标讯总分数" in data.columns:
+        # Add the total-count column
+        score_distribution_df['总量'] = total_scores
+        # Sort scores descending
+        score_distribution_df = score_distribution_df.sort_values(by='标讯总分数', ascending=False)
+        score_distribution_df.to_excel(writer, sheet_name='分数分析结果', index=False)
+
+    # Write the "purchasinglist" score distribution next to the first table
+    if 'purchasinglist' in data.columns and purchasinglist_scores.notnull().any():
+        # Sort descending and add the total-count column before a single write;
+        # startcol places this table to the right of the first distribution
+        purchasinglist_score_distribution_df = purchasinglist_score_distribution_df.sort_values(
+            by='标的物分数', ascending=False)
+        purchasinglist_score_distribution_df['总量'] = purchasinglist_total_scores
+        purchasinglist_score_distribution_df.to_excel(writer, sheet_name='分数分析结果',
+                                                      startcol=len(score_distribution_df.columns) + 2, index=False)
+
+    expanded_analysis_results_df.to_excel(writer, sheet_name='字段分析结果', index=False)
+
+# The analysis results are assumed to be in the temporary file written above
+temp_analysis_file = '临时文件.xlsx'  # path of the temporary file
+
+# Load the Excel file that the results are merged into
+modified_file_path = 'pin.xlsx'  # path of the file to splice onto
+wb = load_workbook(modified_file_path)
+
+# Load the temporary workbook holding the analysis results
+temp_wb = load_workbook(temp_analysis_file)
+
+# Copy each sheet from the temporary workbook into the target workbook (values only)
+for sheet_name in temp_wb.sheetnames:
+    source = temp_wb[sheet_name]
+    target = wb.create_sheet(sheet_name)
+
+    for row in source.iter_rows(min_row=1, max_col=source.max_column, max_row=source.max_row, values_only=True):
+        target.append(row)
+
+# Save the merged workbook
+final_merged_file_path = '质量分析报告.xlsx'  # path of the final merged file
+wb.save(final_merged_file_path)
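
Note that the iter_rows/append loop copies cell values only; openpyxl does not carry styles across workbooks this way. If column widths should follow the analysis sheets, a minimal sketch (same workbooks as above, run before wb.save):

for sheet_name in temp_wb.sheetnames:
    # Mirror each source column's width onto the freshly created target sheet
    for col_letter, dim in temp_wb[sheet_name].column_dimensions.items():
        wb[sheet_name].column_dimensions[col_letter].width = dim.width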