|
@@ -0,0 +1,220 @@
|
|
|
|
+from pymongo import MongoClient
|
|
|
|
+from datetime import datetime, timedelta
|
|
|
|
+import pandas as pd
|
|
|
|
+import openpyxl
|
|
|
|
+
|
|
|
|
# --- MongoDB connection configuration ---
host = '192.168.3.149'
port = 27180
dbname = 'data_quality'
collection_name = 'statistics'

# Open a client and resolve the database / collection handles used by all
# three monitoring queries below.
client = MongoClient(host=host, port=port)
db = client.get_database(dbname)
collection = db.get_collection(collection_name)
|
|
|
|
+
|
|
|
|
# --- Reporting window: the seven days leading up to "now" ---
end_time = datetime.now()
start_time = end_time - timedelta(days=7)

# Unix timestamps as whole seconds (fractional part truncated); the Mongo
# documents store integer timestamps, so the $match stages compare ints.
start_timestamp = int(start_time.timestamp())
end_timestamp = int(end_time.timestamp())

# Debug output: verify the window boundaries before querying.
for label, value in (
    ("Start time:", start_time),
    ("End time:", end_time),
    ("Start timestamp:", start_timestamp),
    ("End timestamp:", end_timestamp),
):
    print(label, value)
|
|
|
|
+
|
|
|
|
# ----------------- Sheet 1: 断流监控_mongo库 -------------------

