import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import List

from common.databases import insert_one
from common.execptions import ValidatorError, HostsRetrieveError
from common.log import logger
from crawler.Task import Task
from crawler.analysis import parse_urls
from crawler.download import Downloader
from crawler.retrieve import Validator
from crawler.schedule import Scheduler
from crawler.search_engine import JySearchEngine
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    err_details,
    extract_domain
)
from settings import (
    MGO_REPETITION,
    MGO_RECORDS,
    SPECIAL_ENCODINGS,
    MGO_VISIT_KEYWORDS,
    MGO_VISIT_ORGANIZATION
)


class BasicScheduler:

    def __init__(self, scheduler=None, **kwargs):
        self._scheduler = (scheduler or Scheduler())


class SearchEngine(BasicScheduler):

    def __init__(self, **kwargs):
        super(SearchEngine, self).__init__(scheduler=kwargs.get('scheduler'))
        self._engines = []

    def set_search_engine(self, engine=None):
        if isinstance(engine, JySearchEngine):
            self._engines.append(engine)
            logger.info(f'[search engine - added] {engine.__class__.__name__}')
        return self

    def set_search_engines(self, engines):
        for engine in engines:
            self.set_search_engine(engine)
        return self

    def search_organizations(self, engine, items: List[dict]):
        logger.info(f'[search organizations] {len(items)} in total')
        for item in items:
            name = item.get('name')
            if name in ['', None]:
                logger.warning(f'[organization search - invalid item] {item}')
                continue
            word = str(name).replace(' ', '').strip()
            logger.info(f"[search - organization] {engine.__class__.__name__} >>> {word}")
            urls = engine.search(word)
            lst = [Task(url=url, groups='organization') for url in urls]
            self._scheduler.insert_tasks(lst)
            # Mark the organization as queued so it is not searched again.
            MGO_VISIT_ORGANIZATION.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )

    def search_words(self, engine, words):
        logger.info(f'[search keywords] {len(words)} in total')
        for word in words:
            word = str(word).replace(' ', '').strip()
            logger.info(f"[search - keyword] {engine.__class__.__name__} >>> {word}")
            urls = engine.search(word)
            lst = [Task(url=url, groups='keyword') for url in urls]
            self._scheduler.insert_tasks(lst)

    def search_engines(self):
        with ThreadPoolExecutor(max_workers=2, thread_name_prefix='SearchEngine') as executor:
            # Organizations that have not been queued yet.
            search_organizations = []
            projection = {'name': 1}
            _q = {'enable_added': {'$exists': False}}
            for item in MGO_VISIT_ORGANIZATION.find(_q, projection=projection):
                search_organizations.append(item)

            # Keywords.
            search_words = []
            for item in MGO_VISIT_KEYWORDS.find(projection=projection):
                search_words.append(item['name'])

            futures = []
            for engine in self._engines:
                logger.info(f"[search engine - {engine.__class__.__name__}] started")
                # Track both futures so wait() blocks until every submitted search finishes.
                f = executor.submit(self.search_words, engine, search_words)
                f.add_done_callback(err_details)
                futures.append(f)
                f = executor.submit(self.search_organizations, engine, search_organizations)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)


class SearchDomain(BasicScheduler):

    def __init__(
            self,
            downloader=None,
            parser=None,
            validator=None,
            allow_load_filter=False,
            **kwargs,
    ):
        super(SearchDomain, self).__init__(scheduler=kwargs.get('scheduler'))
        self._downloader = (downloader or Downloader())
        self._parser = (parser or parse_urls)
        self._validator = (validator or Validator())
        if allow_load_filter:
            self._validator.load_filter()

    def verify(self, task):
        valid_words = self._validator.words(task['name'], task)
        if valid_words:
            # Target site: persist the record.
            insert_one(MGO_RECORDS, task)
            # Add the domain to the filter.
            self._validator.add_filter_feature(task['domain'])
            # Add the domain to the deduplication collection.
            duplicate_site = {'url': task['domain'], 'time': int(time.time())}
            insert_one(MGO_REPETITION, duplicate_site)
            logger.info(f"[retrieved] {task['domain']} - {task['name']}")
        else:
            if any([task['sensitive'], task['duplication']]):
                raise ValidatorError(f"feature check failed: {task['name']}")
    def crawl_spider(self):
        while True:
            tasks = self._scheduler.get_task()
            if len(tasks) == 0:
                logger.info('shutting down the source-discovery crawler')
                break

            task_key, task = tasks
            groups = task['groups']
            domain = extract_domain(task['url'])
            allow_visit_domain = self._validator.url(domain)
            if not allow_visit_domain:
                continue

            logger.info(f'request web site -> {task["url"]}')
            response = self._downloader.get(task['url'])
            logger.debug(f'{response} - content length: {len(response.text)}')
            if response.status_code != 200 or response.text in ['', None]:
                continue

            response.encoding = response.apparent_encoding
            if response.encoding in SPECIAL_ENCODINGS:
                response.encoding = 'utf-8'

            task['domain'] = domain
            base_url = extract_base_url(task['url'])
            task['base_url'] = base_url
            page_source = response.text
            title = extract_page_title(page_source)
            logger.debug(f'page title: {title}')
            task['name'] = title
            try:
                self.verify(task)
                urls = self._parser(page_source, base_url)
                new_tasks = [Task(url=url, groups=groups) for url in urls]
                self._scheduler.insert_tasks(new_tasks)
            except (ValidatorError, HostsRetrieveError):
                # verify() raises ValidatorError for rejected sites; swallowing
                # both errors keeps the crawl loop alive for the next task.
                pass
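
# Usage sketch (illustrative only, not part of the original module): wires the
# two stages together through one shared Scheduler. Concrete JySearchEngine
# subclasses are assumed to be defined elsewhere in the project and are not
# named here.
if __name__ == '__main__':
    shared_scheduler = Scheduler()

    # Stage 1: run every registered engine and queue the result URLs.
    searcher = SearchEngine(scheduler=shared_scheduler)
    searcher.set_search_engines([])  # populate with JySearchEngine instances
    searcher.search_engines()

    # Stage 2: download, validate and persist the queued sites.
    spider = SearchDomain(scheduler=shared_scheduler, allow_load_filter=True)
    spider.crawl_spider()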