__init__.py

import time
from concurrent.futures import ThreadPoolExecutor, wait

from common.databases import insert_one
from common.execptions import ValidatorError, HostsRetrieveError
from common.log import logger
from crawler.Task import Task
from crawler.analysis import parse_urls
from crawler.download import Downloader
from crawler.retrieve import Validator
from crawler.schedule import Scheduler
from crawler.search_engine import JySearchEngine
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    err_details,
    extract_domain
)
from settings import MGO_REPETITION, MGO_RECORDS, MGO_VISIT

class BreadthCrawler:
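    """Breadth-first source-discovery crawler.

    Search engines seed the scheduler with start URLs; the crawler then
    breadth-crawls pages, validates each candidate site against keyword
    and deduplication rules, and records accepted sites via insert_one.
    """
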
    def __init__(
            self,
            downloader=None,
            parser=None,
            scheduler=None,
            validator=None,
            workers=1,
            allow_load_filter=False
    ):
        self._workers = workers
        self._scheduler = (scheduler or Scheduler())
        self._downloader = (downloader or Downloader())
        self._parser = (parser or parse_urls)
        self._validator = (validator or Validator())
        self._engines = []
        if allow_load_filter:
            self._validator.load_filter()

    def verify(self, task):
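        """Validate a candidate site and persist it if it passes.

        A task whose title passes the keyword check is written to the
        records collection, and its domain is added to the filter and the
        deduplication collection. A sensitive or duplicated task raises
        ValidatorError instead.
        """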
        valid_words = self._validator.words(task['name'], task)
        if valid_words:
            # wanted site: persist the record
            insert_one(MGO_RECORDS, task)
            # add the domain to the filter
            self._validator.add_filter_feature(task['domain'])
            # add the domain to the deduplication collection
            duplicate_site = {'url': task['domain'], 'time': int(time.time())}
            insert_one(MGO_REPETITION, duplicate_site)
            logger.info(f"[new source] {task['domain']} - {task['name']}")
        else:
            if task['sensitive'] or task['duplication']:
                raise ValidatorError(f"feature check failed: {task['name']}")

    def crawl_spider(self):
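        """Breadth-crawl pages handed out by the scheduler.

        Loops until the scheduler has no more tasks. Each fetched page is
        verified and parsed for links, and the extracted URLs are pushed
        back to the scheduler as new tasks.
        """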
        while True:
            tasks = self._scheduler.get_task()
            if len(tasks) == 0:
                print("shutting down the source-discovery crawler")
                break
            # TODO how should level management be reflected in the crawl?
            task_key, task = tasks
            domain = extract_domain(task['url'])
            visit_domain = self._validator.url(domain)
            if not visit_domain:
                continue
            logger.info(f'about to visit: {domain}')
            response = self._downloader.get(task['url'])
            if response.status_code != 200 or response.text == '':
                continue
            page_source = response.text
            task['domain'] = domain
            base_url = extract_base_url(task['url'])
            task['base_url'] = base_url
            title = extract_page_title(page_source)
            task['name'] = title
            try:
                self.verify(task)
                urls = self._parser(page_source, base_url)
                new_tasks = [Task(url=url) for url in urls]
                self._scheduler.insert_tasks(new_tasks)
            except (HostsRetrieveError, ValidatorError):
                # verification or host retrieval failed: skip this page
                pass

    def set_search_engine(self, engine=None):
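        """Register a JySearchEngine instance; other values are ignored."""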
        if isinstance(engine, JySearchEngine):
            self._engines.append(engine)
            logger.info(f'[search engine - {engine.__class__.__name__}] added')
        return self

    def set_search_engines(self, engines):
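        """Register several search engines at once."""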
        for engine in engines:
            self.set_search_engine(engine)
        return self

    def search_words(self, engine, words):
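        """Search every word on one engine and queue the result URLs as tasks."""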
        for word in words:
            logger.info(f"[search engine - {engine.__class__.__name__}] searching: {word}")
            urls = engine.search(word)
            new_tasks = [Task(url=url) for url in urls]
            self._scheduler.insert_tasks(new_tasks)

    def enable_search_engines(self):
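        """Run every registered engine against the stored search words.

        The words come from the MGO_VISIT collection; each engine searches
        all of them in a dedicated thread and feeds the resulting URLs to
        the scheduler.
        """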
        with ThreadPoolExecutor(max_workers=2, thread_name_prefix='SearchEngine') as executor:
            search_words = [item['name'] for item in MGO_VISIT.find()]
            futures = []
            for engine in self._engines:
                logger.info(f"[search engine - {engine.__class__.__name__}] started")
                f = executor.submit(self.search_words, engine, search_words)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)

    def start(self):
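        """Start the search-engine seeding task and the crawl workers.

        Note that the pool holds self._workers threads while
        self._workers + 1 tasks are submitted, so the last crawl task
        stays queued until a pool slot frees up.
        """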
        with ThreadPoolExecutor(max_workers=self._workers) as executor:
            futures = []
            f = executor.submit(self.enable_search_engines)
            f.add_done_callback(err_details)
            futures.append(f)
            for _ in range(self._workers):
                future = executor.submit(self.crawl_spider)
                future.add_done_callback(err_details)
                futures.append(future)
            wait(futures)
        print('source-discovery run finished')
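
# Typical wiring, shown here only as a sketch: the engine classes below are
# hypothetical stand-ins for concrete JySearchEngine implementations that the
# project would provide (e.g. in crawler.search_engine).
#
#     crawler = BreadthCrawler(workers=4, allow_load_filter=True)
#     crawler.set_search_engines([SomeEngine(), AnotherEngine()])
#     crawler.start()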