# Aggregation over the past week for the Mongo-side flow monitoring.
# A document matches when ANY monitored section carries a timestamp inside
# [start_timestamp, end_timestamp).
# FIX: an earlier revision appended a `$limit: 5` stage, commented as "for
# debugging"; it silently capped the weekly totals to the first five
# documents. The debug stage has been removed so the counts cover the
# whole week.
pipeline_mongo = [
    {
        "$match": {
            "$or": [
                {"bidding.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
                {"bidding_ai.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
                {"connections.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
                {"nzj.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
                {"bidding_fragment.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}}
            ]
        }
    }
]

# Materialize the matching documents.
data_mongo = list(collection.aggregate(pipeline_mongo))
|
|
|
|
+
|
|
|
|
# --- Aggregate the Mongo-side figures for the week ---

# Per-section weekly totals; sections absent from a document contribute 0.
bidding_count = sum(doc['bidding'].get('count', 0) for doc in data_mongo if 'bidding' in doc)
bidding_ai_count = sum(doc['bidding_ai'].get('count', 0) for doc in data_mongo if 'bidding_ai' in doc)
connections_count = sum(doc['connections'].get('count', 0) for doc in data_mongo if 'connections' in doc)
nzj_count = sum(doc['nzj'].get('count', 0) for doc in data_mongo if 'nzj' in doc)

# Per-category totals for the fragment feed. Key order matters: it fixes
# the column order of the Excel sheet. Categories not listed here are
# ignored.
bidding_fragment_data = dict.fromkeys(
    (
        "情报_法务",
        "情报_财务审计",
        "情报_招标代理",
        "情报_管理咨询",
        "情报_保险",
        "情报_工程设计咨询",
        "情报_安防",
        "情报_印务商机",
        "情报_环境采购",
        "情报_家具招投标",
    ),
    0,
)

for doc in data_mongo:
    fragment_counts = doc.get('bidding_fragment', {}).get('count', {})
    for category, amount in fragment_counts.items():
        if category in bidding_fragment_data:
            bidding_fragment_data[category] += amount
|
|
|
|
+
|
|
|
|
# ----------------- Sheet 2: 断流监控—es -------------------

# Aggregation over the past week for the ES-side flow monitoring; same
# shape as the Mongo pipeline but over the es_* sections.
# FIX: removed the leftover `$limit: 5` debug stage that capped the weekly
# totals to the first five matching documents.
pipeline_es = [
    {
        "$match": {
            "$or": [
                {"es_bidding.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
                {"es_bidding_ai.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
                {"es_nzj.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
                {"es_bidding_fragment.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}}
            ]
        }
    }
]

# Materialize the matching documents.
data_es = list(collection.aggregate(pipeline_es))
|
|
|
|
+
|
|
|
|
# --- Aggregate the ES-side figures for the week ---

# Per-section weekly totals; sections absent from a document contribute 0.
es_bidding_count = sum(doc['es_bidding'].get('count', 0) for doc in data_es if 'es_bidding' in doc)
es_bidding_ai_count = sum(doc['es_bidding_ai'].get('count', 0) for doc in data_es if 'es_bidding_ai' in doc)
es_nzj_count = sum(doc['es_nzj'].get('count', 0) for doc in data_es if 'es_nzj' in doc)

# Per-category totals for the ES fragment feed. Key order fixes the Excel
# column order; unlisted categories are ignored.
es_bidding_fragment_data = dict.fromkeys(
    (
        "情报_法务",
        "情报_财务审计",
        "情报_招标代理",
        "情报_管理咨询",
        "情报_保险",
        "情报_工程设计咨询",
        "情报_安防",
        "情报_印务商机",
        "情报_环境采购",
        "情报_家具招投标",
    ),
    0,
)

for doc in data_es:
    fragment_counts = doc.get('es_bidding_fragment', {}).get('count', {})
    for category, amount in fragment_counts.items():
        if category in es_bidding_fragment_data:
            es_bidding_fragment_data[category] += amount
|
|
|
|
+
|
|
|
|
# ----------------- Sheet 3: 数据时效监控 -------------------

# Aggregation over the past week for the data-timeliness monitoring.
# FIX: removed the leftover `$limit: 5` debug stage that capped the weekly
# figures to the first five matching documents.
pipeline_timeliness = [
    {
        "$match": {
            "data_timeliness.timestamp": {
                "$gte": start_timestamp,  # integer Unix timestamp (inclusive)
                "$lt": end_timestamp      # integer Unix timestamp (exclusive)
            }
        }
    }
]

# Materialize the matching documents.
data_timeliness = list(collection.aggregate(pipeline_timeliness))
|
|
|
|
+
|
|
|
|
# --- Aggregate the data-timeliness figures ---

# Maps each latency bucket label (an Excel column) to the "aN" field that
# carries its percentage in the source documents. Insertion order fixes
# the column order of the Excel sheet. This table replaces eleven
# near-identical accumulation lines.
_TIMELINESS_BUCKETS = {
    "[0,5)分钟": "a1",
    "[5,15)分钟": "a2",
    "[15,30)分钟": "a3",
    "[30,60)分钟": "a4",
    "[1,3)小时": "a5",
    "[3,7)小时": "a6",
    "[7,15)小时": "a7",
    "[15,24)小时": "a8",
    "[1,2)天": "a9",
    "[2,3)天": "a10",
    "3天+": "a11",
}

# Running totals per bucket. Source values arrive as strings like "12.3%";
# the '%' sign is stripped and the number accumulated as a float.
# NOTE(review): the target sheet is titled "每周平均值" (weekly average),
# but these values are summed across documents and never divided by the
# document count — confirm whether a true average is intended.
timeliness_data = {bucket: 0 for bucket in _TIMELINESS_BUCKETS}

for doc in data_timeliness:
    if 'data_timeliness' in doc:
        count_data = doc['data_timeliness'].get('count', {})
        for bucket, field in _TIMELINESS_BUCKETS.items():
            timeliness_data[bucket] += float(count_data.get(field, "0%").replace('%', ''))
|
|
|
|
+
|
|
|
|
# Human-readable label for the reporting window, e.g. "2024/01/01-2024/01/08".
date_range = "{}-{}".format(start_time.strftime('%Y/%m/%d'), end_time.strftime('%Y/%m/%d'))

# --- Assemble one header/row pair per sheet ---
columns = ['日期', '标准库每周入库数据量', '高质量库每周入库数据量', '人脉管理数据', '拟在建数据量(全国)']
columns += list(bidding_fragment_data)
data_row_mongo = [date_range, bidding_count, bidding_ai_count, connections_count, nzj_count]
data_row_mongo += list(bidding_fragment_data.values())

columns_es = ['日期', '标准库每周入库数据量', '高质量库每周数据入库量', '拟在建数据量(全国)']
columns_es += list(es_bidding_fragment_data)
data_row_es = [date_range, es_bidding_count, es_bidding_ai_count, es_nzj_count]
data_row_es += list(es_bidding_fragment_data.values())

columns_timeliness = ['日期'] + list(timeliness_data)
data_row_timeliness = [date_range] + list(timeliness_data.values())
|
|
|
|
+
|
|
|
|
# --- Write one sheet per monitoring view into a single workbook ---
excel_file = 'mongo_data_statistics_combined.xlsx'

# (sheet name, header row, single data row) for each of the three views,
# written in this order.
sheets = [
    ('断流监控_mongo库(每周)', columns, data_row_mongo),
    ('断流监控-es(每周)', columns_es, data_row_es),
    ('数据时效监控(每周平均值)', columns_timeliness, data_row_timeliness),
]

with pd.ExcelWriter(excel_file, engine='openpyxl') as writer:
    for sheet_name, header, row in sheets:
        pd.DataFrame([row], columns=header).to_excel(writer, sheet_name=sheet_name, index=False)

print(f"统计结果已写入Excel文件: {excel_file}")
|
|
|
|
+
|