3
0

backfill_task.py 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
# -*- coding: utf-8 -*-
"""
Created on 2023-04-07
---------
@summary: Backfill of crawl tasks
---------
@author: dzr
"""
  9. import time
  10. from bson.int64 import Int64
  11. from pymongo import MongoClient
  12. from pymongo.errors import DuplicateKeyError
  13. from log import logger
  14. MONGO_HOST = "172.17.4.87"
  15. MONGO_PORT = 27080
  16. client = MongoClient(MONGO_HOST, MONGO_PORT)
  17. MONGO_DB = "py_spider"
  18. mongodb = client[MONGO_DB]
  19. # 爬虫数据表
  20. listdata_err = mongodb["listdata_err"]
  21. def backfill_data():
  22. """回填重新采集的任务,并给出错误重试次数"""
  23. count = 0 # 计数器
  24. # 查询5天内范围内的错误重采次数小于20的错误任务
  25. start_at = int(time.time()) - 432000
  26. end_at = int(time.time())
  27. q = {
  28. "failed_times": {"$lt": 20},
  29. "create_at": {"$gte": start_at, "$lt": end_at}
  30. }
  31. cursor = listdata_err.find(q, no_cursor_timeout=True)
  32. try:
  33. for doc in cursor:
  34. count += 1
  35. if count % 100 == 0:
  36. logger.info(f"[BackFill]放入采集队列{count}条任务")
  37. coll_name = doc["origin"] # 数据来源的表名
  38. document = doc["origin_data"] # 原始数据
  39. document["failed_times"] = doc["failed_times"]
  40. # 清除错误记录字段
  41. for field in ["save", "pri", "failed", "error", "code"]:
  42. document.pop(field, "")
  43. if "comeintime" in document:
  44. document["comeintime"] = Int64(time.time())
  45. try:
  46. mongodb[coll_name].insert_one(document)
  47. listdata_err.delete_one({"_id": doc["_id"]})
  48. except DuplicateKeyError as e:
  49. logger.warning(f"[BackFill]重复键错误:{e.details.get('errmsg')}")
  50. listdata_err.delete_one({"_id": doc["_id"]})
  51. logger.info(f"[BackFill]共计放入采集队列{count}条任务")
  52. finally:
  53. client.close()
  54. logger.info("[BackFill]采集任务回填结束")
  55. if __name__ == '__main__':
  56. backfill_data()