|
@@ -0,0 +1,138 @@
|
|
|
|
+from pymongo import MongoClient
|
|
|
|
+import pymysql
|
|
|
|
+from datetime import datetime, timedelta
|
|
|
|
+import pandas as pd
|
|
|
|
+
|
|
|
|
+
|
|
|
|
# (inclusive upper bound in hours, stats-key prefix, Excel column label) for each
# latency bucket, in ascending order. The last bucket has no upper bound.
_TIMELINESS_BUCKETS = (
    (2, 'less_than_2h', '<=2小时'),
    (3, '2h_to_3h', '2-3小时'),
    (4, '3h_to_4h', '3-4小时'),
    (8, '4h_to_8h', '4-8小时'),
    (None, 'more_than_8h', '>8小时'),
)


def _bucket_prefix(hours_diff):
    """Return the stats-key prefix of the latency bucket containing ``hours_diff``."""
    for upper_bound, prefix, _label in _TIMELINESS_BUCKETS:
        if upper_bound is None or hours_diff <= upper_bound:
            return prefix
    # Unreachable while the last bucket is unbounded; kept as a safe default.
    return _TIMELINESS_BUCKETS[-1][1]


def _new_stats(create_time):
    """Return a zeroed stats dict (insertion order matches the MySQL column order)."""
    stats = {'create_time': create_time, 'total_count': 0}
    for _upper_bound, prefix, _label in _TIMELINESS_BUCKETS:
        stats[f'{prefix}_count'] = 0
        stats[f'{prefix}_ratio'] = 0.0
    return stats


def data_timeliness_analysis():
    """Report how quickly documents of the previous full hour were ingested.

    Reads the ``usermail`` collection of the ``jyqyfw`` MongoDB database for one
    fixed ``appid`` and a ``createtime`` inside the previous full clock hour,
    buckets every document by ``abs(createtime - publishtime)`` expressed in
    hours, writes the counts and percentage ratios to ``数据时效统计表.xlsx`` and
    inserts the same row into the MySQL table ``data_timeliness_liantong``.

    Any exception is reported on stdout rather than propagated, so the function
    always returns ``None``. Both connections are closed on every path, but only
    if they were actually opened.
    """
    # Pre-bind both handles so the ``finally`` clause never touches an unbound
    # local when a connection attempt (or anything before it) fails.
    mongo_client = None
    mysql_conn = None
    try:
        # ==================== MongoDB connection ====================
        mongo_client = MongoClient(
            'mongodb://127.0.0.1:27087/',
            unicode_decode_error_handler="ignore",
            directConnection=True,
        )
        collection = mongo_client["jyqyfw"]["usermail"]

        # ==================== MySQL connection ====================
        # NOTE(review): credentials are hard-coded in source; they should be
        # moved to configuration / a secret store. Left unchanged here so the
        # script keeps connecting exactly as before.
        mysql_conn = pymysql.connect(
            host='172.20.45.129',
            port=4000,
            user='root',
            password='=PDT49#80Z!RVv52_z',
            database='quality',
            charset='utf8mb4',
        )

        # ==================== Statistics ====================
        # Window = the previous full clock hour: [hour_start, hour_end).
        now = datetime.now()
        hour_end = now.replace(minute=0, second=0, microsecond=0)
        hour_start = hour_end - timedelta(hours=1)

        query = {
            "appid": "jyGQ1XQQsEAwNeSENOFR9D",
            "createtime": {
                "$gte": int(hour_start.timestamp()),
                "$lt": int(hour_end.timestamp()),
            },
        }

        stats = _new_stats(now.strftime('%Y-%m-%d %H:%M:%S'))

        # Single pass: the total is counted from the same cursor that feeds the
        # buckets, so counts and ratios always refer to the same documents.
        projection = {"createtime": 1, "publishtime": 1, "_id": 0}
        total_count = 0
        for doc in collection.find(query, projection):
            # A document without ``publishtime`` raises KeyError here and aborts
            # the run via the handler below (same as before).
            hours_diff = abs(doc["createtime"] - doc["publishtime"]) / 3600
            stats[f'{_bucket_prefix(hours_diff)}_count'] += 1
            total_count += 1
        stats['total_count'] = total_count

        if total_count > 0:
            # Percentages rounded to two decimals.
            for _upper_bound, prefix, _label in _TIMELINESS_BUCKETS:
                stats[f'{prefix}_ratio'] = round(
                    stats[f'{prefix}_count'] / total_count * 100, 2
                )

        # ==================== Excel report ====================
        excel_data = {
            "创建时间": [stats['create_time']],
            "总量": [stats['total_count']],
        }
        for _upper_bound, prefix, label in _TIMELINESS_BUCKETS:
            ratio = stats[f'{prefix}_ratio']
            excel_data[f"{label}数量"] = [stats[f'{prefix}_count']]
            excel_data[f"{label}占比"] = [f"{ratio}%"]
        pd.DataFrame(excel_data).to_excel("数据时效统计表.xlsx", index=False)

        # ==================== MySQL insert ====================
        sql = """
            INSERT INTO data_timeliness_liantong (
                create_time, total_count,
                less_than_2h_count, less_than_2h_ratio,
                `2h_to_3h_count`, `2h_to_3h_ratio`,
                `3h_to_4h_count`, `3h_to_4h_ratio`,
                `4h_to_8h_count`, `4h_to_8h_ratio`,
                more_than_8h_count, more_than_8h_ratio
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        # ``stats`` was built in exactly the column order used above.
        row = tuple(stats.values())
        with mysql_conn.cursor() as cursor:
            cursor.execute(sql, row)
        mysql_conn.commit()

        print("✅ 操作完成:")
        print("1. Excel报表已生成(数据时效统计表.xlsx)")
        print("2. 数据已插入MySQL(quality.data_timeliness_liantong)")
        # Report the window that was actually queried (it ends at the top of the
        # current hour, not at ``now``).
        print(f"📊 统计时段:{hour_start.strftime('%H:%M')} - {hour_end.strftime('%H:%M')}")
        print(f"📝 统计结果:{stats}")

    except Exception as e:
        # Top-level boundary of the script: report the failure and fall through
        # to cleanup instead of propagating.
        print(f"❌ 操作失败: {type(e).__name__}: {str(e)}")
    finally:
        if mongo_client is not None:
            mongo_client.close()
        if mysql_conn is not None:
            mysql_conn.close()
|
|
|
|
+
|
|
|
|
+
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the hourly timeliness report once.
    data_timeliness_analysis()
|