import time
from concurrent.futures import ThreadPoolExecutor, wait

from common.databases import insert_one
from common.execptions import ValidatorError, HostsRetrieveError
from common.log import logger
from crawler.Task import Task
from crawler.analysis import parse_urls
from crawler.download import Downloader
from crawler.retrieve import Validator
from crawler.schedule import Scheduler
from crawler.search_engine import JySearchEngine
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    err_details,
    extract_domain
)
from settings import MGO_REPETITION, MGO_RECORDS, MGO_VISIT


class BreadthCrawler:
    """Breadth-first source-discovery crawler.

    Registered search engines seed the scheduler with URLs for the stored
    keywords; crawl workers then expand breadth-first, validating each page,
    persisting the ones that match the required features, and feeding the
    page's outgoing links back into the scheduler.
    """

    def __init__(
            self,
            downloader=None,
            parser=None,
            scheduler=None,
            validator=None,
            workers=1,
            allow_load_filter=False
    ):
        self._workers = workers
        self._scheduler = scheduler or Scheduler()
        self._downloader = downloader or Downloader()
        self._parser = parser or parse_urls
        self._validator = validator or Validator()
        self._engines = []
        if allow_load_filter:
            self._validator.load_filter()

    def verify(self, task):
        """Validate a crawled page and persist it if it is a wanted source."""
        valid_words = self._validator.words(task['name'], task)
        if valid_words:
            # Wanted site: store it as a new record
            insert_one(MGO_RECORDS, task)
            # Add the domain to the filter
            self._validator.add_filter_feature(task['domain'])
            # Add the domain to the de-duplication collection
            duplicate_site = {'url': task['domain'], 'time': int(time.time())}
            insert_one(MGO_REPETITION, duplicate_site)
            logger.info(f"[new source] {task['domain']} - {task['name']}")
        elif task['sensitive'] or task['duplication']:
            raise ValidatorError(f"feature check failed: {task['name']}")

    def crawl_spider(self):
        """Worker loop: pull tasks from the scheduler until it runs dry."""
        while True:
            tasks = self._scheduler.get_task()
            if len(tasks) == 0:
                print("shutting down source-discovery spider")
                break
            # TODO how should depth/level management be reflected in the crawl?
            task_key, task = tasks
            domain = extract_domain(task['url'])
            visit_domain = self._validator.url(domain)
            if not visit_domain:
                continue
            logger.info(f'about to visit: {domain}')
            response = self._downloader.get(task['url'])
            if response.status_code != 200 or response.text == '':
                continue
            page_source = response.text
            task['domain'] = domain
            base_url = extract_base_url(task['url'])
            task['base_url'] = base_url
            title = extract_page_title(page_source)
            task['name'] = title
            try:
                self.verify(task)
                # Breadth-first expansion: queue every link found on the page
                urls = self._parser(page_source, base_url)
                new_tasks = [Task(url=url) for url in urls]
                self._scheduler.insert_tasks(new_tasks)
            except HostsRetrieveError:
                pass

    def set_search_engine(self, engine=None):
        if isinstance(engine, JySearchEngine):
            self._engines.append(engine)
            logger.info(f'[search engine - {engine.__class__.__name__}] added')
        return self

    def set_search_engines(self, engines):
        for engine in engines:
            self.set_search_engine(engine)
        return self

    def search_words(self, engine, words):
        """Search every keyword with one engine and queue the result URLs."""
        for word in words:
            logger.info(f"[search engine - {engine.__class__.__name__}] searching: {word}")
            urls = engine.search(word)
            lst = [Task(url=url) for url in urls]
            self._scheduler.insert_tasks(lst)

    def enable_search_engines(self):
        """Run every registered search engine over the stored keywords."""
        with ThreadPoolExecutor(max_workers=2, thread_name_prefix='SearchEngine') as executor:
            search_words = [item['name'] for item in MGO_VISIT.find()]
            futures = []
            for engine in self._engines:
                logger.info(f"[search engine - {engine.__class__.__name__}] started")
                f = executor.submit(self.search_words, engine, search_words)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)

    def start(self):
        """Launch the search-engine seeding task and the crawl workers."""
        # One extra slot so the seeding task does not occupy a crawl worker.
        with ThreadPoolExecutor(max_workers=self._workers + 1) as executor:
            futures = []
            f = executor.submit(self.enable_search_engines)
            f.add_done_callback(err_details)
            futures.append(f)
            for _ in range(self._workers):
                future = executor.submit(self.crawl_spider)
                future.add_done_callback(err_details)
                futures.append(future)
            wait(futures)
        print('source-discovery run finished')
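

# ---------------------------------------------------------------------------
# Minimal usage sketch (an assumption, not part of the original module): wire
# the crawler together with its default Downloader/Scheduler/Validator and run
# it. Registering a search engine is left commented out because no concrete
# JySearchEngine subclass is defined in this file.
if __name__ == '__main__':
    crawler = BreadthCrawler(workers=4, allow_load_filter=True)
    # crawler.set_search_engine(SomeJySearchEngine())  # hypothetical engine subclass
    crawler.start()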