nmpa.py 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2023-04-24
  4. ---------
  5. @summary: NMPA (National Medical Products Administration) task management
  6. ---------
  7. @author: Dzr
  8. """
  9. import threading
  10. from base_server import BaseServer, tools
  11. from common.log import logger
  redis_label = "NMPA" # Redis task label; passed to BaseServer.__init__ by NMPAServer and NMPAClient
  class NMPAServer(BaseServer, threading.Thread):
      """Producer thread that refills the NMPA task queue from the database.

      The thread polls the queue size (``self.task_total``) and, when it drops
      below a low-water mark, pushes pending documents onto the queue until
      ``maxsize`` tasks are waiting.

      The queue, fingerprint and database helpers used here (``task_total``,
      ``rpush``, ``exists``, ``add``, ``fingerprint``, ``mgo_db``, ``server``)
      are not defined in this class; presumably they come from ``BaseServer``.
      """

      def __init__(self, site: str, db: str, table: str, maxsize=None):
          """Set up the thread and the underlying ``BaseServer``.

          Args:
              site: Forwarded unchanged to ``BaseServer.__init__``.
              db: Forwarded unchanged to ``BaseServer.__init__``.
              table: Forwarded unchanged to ``BaseServer.__init__``.
              maxsize: Upper bound on queued tasks; any falsy value
                  (``None`` or ``0``) falls back to 10.
          """
          # Thread state is initialised explicitly; whether
          # BaseServer.__init__ chains to Thread.__init__ is not visible here.
          threading.Thread.__init__(self)
          super().__init__(site, redis_label, db, table)
          # ``Thread.name`` replaces ``getName()``, deprecated since Python 3.10.
          self.label = f'{self.server}_{self.name}'
          self.maxsize = maxsize or 10
          self.sleep_interval = 5  # passed to tools.delay() between polls
          self.output_log = True

      def task_pool(self):
          """Queue pending documents until ``maxsize`` tasks are waiting.

          Documents matching ``status == False`` and ``crawl_status == False``
          are read from ``self.mgo_db``. Each one gets a default ``page`` of 1
          and is pushed only if its fingerprint is not already registered.
          """
          query = {"status": False, "crawl_status": False}
          with self.mgo_db.find(query) as cursor:
              self.output_log = True
              for item in cursor:
                  item.setdefault('page', 1)

                  # Stop as soon as the queue holds enough work.
                  if self.task_total >= self.maxsize:
                      break

                  fingerprint = self.fingerprint(**item)
                  if not self.exists(fingerprint):
                      self.rpush(tools.json_dumps(item))
                      self.add(fingerprint)

          # NOTE(review): the original indentation was lost; this block is
          # assumed to run once after the cursor loop -- confirm against the
          # upstream file.
          if self.output_log:
              logger.info(f'[{self.label}]待采集任务 {self.task_total} 条')
              self.output_log = False

      def run(self):
          """Thread entry point: refill the queue forever, pausing between polls."""
          logger.info(f'[{self.label}]开始生产任务')
          while True:
              try:
                  # Refill only when fewer than 3 tasks are queued.
                  if self.task_total < 3:
                      self.task_pool()
              except Exception as e:
                  logger.exception(e)

              # Pause on every round, including after a failure, so a
              # persistent error cannot turn this loop into a busy spin
              # that floods the log.
              tools.delay(self.sleep_interval)
  class NMPAClient(BaseServer):
      """Consumer-side helper: pops NMPA tasks from the queue and stores results.

      The queue, fingerprint and database helpers used here (``lpop``,
      ``remove``, ``fingerprint``, ``mgo_db``, ``upload_data_to_mongodb``,
      ``server``) are not defined in this class; presumably they come from
      ``BaseServer``.
      """

      def __init__(self, site: str, db: str, table: str):
          """Forward the arguments to ``BaseServer`` with the NMPA task label."""
          super().__init__(site, redis_label, db, table)

      def get_crawl_task(self):
          """Pop the next task, release its fingerprint and record the attempt.

          Returns:
              The task converted by ``tools.document2dict``, or ``None`` when
              ``tools.json_loads`` yields ``None`` (presumably an empty queue).
          """
          task = tools.json_loads(self.lpop(), _id=tools.ObjectId)
          if task is not None:
              # Release the fingerprint so the task can be queued again later.
              self.remove(self.fingerprint(**task))
              self.record_history(task)
              task = tools.document2dict(task)
          return task

      def record_history(self, task: dict):
          """Append the task's page number to the document's ``crawl_record``.

          A single atomic ``$push`` replaces the former find/append/``$set``
          sequence. That sequence raised ``AttributeError`` when the document
          no longer existed, and could lose entries under concurrent writers.

          Args:
              task: Task dict; must contain ``_id`` and ``page``.
          """
          self.mgo_db.update_one(
              {'_id': task['_id']},
              {'$push': {'crawl_record': task['page']}},
              upsert=True,
          )

      def save_data(self, table, documents):
          """Upload ``documents`` to ``table`` and log how many were inserted.

          A ``TypeError`` from the upload is logged rather than silently
          dropped; it is still not propagated, so saving stays best-effort.
          """
          try:
              inserted_total = self.upload_data_to_mongodb(documents, table=table)
          except TypeError as e:
              logger.exception(e)
          else:
              logger.info(f"[{self.server}]采集成功 {inserted_total} 条 --列表页")