DataExport_forTesting.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. from pymongo import MongoClient
  2. from datetime import datetime, timedelta
  3. import pandas as pd
  4. import openpyxl
  5. # 数据入库量及数据监控时效 导出execl
  6. # MongoDB连接配置
  7. host = '172.20.45.129'
  8. port = 27002
  9. dbname = 'data_quality'
  10. collection_name = 'statistics'
  11. # 创建MongoDB连接
  12. client = MongoClient(host, port)
  13. db = client[dbname]
  14. collection = db[collection_name]
  15. # 获取当前时间和一周前的时间
  16. end_time = datetime.now()
  17. start_time = end_time - timedelta(weeks=1)
  18. # 将datetime转换为Unix时间戳(整数类型,去掉小数部分)
  19. start_timestamp = int(start_time.timestamp())
  20. end_timestamp = int(end_time.timestamp())
  21. # 输出调试信息:检查开始时间和结束时间
  22. print("Start time:", start_time)
  23. print("End time:", end_time)
  24. print("Start timestamp:", start_timestamp)
  25. print("End timestamp:", end_timestamp)
  26. # ----------------- 第一个Sheet: 断流监控_mongo库 -------------------
  27. # 查询过去一周的数据(断流监控_mongo库)
  28. pipeline_mongo = [
  29. {
  30. "$match": {
  31. "$or": [
  32. {"bidding.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
  33. {"bidding_ai.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
  34. {"connections.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
  35. {"nzj.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
  36. {"medical.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
  37. {"bidding_fragment.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}}
  38. ]
  39. }
  40. },
  41. {
  42. "$limit": 5 # 限制查询返回的结果为前5条数据,便于调试
  43. }
  44. ]
  45. # 获取符合条件的数据
  46. data_mongo = list(collection.aggregate(pipeline_mongo))
  47. # 初始化MongoDB字段统计数据
  48. bidding_count = 0
  49. bidding_ai_count = 0
  50. connections_count = 0
  51. nzj_count = 0
  52. medical_count = 0
  53. bidding_fragment_data = {
  54. "情报_法务": 0,
  55. "情报_财务审计": 0,
  56. "情报_招标代理": 0,
  57. "情报_管理咨询": 0,
  58. "情报_保险": 0,
  59. "情报_工程设计咨询": 0,
  60. "情报_安防": 0,
  61. "情报_印务商机": 0,
  62. "情报_环境采购": 0,
  63. "情报_家具招投标": 0
  64. }
  65. # 统计MongoDB数据
  66. for doc in data_mongo:
  67. if 'bidding' in doc:
  68. bidding_count += doc['bidding'].get('count', 0)
  69. if 'bidding_ai' in doc:
  70. bidding_ai_count += doc['bidding_ai'].get('count', 0)
  71. if 'connections' in doc:
  72. connections_count += doc['connections'].get('count', 0)
  73. if 'nzj' in doc:
  74. nzj_count += doc['nzj'].get('count', 0)
  75. if 'medical' in doc :
  76. medical_count += doc['medical'].get('count', 0)
  77. if 'bidding_fragment' in doc:
  78. for key, value in doc['bidding_fragment'].get('count', {}).items():
  79. if key in bidding_fragment_data:
  80. bidding_fragment_data[key] += value
  81. # ----------------- 第二个Sheet: 断流监控—es -------------------
  82. # 查询过去一周的数据(断流监控—es)
  83. pipeline_es = [
  84. {
  85. "$match": {
  86. "$or": [
  87. {"es_bidding.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
  88. {"es_bidding_ai.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
  89. {"es_nzj.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
  90. {"medical_es.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}},
  91. {"es_bidding_fragment.timestamp": {"$gte": start_timestamp, "$lt": end_timestamp}}
  92. ]
  93. }
  94. },
  95. {
  96. "$limit": 5 # 限制查询返回的结果为前5条数据,便于调试
  97. }
  98. ]
  99. # 获取符合条件的数据
  100. data_es = list(collection.aggregate(pipeline_es))
  101. # 初始化ES字段统计数据
  102. es_bidding_count = 0
  103. es_bidding_ai_count = 0
  104. es_nzj_count = 0
  105. es_medical_count = 0
  106. es_bidding_fragment_data = {
  107. "情报_法务": 0,
  108. "情报_财务审计": 0,
  109. "情报_招标代理": 0,
  110. "情报_管理咨询": 0,
  111. "情报_保险": 0,
  112. "情报_工程设计咨询": 0,
  113. "情报_安防": 0,
  114. "情报_印务商机": 0,
  115. "情报_环境采购": 0,
  116. "情报_家具招投标": 0
  117. }
  118. # 统计ES数据
  119. for doc in data_es:
  120. if 'es_bidding' in doc:
  121. es_bidding_count += doc['es_bidding'].get('count', 0)
  122. if 'es_bidding_ai' in doc:
  123. es_bidding_ai_count += doc['es_bidding_ai'].get('count', 0)
  124. if 'es_nzj' in doc:
  125. es_nzj_count += doc['es_nzj'].get('count', 0)
  126. if 'es_medical_count' in doc:
  127. es_medical_count += doc['es_medical'].get('count', 0)
  128. if 'es_bidding_fragment' in doc:
  129. for key, value in doc['es_bidding_fragment'].get('count', {}).items():
  130. if key in es_bidding_fragment_data:
  131. es_bidding_fragment_data[key] += value
  132. # ----------------- 第三个Sheet: 数据时效监控 -------------------
  133. # 查询过去一周的数据(数据时效监控)
  134. pipeline_timeliness = [
  135. {
  136. "$match": {
  137. "data_timeliness.timestamp": {
  138. "$gte": start_timestamp, # 使用整数Unix时间戳
  139. "$lt": end_timestamp # 使用整数Unix时间戳
  140. }
  141. }
  142. },
  143. {
  144. "$limit": 5 # 限制查询返回的结果为前5条数据,便于调试
  145. }
  146. ]
  147. # 获取符合条件的数据
  148. data_timeliness = list(collection.aggregate(pipeline_timeliness))
  149. # 初始化字段统计数据
  150. timeliness_data = {
  151. "[0,5)分钟": 0,
  152. "[5,15)分钟": 0,
  153. "[15,30)分钟": 0,
  154. "[30,60)分钟": 0,
  155. "[1,3)小时": 0,
  156. "[3,7)小时": 0,
  157. "[7,15)小时": 0,
  158. "[15,24)小时": 0,
  159. "[1,2)天": 0,
  160. "[2,3)天": 0,
  161. "3天+": 0
  162. }
  163. # 统计数据
  164. for doc in data_timeliness:
  165. if 'data_timeliness' in doc:
  166. count_data = doc['data_timeliness'].get('count', {})
  167. timeliness_data["[0,5)分钟"] += float(count_data.get("a1", "0%").replace('%', ''))
  168. timeliness_data["[5,15)分钟"] += float(count_data.get("a2", "0%").replace('%', ''))
  169. timeliness_data["[15,30)分钟"] += float(count_data.get("a3", "0%").replace('%', ''))
  170. timeliness_data["[30,60)分钟"] += float(count_data.get("a4", "0%").replace('%', ''))
  171. timeliness_data["[1,3)小时"] += float(count_data.get("a5", "0%").replace('%', ''))
  172. timeliness_data["[3,7)小时"] += float(count_data.get("a6", "0%").replace('%', ''))
  173. timeliness_data["[7,15)小时"] += float(count_data.get("a7", "0%").replace('%', ''))
  174. timeliness_data["[15,24)小时"] += float(count_data.get("a8", "0%").replace('%', ''))
  175. timeliness_data["[1,2)天"] += float(count_data.get("a9", "0%").replace('%', ''))
  176. timeliness_data["[2,3)天"] += float(count_data.get("a10", "0%").replace('%', ''))
  177. timeliness_data["3天+"] += float(count_data.get("a11", "0%").replace('%', ''))
  178. # 获取当前时间的一周时间范围字符串
  179. date_range = f"{start_time.strftime('%Y/%m/%d')}-{end_time.strftime('%Y/%m/%d')}"
  180. # 构建Excel数据
  181. columns = ['日期', '标讯每周入库数据量', '高质量库每周入库数据量', '人脉管理数据', '拟在建数据量(全国)','医械通'] + list(bidding_fragment_data.keys())
  182. data_row_mongo = [date_range, bidding_count, bidding_ai_count, connections_count, nzj_count,medical_count] + list(bidding_fragment_data.values())
  183. columns_es = ['日期', '标讯每周入库数据量', '高质量库每周数据入库量', '拟在建数据量(全国)','医械通'] + list(es_bidding_fragment_data.keys())
  184. data_row_es = [date_range, es_bidding_count, es_bidding_ai_count, es_nzj_count,es_medical_count] + list(es_bidding_fragment_data.values())
  185. columns_timeliness = ['日期'] + list(timeliness_data.keys())
  186. data_row_timeliness = [date_range] + list(timeliness_data.values())
  187. # 创建DataFrame并写入Excel
  188. excel_file = '../周报表格导出/mongo_data_statistics_combined1.xlsx'
  189. with pd.ExcelWriter(excel_file, engine='openpyxl') as writer:
  190. # 写入第一个sheet(断流监控_mongo库)
  191. df_mongo = pd.DataFrame([data_row_mongo], columns=columns)
  192. df_mongo.to_excel(writer, sheet_name='入库数据量监控-mongo(每周)', index=False)
  193. # 写入第二个sheet(断流监控—es)
  194. df_es = pd.DataFrame([data_row_es], columns=columns_es)
  195. df_es.to_excel(writer, sheet_name='入库量数据量监控-es(每周)', index=False)
  196. # 将timeliness_data中的值转换为百分比字符串
  197. for key in timeliness_data:
  198. timeliness_data[key] = f"{timeliness_data[key]:.2f}%"
  199. # 构建数据行
  200. data_row_timeliness = [date_range] + list(timeliness_data.values())
  201. # 写入第三个sheet(数据时效监控)
  202. df_timeliness = pd.DataFrame([data_row_timeliness], columns=columns_timeliness)
  203. df_timeliness.to_excel(writer, sheet_name='数据时效监控(7天平均值)', index=False)
  204. print(f"统计结果已写入Excel文件: {excel_file}")