# -*- coding: utf-8 -*-
"""
Created on 2023-04-24
---------
@summary: NMPA (National Medical Products Administration) task management.
          Producer (NMPAServer) scans MongoDB for unfinished documents and
          queues them in Redis; consumer (NMPAClient) pops tasks and records
          crawl history.
---------
@author: Dzr
"""
import threading

from base_server import BaseServer, tools
from common.log import logger

redis_label = "NMPA"  # Redis task-queue label


class NMPAServer(BaseServer, threading.Thread):
    """Task producer thread.

    Periodically scans MongoDB for documents that have not been crawled yet
    and pushes them onto the Redis task queue, deduplicated by fingerprint.
    """

    def __init__(self, site: str, db: str, table: str, maxsize=None):
        """
        :param site: site name forwarded to BaseServer
        :param db: MongoDB database name
        :param table: MongoDB collection name
        :param maxsize: max number of pending tasks kept queued (default 10)
        """
        threading.Thread.__init__(self)
        super(NMPAServer, self).__init__(site, redis_label, db, table)
        # Thread.getName() is deprecated (since Python 3.10); use the
        # equivalent `name` attribute instead.
        self.label = f'{self.server}_{self.name}'
        self.maxsize = maxsize or 10  # queue refill ceiling
        self.sleep_interval = 5  # seconds between production cycles
        self.output_log = True  # emit the pending-count log once per refill

    def task_pool(self):
        """Refill the Redis queue with uncrawled documents, up to maxsize.

        Documents are fingerprinted for dedup; a document without a `page`
        field defaults to page 1.
        """
        query = {"status": False, "crawl_status": False}
        with self.mgo_db.find(query) as cursor:
            self.output_log = True
            for item in cursor:
                # Start at the first page when none was recorded.
                item.setdefault('page', 1)

                if self.task_total >= self.maxsize:
                    break

                fingerprint = self.fingerprint(**item)
                if not self.exists(fingerprint):
                    self.rpush(tools.json_dumps(item))
                    self.add(fingerprint)

        if self.output_log:
            logger.info(f'[{self.label}]待采集任务 {self.task_total} 条')
            self.output_log = False

    def run(self):
        """Daemon loop: top up the queue whenever it runs low."""
        logger.info(f'[{self.label}]开始生产任务')
        while True:
            try:
                # Refill only when the backlog is nearly drained.
                if self.task_total < 3:
                    self.task_pool()
                tools.delay(self.sleep_interval)
            except Exception as e:
                # Keep the producer alive; log and retry next cycle.
                logger.exception(e)


class NMPAClient(BaseServer):
    """Task consumer: pops crawl tasks from Redis and records history."""

    def __init__(self, site: str, db: str, table: str):
        super(NMPAClient, self).__init__(site, redis_label, db, table)

    def get_crawl_task(self):
        """Pop one task from the queue.

        :return: a plain dict task, or None when the queue is empty.
        """
        task = tools.json_loads(self.lpop(), _id=tools.ObjectId)
        if task is not None:
            self.remove(self.fingerprint(**task))
            self.record_history(task)
            task = tools.document2dict(task)
        return task

    def record_history(self, task: dict):
        """Append the crawled page number to the document's crawl_record.

        Uses an atomic ``$push`` instead of the previous find-one /
        append / ``$set`` sequence, which raised AttributeError when the
        document was missing and could lose updates under concurrent
        clients.
        """
        self.mgo_db.update_one(
            {'_id': task['_id']},
            {'$push': {'crawl_record': task['page']}},
            upsert=True,
        )

    def save_data(self, table, documents):
        """Bulk-upload documents to MongoDB, logging the inserted count.

        A TypeError (e.g. invalid/empty payload) is treated as
        best-effort and skipped, but is now logged instead of silently
        swallowed.
        """
        try:
            inserted_total = self.upload_data_to_mongodb(documents, table=table)
            logger.info(f"[{self.server}]采集成功 {inserted_total} 条 --列表页")
        except TypeError as e:
            # Best-effort: skip bad payloads but leave a trace for debugging.
            logger.warning(f"[{self.server}]save_data skipped: {e}")