# spider_statistics.py (2.6 KB)
  1. from pymongo import MongoClient
  2. from datetime import datetime, timedelta
  3. # 连接到MongoDB
  4. client = MongoClient("mongodb://192.168.3.149:27180/")
  5. db = client['data_quality']
  6. collection = db['bid_analysis']
  7. # 定义两个任意时间段的边界
  8. # 时间段1
  9. period1_start = int(datetime(2025, 1, 20, 0, 0, 0).timestamp()) # 2025-01-20 00:00:00
  10. period1_end = int(datetime(2025, 1, 20, 23, 59, 59).timestamp()) # 2025-01-20 23:59:59
  11. # 时间段2
  12. period2_start = int(datetime(2025, 1, 21, 0, 0, 0).timestamp()) # 2025-01-21 00:00:00
  13. period2_end = int(datetime(2025, 1, 21, 23, 59, 59).timestamp()) # 2025-01-21 23:59:59
  14. # 聚合查询:计算两个时间段的数量和波动率
  15. pipeline = [
  16. {
  17. "$project": {
  18. "spidercode": 1,
  19. "create_time": 1,
  20. "date": {"$toDate": {"$multiply": ["$create_time", 1000]}} # 转换为毫秒级日期
  21. }
  22. },
  23. {
  24. "$group": {
  25. "_id": "$spidercode",
  26. "total_count": {"$sum": 1},
  27. "period1_count": {
  28. "$sum": {
  29. "$cond": [
  30. {"$and": [
  31. {"$gte": ["$create_time", period1_start]},
  32. {"$lte": ["$create_time", period1_end]}
  33. ]},
  34. 1,
  35. 0
  36. ]
  37. }
  38. },
  39. "period2_count": {
  40. "$sum": {
  41. "$cond": [
  42. {"$and": [
  43. {"$gte": ["$create_time", period2_start]},
  44. {"$lte": ["$create_time", period2_end]}
  45. ]},
  46. 1,
  47. 0
  48. ]
  49. }
  50. }
  51. }
  52. },
  53. {
  54. "$project": {
  55. "spidercode": "$_id",
  56. "total_count": 1,
  57. "period1_count": 1,
  58. "period2_count": 1,
  59. "volatility": {
  60. "$cond": [
  61. {"$eq": ["$period1_count", 0]},
  62. None, # 如果 Period1 Count 为 0,则波动率为 None
  63. {
  64. "$divide": [
  65. {"$subtract": ["$period2_count", "$period1_count"]},
  66. "$period1_count"
  67. ]
  68. }
  69. ]
  70. }
  71. }
  72. }
  73. ]
  74. # 执行聚合查询
  75. result = collection.aggregate(pipeline)
  76. # 打印结果
  77. for doc in result:
  78. print(doc)
  79. '''
  80. 示例输出:
  81. {
  82. "spidercode": "spider1",
  83. "total_count": 200,
  84. "period1_count": 60,
  85. "period2_count": 70,
  86. "volatility": 0.1667
  87. }
  88. '''