12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970 |
- # -*- coding: utf-8 -*-
- """
- Created on 2023-04-07
- ---------
- @summary: 采集任务回填
- ---------
- @author: dzr
- """
- import time
- from bson.int64 import Int64
- from pymongo import MongoClient
- from pymongo.errors import DuplicateKeyError
- from log import logger
# MongoDB connection settings.
MONGO_HOST = "172.17.4.87"
MONGO_PORT = 27080
MONGO_DB = "py_spider"

# Shared client and database handles used by backfill_data().
client = MongoClient(host=MONGO_HOST, port=MONGO_PORT)
mongodb = client.get_database(MONGO_DB)
# Crawler data collection: error records of failed crawl tasks read by backfill_data().
listdata_err = mongodb.get_collection("listdata_err")
def backfill_data(days=5, max_failed_times=20):
    """Re-queue failed crawl tasks from ``listdata_err`` into their source collections.

    Selects error records whose ``create_at`` lies within the last ``days``
    days and whose ``failed_times`` is below ``max_failed_times``. For each
    record it does three things:

    * re-inserts the record's ``origin_data`` into the collection named by its
      ``origin`` field, carrying the ``failed_times`` counter along;
    * strips the fields left by the failed attempt;
    * deletes the error record.

    If the target collection already holds the document (duplicate key), the
    error record is still deleted.

    Args:
        days: Look-back window in days. It is compared against ``create_at`` as
            a Unix timestamp in seconds. Defaults to 5, the original
            hard-coded window (432000 s).
        max_failed_times: Exclusive upper bound on ``failed_times``. Records at
            or above it are left untouched. Defaults to 20.

    Returns:
        None. Progress is logged every 100 records, followed by a total.

    Note:
        The shared module-level ``client`` is intentionally NOT closed here,
        so the function can run more than once in the same process. Only the
        cursor opened by this call is released.
    """
    count = 0  # error records processed so far (duplicates included)
    # Take one timestamp so the start and end of the window are consistent.
    end_at = int(time.time())
    start_at = end_at - int(days * 86400)
    query = {
        "failed_times": {"$lt": max_failed_times},
        "create_at": {"$gte": start_at, "$lt": end_at},
    }
    # no_cursor_timeout=True keeps the server-side cursor alive for long runs,
    # so it has to be closed explicitly (see the finally clause).
    cursor = listdata_err.find(query, no_cursor_timeout=True)
    try:
        for doc in cursor:
            count += 1
            if count % 100 == 0:
                logger.info(f"[BackFill]放入采集队列{count}条任务")
            coll_name = doc["origin"]  # name of the collection the task came from
            document = doc["origin_data"]  # the original task document
            document["failed_times"] = doc["failed_times"]
            # Remove fields recorded by the failed attempt.
            for field in ("save", "pri", "failed", "error", "code"):
                document.pop(field, None)
            if "comeintime" in document:
                # Reset the enqueue time to now, in whole seconds, as a BSON int64.
                document["comeintime"] = Int64(int(time.time()))
            try:
                mongodb[coll_name].insert_one(document)
            except DuplicateKeyError as e:
                # The task already exists in the target collection; only the
                # error record needs to go. `details` can be None, so fall
                # back to the exception text.
                errmsg = (e.details or {}).get("errmsg", str(e))
                logger.warning(f"[BackFill]重复键错误:{errmsg}")
            # Reached on successful insert or duplicate; any other error propagates.
            listdata_err.delete_one({"_id": doc["_id"]})
        logger.info(f"[BackFill]共计放入采集队列{count}条任务")
    finally:
        cursor.close()
    logger.info("[BackFill]采集任务回填结束")
if __name__ == "__main__":
    # Script entry point: run a single backfill pass.
    backfill_data()
|