# -*- coding: utf-8 -*-
"""
Created on 2023-04-07
---------
@summary: Backfill of crawl tasks
---------
@author: dzr
"""
- import time
- from bson.int64 import Int64
- from pymongo import MongoClient
- from pymongo.errors import DuplicateKeyError
- from log import logger
# Connection settings.
# Alternate endpoint kept for reference: 172.17.4.87:27080
MONGO_HOST = "127.0.0.1"
MONGO_PORT = 27001
MONGO_DB = "py_spider"

# Shared client / database handles used by the functions below.
client = MongoClient(host=MONGO_HOST, port=MONGO_PORT)
mongodb = client.get_database(MONGO_DB)

# Spider data collection that this script reads error records from.
listdata_err = mongodb.get_collection("listdata_err")
def backfill_data():
    """Re-queue failed crawl tasks and carry over their retry count.

    Selects records from ``listdata_err`` whose ``failed_times`` is below 20
    and whose ``create_at`` lies within the last 5 days (432000 seconds).
    For each record:

    * ``origin_data`` is taken as the task document and stamped with the
      record's ``failed_times``;
    * the error bookkeeping fields (``save``, ``pri``, ``failed``, ``error``,
      ``code``) are removed from it;
    * ``comeintime`` is refreshed to the current time, but only if the
      document already had that field;
    * the document is inserted into the collection named by ``origin`` and
      the error record is then deleted.

    A ``DuplicateKeyError`` on insert is logged as a warning; the error
    record is left in place in that case.

    The cursor and the module-level ``client`` are always closed before the
    function exits, whether or not an exception occurred.
    """
    # Number of error records visited. NOTE(review): this also counts records
    # whose insert hit a duplicate key, although the log text says "queued".
    count = 0

    # Take a single timestamp so both bounds describe exactly one 5-day window.
    end_at = int(time.time())
    start_at = end_at - 432000
    q = {
        "failed_times": {"$lt": 20},
        "create_at": {"$gte": start_at, "$lt": end_at},
    }
    # Fields that only describe the previous failure and must not be re-queued.
    error_fields = ("save", "pri", "failed", "error", "code")

    cursor = listdata_err.find(q, no_cursor_timeout=True)
    try:
        for doc in cursor:
            count += 1
            if count % 100 == 0:
                logger.info(f"[BackFill]放入采集队列{count}条任务")

            coll_name = doc["origin"]
            document = doc["origin_data"]
            document["failed_times"] = doc["failed_times"]

            # Strip the error-record fields.
            for field in error_fields:
                document.pop(field, None)

            if "comeintime" in document:
                document["comeintime"] = Int64(time.time())

            try:
                mongodb[coll_name].insert_one(document)
                # Only drop the error record once the task is safely re-queued.
                listdata_err.delete_one({"_id": doc["_id"]})
            except DuplicateKeyError as e:
                # NOTE(review): the error record stays in listdata_err here, so
                # it will be picked up again on the next run — confirm intended.
                logger.warning(f"[BackFill]重复键错误:{e.details.get('errmsg')}")

        logger.info(f"[BackFill]共计放入采集队列{count}条任务")
    finally:
        # A cursor opened with no_cursor_timeout=True is not reaped by the
        # server's idle timeout, so close it explicitly even if iteration
        # failed part-way through.
        try:
            cursor.close()
        finally:
            client.close()
            logger.info("[BackFill]采集任务回填结束")
# Script entry point: run a single backfill pass when executed directly.
if __name__ == "__main__":
    backfill_data()
|