12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- # -*- coding: utf-8 -*-
- """
- Created on 2023-04-24
- ---------
- @summary: NMPA (National Medical Products Administration) task management
- ---------
- @author: Dzr
- """
- import threading
- from base_server import BaseServer, tools
- from common.log import logger
- redis_label = "NMPA" # Redis task label, passed to BaseServer by both classes below
class NMPAServer(BaseServer, threading.Thread):
    """Producer thread that feeds pending NMPA crawl tasks into the task queue.

    While running, the thread periodically checks ``self.task_total`` and,
    when it drops below 3, reads pending documents from ``self.mgo_db`` and
    pushes them with ``self.rpush`` until ``maxsize`` is reached.

    NOTE(review): ``server``, ``mgo_db``, ``task_total``, ``fingerprint``,
    ``exists``, ``add`` and ``rpush`` are not defined in this class; they are
    presumably provided by ``BaseServer`` -- confirm against that class.
    """

    def __init__(self, site: str, db: str, table: str, maxsize=None):
        """Initialise the thread and the underlying ``BaseServer``.

        Args:
            site: Forwarded to ``BaseServer.__init__`` as its first argument.
            db: Forwarded to ``BaseServer.__init__``.
            table: Forwarded to ``BaseServer.__init__``.
            maxsize: Upper bound on ``task_total`` while filling the queue.
                Any falsy value (``None``, ``0``) falls back to 10.
        """
        threading.Thread.__init__(self)
        super().__init__(site, redis_label, db, table)
        # ``Thread.getName()`` is deprecated since Python 3.10; the ``name``
        # attribute returns the same value.
        self.label = f'{self.server}_{self.name}'
        self.maxsize = maxsize or 10
        self.sleep_interval = 5  # passed to tools.delay() between polls
        self.output_log = True

    def task_pool(self):
        """Push not-yet-queued pending documents until ``maxsize`` is reached.

        Documents whose ``status`` and ``crawl_status`` are both ``False`` are
        read from ``self.mgo_db``. Each one gets ``page = 1`` when it has no
        ``page`` key, and is pushed only when its fingerprint is not already
        known to ``self.exists``.
        """
        query = {"status": False, "crawl_status": False}
        with self.mgo_db.find(query) as cursor:
            self.output_log = True
            for item in cursor:
                item.setdefault('page', 1)

                # Stop as soon as the queue is considered full.
                if self.task_total >= self.maxsize:
                    break

                fingerprint = self.fingerprint(**item)
                if not self.exists(fingerprint):
                    task_str = tools.json_dumps(item)
                    self.rpush(task_str)
                    self.add(fingerprint)

        # NOTE(review): nesting was reconstructed from a paste that had lost
        # its indentation; the summary is emitted once per call, after the
        # loop, so that it reports the final queue size -- confirm.
        if self.output_log:
            logger.info(f'[{self.label}]待采集任务 {self.task_total} 条')
            self.output_log = False

    def run(self):
        """Thread body: refill the queue forever, pausing between polls."""
        logger.info(f'[{self.label}]开始生产任务')
        while True:
            try:
                if self.task_total < 3:
                    self.task_pool()
            except Exception as e:
                # Top-level boundary of the thread: log and keep running.
                logger.exception(e)

            # Pause on every iteration, including after a failure, so that a
            # persistent error cannot turn this loop into a busy spin that
            # floods the log.
            tools.delay(self.sleep_interval)
class NMPAClient(BaseServer):
    """Consumer side of the NMPA task queue.

    Pops tasks, records which page was handed out, and stores crawled
    documents.

    NOTE(review): ``server``, ``mgo_db``, ``lpop``, ``remove``,
    ``fingerprint`` and ``upload_data_to_mongodb`` are not defined in this
    class; they are presumably provided by ``BaseServer`` -- confirm.
    """

    def __init__(self, site: str, db: str, table: str):
        """Initialise the underlying ``BaseServer`` with the NMPA label.

        Args:
            site: Forwarded to ``BaseServer.__init__`` as its first argument.
            db: Forwarded to ``BaseServer.__init__``.
            table: Forwarded to ``BaseServer.__init__``.
        """
        super().__init__(site, redis_label, db, table)

    def get_crawl_task(self):
        """Pop the next task from the queue.

        Returns:
            The result of ``tools.document2dict`` for the popped task, or
            ``None`` when ``tools.json_loads`` yields ``None``.
        """
        task = tools.json_loads(self.lpop(), _id=tools.ObjectId)
        if task is not None:
            # Drop the fingerprint before recording the hand-out.
            self.remove(self.fingerprint(**task))
            self.record_history(task)
            task = tools.document2dict(task)
        return task

    def record_history(self, task: dict):
        """Append ``task['page']`` to the task document's ``crawl_record``.

        A single ``$push`` update is used instead of reading the list,
        appending locally and writing it back with ``$set``: the update is
        atomic, so concurrent clients cannot overwrite each other's entries,
        and a missing document no longer raises ``AttributeError`` before the
        upsert is reached.

        Args:
            task: Task mapping; must contain ``_id`` and ``page``.
        """
        self.mgo_db.update_one(
            {'_id': task['_id']},
            {'$push': {'crawl_record': task['page']}},
            upsert=True,
        )

    def save_data(self, table, documents):
        """Store crawled documents and log how many were inserted.

        Best-effort: a ``TypeError`` raised by the upload is logged rather
        than propagated, so the caller keeps running.

        Args:
            table: Target table, forwarded as ``table=``.
            documents: Documents to upload.
        """
        try:
            inserted_total = self.upload_data_to_mongodb(documents, table=table)
        except TypeError as e:
            # Previously swallowed silently; keep going but leave a trace.
            logger.exception(e)
        else:
            logger.info(f"[{self.server}]采集成功 {inserted_total} 条 --列表页")
|