# -*- coding: utf-8 -*-
"""
Created on 2023-04-07
---------
@summary: Backfill crawl tasks (re-queue failed list-page tasks for re-collection)
---------
@author: dzr
"""
import time

from bson.int64 import Int64
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError

from log import logger

MONGO_HOST = "172.17.4.87"
MONGO_PORT = 27080
client = MongoClient(MONGO_HOST, MONGO_PORT)

MONGO_DB = "py_spider"
mongodb = client[MONGO_DB]

# Collection holding failed crawl tasks that are candidates for retry
listdata_err = mongodb["listdata_err"]

# Default look-back window over `create_at`: 5 days (== 432000 seconds)
BACKFILL_WINDOW_SECONDS = 5 * 24 * 60 * 60
# Default retry cap: only tasks with fewer recorded failures than this are backfilled
MAX_FAILED_TIMES = 20

# Fields describing the previous failure; stripped before the task is re-queued
_ERROR_FIELDS = ("save", "pri", "failed", "error", "code")


def backfill_data(window_seconds=BACKFILL_WINDOW_SECONDS,
                  max_failed_times=MAX_FAILED_TIMES):
    """Move retryable failed tasks from ``listdata_err`` back to their source collection.

    Selects error records whose ``failed_times`` is below ``max_failed_times`` and
    whose ``create_at`` lies in ``[now - window_seconds, now)``. For each record:

    * takes the stored ``origin_data`` document,
    * copies the record's ``failed_times`` onto it,
    * removes the error-bookkeeping fields,
    * refreshes ``comeintime`` (only if the document already had that field),
    * inserts it into the collection named by the record's ``origin`` field,
    * deletes the error record.

    If the insert fails with a duplicate key, a warning is logged and the error
    record is still deleted. Any other exception aborts the run.

    The module-level ``client`` is closed when the run ends (success or failure),
    so this is intended to be called once per process.

    :param window_seconds: length of the look-back window over ``create_at``, in
        seconds; defaults to 5 days.
    :param max_failed_times: exclusive upper bound on ``failed_times``; defaults to 20.
    """
    count = 0  # number of error records processed so far

    # Read the clock once so both window bounds refer to the same instant
    end_at = int(time.time())
    start_at = end_at - window_seconds
    q = {
        "failed_times": {"$lt": max_failed_times},
        "create_at": {"$gte": start_at, "$lt": end_at},
    }

    # no_cursor_timeout keeps the server cursor alive during a long run, which
    # means it must be closed explicitly (done in `finally` below)
    cursor = listdata_err.find(q, no_cursor_timeout=True)
    try:
        for doc in cursor:
            count += 1
            if count % 100 == 0:
                logger.info(f"[BackFill]放入采集队列{count}条任务")

            coll_name = doc["origin"]  # name of the collection the task came from
            document = doc["origin_data"]  # the original task document
            document["failed_times"] = doc["failed_times"]

            # Strip fields left over from the failed attempt
            for field in _ERROR_FIELDS:
                document.pop(field, "")

            if "comeintime" in document:
                document["comeintime"] = Int64(int(time.time()))

            try:
                mongodb[coll_name].insert_one(document)
            except DuplicateKeyError as e:
                # `details` may be None; don't let the log line crash the run
                errmsg = (e.details or {}).get("errmsg")
                logger.warning(f"[BackFill]重复键错误:{errmsg}")
            # Drop the error record whether the insert succeeded or the task was
            # already present in the target collection
            listdata_err.delete_one({"_id": doc["_id"]})

        logger.info(f"[BackFill]共计放入采集队列{count}条任务")
    finally:
        cursor.close()
        client.close()

    logger.info("[BackFill]采集任务回填结束")


if __name__ == '__main__':
    backfill_data()