backfill_task.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
# -*- coding: utf-8 -*-
"""
Created on 2023-04-07
---------
@summary: Backfill of crawl tasks
---------
@author: dzr
"""
  9. import time
  10. from bson.int64 import Int64
  11. from pymongo import MongoClient
  12. from pymongo.errors import DuplicateKeyError
  13. from log import logger
  14. # MONGO_HOST = "172.17.4.87"
  15. # MONGO_PORT = 27080
  16. MONGO_HOST = "127.0.0.1"
  17. MONGO_PORT = 27001
  18. client = MongoClient(MONGO_HOST, MONGO_PORT)
  19. MONGO_DB = "py_spider"
  20. mongodb = client[MONGO_DB]
  21. # 爬虫数据表
  22. listdata_err = mongodb["listdata_err"]
  23. def backfill_data():
  24. """回填重新采集的任务,并给出错误重试次数"""
  25. count = 0 # 计数器
  26. # 查询5天内范围内的错误重采次数小于20的错误任务
  27. start_at = int(time.time()) - 432000
  28. end_at = int(time.time())
  29. q = {
  30. "failed_times": {"$lt": 20},
  31. "create_at": {"$gte": start_at, "$lt": end_at}
  32. }
  33. cursor = listdata_err.find(q, no_cursor_timeout=True)
  34. try:
  35. for doc in cursor:
  36. count += 1
  37. if count % 100 == 0:
  38. logger.info(f"[BackFill]放入采集队列{count}条任务")
  39. coll_name = doc["origin"]
  40. document = doc["origin_data"]
  41. document["failed_times"] = doc["failed_times"]
  42. # 清除错误记录字段
  43. for field in ["save", "pri", "failed", "error", "code"]:
  44. document.pop(field, "")
  45. if "comeintime" in document:
  46. document["comeintime"] = Int64(time.time())
  47. try:
  48. mongodb[coll_name].insert_one(document)
  49. listdata_err.delete_one({"_id": doc["_id"]})
  50. except DuplicateKeyError as e:
  51. logger.warning(f"[BackFill]重复键错误:{e.details.get('errmsg')}")
  52. logger.info(f"[BackFill]共计放入采集队列{count}条任务")
  53. finally:
  54. client.close()
  55. logger.info("[BackFill]采集任务回填结束")
  56. if __name__ == '__main__':
  57. backfill_data()