from pymongo import MongoClient
from datetime import datetime, timedelta
import pandas as pd
import openpyxl  # required by pandas' 'openpyxl' ExcelWriter engine

# Export weekly data ingestion volume and data timeliness monitoring statistics to Excel.

# MongoDB connection settings
host = '172.20.45.129'
port = 27002
dbname = 'data_quality'
collection_name = 'statistics'

# Create the MongoDB connection
client = MongoClient(host, port)
db = client[dbname]
collection = db[collection_name]

# Current time and the time one week ago
end_time = datetime.now()
start_time = end_time - timedelta(weeks=1)

# Convert the datetimes to Unix timestamps (integers, fractional part dropped)
start_timestamp = int(start_time.timestamp())
end_timestamp = int(end_time.timestamp())

# Debug output: check the start and end times
print("Start time:", start_time)
print("End time:", end_time)
print("Start timestamp:", start_timestamp)
print("End timestamp:", end_timestamp)

# ----------------- Sheet 1: data-flow interruption monitoring (MongoDB) -------------------
# Query the past week's data (MongoDB ingestion counters)
pipeline_mongo = [
    {
        "$match": {
            "$or": [
                {"bidding.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
                {"bidding_ai.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
                {"connections.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
                {"nzj.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
                {"medical.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
                {"bidding_fragment.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}}
            ]
        }
    },
    {
        "$limit": 5  # keep only the first 5 matching documents, for debugging
    }
]

# Fetch the matching documents
data_mongo = list(collection.aggregate(pipeline_mongo))

# Initialize the MongoDB counters
bidding_count = 0
bidding_ai_count = 0
connections_count = 0
nzj_count = 0
medical_count = 0
bidding_fragment_data = {
    "情报_法务": 0,
    "情报_财务审计": 0,
    "情报_招标代理": 0,
    "情报_管理咨询": 0,
    "情报_保险": 0,
    "情报_工程设计咨询": 0,
    "情报_安防": 0,
    "情报_印务商机": 0,
    "情报_环境采购": 0,
    "情报_家具招投标": 0
}

# Accumulate the MongoDB counts
for doc in data_mongo:
    if 'bidding' in doc:
        bidding_count += doc['bidding'].get('count', 0)
    if 'bidding_ai' in doc:
        bidding_ai_count += doc['bidding_ai'].get('count', 0)
    if 'connections' in doc:
        connections_count += doc['connections'].get('count', 0)
    if 'nzj' in doc:
        nzj_count += doc['nzj'].get('count', 0)
    if 'medical' in doc:
        medical_count += doc['medical'].get('count', 0)
    if 'bidding_fragment' in doc:
        for key, value in doc['bidding_fragment'].get('count', {}).items():
            if key in bidding_fragment_data:
                bidding_fragment_data[key] += value

# ----------------- Sheet 2: data-flow interruption monitoring (ES) -------------------
# Query the past week's data (ES ingestion counters)
pipeline_es = [
    {
        "$match": {
            "$or": [
                {"es_bidding.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
                {"es_bidding_ai.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
                {"es_nzj.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
                {"medical_es.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
                {"es_bidding_fragment.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}}
            ]
        }
    },
    {
        "$limit": 5  # keep only the first 5 matching documents, for debugging
    }
]

# Fetch the matching documents
data_es = list(collection.aggregate(pipeline_es))

# Initialize the ES counters
es_bidding_count = 0
es_bidding_ai_count = 0
es_nzj_count = 0
es_medical_count = 0
es_bidding_fragment_data = {
    "情报_法务": 0,
    "情报_财务审计": 0,
    "情报_招标代理": 0,
    "情报_管理咨询": 0,
    "情报_保险": 0,
    "情报_工程设计咨询": 0,
    "情报_安防": 0,
    "情报_印务商机": 0,
    "情报_环境采购": 0,
    "情报_家具招投标": 0
}

# Accumulate the ES counts
for doc in data_es:
    if 'es_bidding' in doc:
        es_bidding_count += doc['es_bidding'].get('count', 0)
    if 'es_bidding_ai' in doc:
        es_bidding_ai_count += doc['es_bidding_ai'].get('count', 0)
    if 'es_nzj' in doc:
        es_nzj_count += doc['es_nzj'].get('count', 0)
    if 'medical_es' in doc:  # same field name that pipeline_es matches on
        es_medical_count += doc['medical_es'].get('count', 0)
    if 'es_bidding_fragment' in doc:
        for key, value in doc['es_bidding_fragment'].get('count', {}).items():
            if key in es_bidding_fragment_data:
                es_bidding_fragment_data[key] += value

# ----------------- Sheet 3: data timeliness monitoring -------------------
# Query the past week's data (timeliness buckets)
pipeline_timeliness = [
    {
        "$match": {
            "data_timeliness.timestamp": {
                "$gte": start_timestamp,  # integer Unix timestamp
                "$lt": end_timestamp  # integer Unix timestamp
            }
        }
    },
    {
        "$limit": 5  # keep only the first 5 matching documents, for debugging
    }
]

# Fetch the matching documents
data_timeliness = list(collection.aggregate(pipeline_timeliness))

# Initialize the timeliness buckets (percentage values)
timeliness_data = {
    "[0,5)分钟": 0,
    "[5,15)分钟": 0,
    "[15,30)分钟": 0,
    "[30,60)分钟": 0,
    "[1,3)小时": 0,
    "[3,7)小时": 0,
    "[7,15)小时": 0,
    "[15,24)小时": 0,
    "[1,2)天": 0,
    "[2,3)天": 0,
    "3天+": 0
}

# Accumulate the per-bucket percentages (stored in the documents as strings like "12.3%")
for doc in data_timeliness:
    if 'data_timeliness' in doc:
        count_data = doc['data_timeliness'].get('count', {})
        timeliness_data["[0,5)分钟"] += float(count_data.get("a1", "0%").replace('%', ''))
        timeliness_data["[5,15)分钟"] += float(count_data.get("a2", "0%").replace('%', ''))
        timeliness_data["[15,30)分钟"] += float(count_data.get("a3", "0%").replace('%', ''))
        timeliness_data["[30,60)分钟"] += float(count_data.get("a4", "0%").replace('%', ''))
        timeliness_data["[1,3)小时"] += float(count_data.get("a5", "0%").replace('%', ''))
        timeliness_data["[3,7)小时"] += float(count_data.get("a6", "0%").replace('%', ''))
        timeliness_data["[7,15)小时"] += float(count_data.get("a7", "0%").replace('%', ''))
        timeliness_data["[15,24)小时"] += float(count_data.get("a8", "0%").replace('%', ''))
        timeliness_data["[1,2)天"] += float(count_data.get("a9", "0%").replace('%', ''))
        timeliness_data["[2,3)天"] += float(count_data.get("a10", "0%").replace('%', ''))
        timeliness_data["3天+"] += float(count_data.get("a11", "0%").replace('%', ''))

# The third sheet reports a weekly average, so average the accumulated
# percentages over the number of matched documents before formatting them
if data_timeliness:
    for key in timeliness_data:
        timeliness_data[key] /= len(data_timeliness)

# Format the averaged timeliness values as percentage strings
for key in timeliness_data:
    timeliness_data[key] = f"{timeliness_data[key]:.2f}%"

# One-week date range string for the report
date_range = f"{start_time.strftime('%Y/%m/%d')}-{end_time.strftime('%Y/%m/%d')}"

# Build the Excel columns and rows
columns = ['日期', '标讯每周入库数据量', '高质量库每周入库数据量', '人脉管理数据', '拟在建数据量(全国)', '医械通'] + list(bidding_fragment_data.keys())
data_row_mongo = [date_range, bidding_count, bidding_ai_count, connections_count, nzj_count, medical_count] + list(bidding_fragment_data.values())

columns_es = ['日期', '标讯每周入库数据量', '高质量库每周数据入库量', '拟在建数据量(全国)', '医械通'] + list(es_bidding_fragment_data.keys())
data_row_es = [date_range, es_bidding_count, es_bidding_ai_count, es_nzj_count, es_medical_count] + list(es_bidding_fragment_data.values())

columns_timeliness = ['日期'] + list(timeliness_data.keys())
data_row_timeliness = [date_range] + list(timeliness_data.values())

# Create the DataFrames and write each sheet to the Excel workbook
excel_file = '../周报表格导出/mongo_data_statistics_combined1.xlsx'
with pd.ExcelWriter(excel_file, engine='openpyxl') as writer:
    # Sheet 1: MongoDB ingestion volume (weekly)
    df_mongo = pd.DataFrame([data_row_mongo], columns=columns)
    df_mongo.to_excel(writer, sheet_name='入库数据量监控-mongo(每周)', index=False)

    # Sheet 2: ES ingestion volume (weekly)
    df_es = pd.DataFrame([data_row_es], columns=columns_es)
    df_es.to_excel(writer, sheet_name='入库量数据量监控-es(每周)', index=False)

    # Sheet 3: data timeliness (7-day average)
    df_timeliness = pd.DataFrame([data_row_timeliness], columns=columns_timeliness)
    df_timeliness.to_excel(writer, sheet_name='数据时效监控(7天平均值)', index=False)

print(f"统计结果已写入Excel文件: {excel_file}")
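# A minimal sketch (an assumption, not part of the report above): with the
# "$limit": 5 debugging stage removed, MongoDB could compute a weekly total
# server-side instead of summing document-by-document in Python, e.g.:
#
#     total_bidding = next(collection.aggregate([
#         {"$match": {"bidding.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}}},
#         {"$group": {"_id": None, "total": {"$sum": "$bidding.count"}}},
#     ]), {}).get("total", 0)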