nmpa.py 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
# -*- coding: utf-8 -*-
"""
Created on 2023-04-24
---------
@summary: NMPA (药监局 / National Medical Products Administration) task management
---------
@author: Dzr
"""
import threading

from base_server import BaseServer, tools
from common.log import logger

# Tag used to namespace this crawler's tasks in Redis.
redis_label = "NMPA"
  13. class NMPAServer(BaseServer, threading.Thread):
  14. def __init__(self, site: str, db: str, table: str, maxsize=None):
  15. threading.Thread.__init__(self)
  16. super(NMPAServer, self).__init__(site, redis_label, db, table)
  17. self.label = f'{self.server}_{self.getName()}'
  18. self.maxsize = maxsize or 10
  19. self.sleep_interval = 5
  20. self.output_log = True
  21. def task_pool(self):
  22. query = {"status": False, "crawl_status": False}
  23. with self.mgo_db.find(query) as cursor:
  24. self.output_log = True
  25. for item in cursor:
  26. if "page" not in item:
  27. item['page'] = 1
  28. if self.task_total >= self.maxsize:
  29. break
  30. fingerprint = self.fingerprint(**item)
  31. if not self.exists(fingerprint):
  32. task_str = tools.json_dumps(item)
  33. self.rpush(task_str)
  34. self.add(fingerprint)
  35. if self.output_log:
  36. logger.info(f'[{self.label}]待采集任务 {self.task_total} 条')
  37. self.output_log = False
  38. def run(self):
  39. logger.info(f'[{self.label}]开始生产任务')
  40. while True:
  41. if self.task_total < 3:
  42. self.task_pool()
  43. tools.delay(self.sleep_interval)
  44. class NMPAClient(BaseServer):
  45. def __init__(self, site: str, db: str, table: str):
  46. super(NMPAClient, self).__init__(site, redis_label, db, table)
  47. def get_crawl_task(self):
  48. task = tools.json_loads(self.lpop(), _id=tools.ObjectId)
  49. if task is not None:
  50. self.remove(self.fingerprint(**task))
  51. self.record_history(task)
  52. task = tools.document2dict(task)
  53. return task
  54. def record_history(self, task: dict):
  55. items: dict = self.mgo_db.find_one(task['_id'])
  56. record: list = items.get('crawl_record', list())
  57. record.append(task['page'])
  58. self.mgo_db.update_one(
  59. {'_id': task['_id']},
  60. {'$set': {'crawl_record': record}},
  61. upsert=True
  62. )
  63. def save_data(self, table, documents):
  64. try:
  65. inserted_total = self.upload_data_to_mongodb(documents, table=table)
  66. logger.info(f"[{self.server}]采集成功 {inserted_total} 条 --列表页")
  67. except TypeError:
  68. pass