import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import List

from common.databases import insert_one
from common.execptions import ValidatorError, HostsRetrieveError
from common.log import logger
from crawler.Task import Task
from crawler.analysis import parse_urls
from crawler.download import Downloader
from crawler.retrieve import Validator
from crawler.schedule import Scheduler
from crawler.search_engine import JySearchEngine
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    err_details,
    extract_domain,
)
from settings import (
    MGO_REPETITION,
    MGO_RECORDS,
    SPECIAL_ENCODINGS,
    MGO_VISIT_KEYWORDS,
    MGO_VISIT_ORGANIZATION,
)
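
# The module wires two schedulers together:
#   * SearchEngine  - pushes search-engine result URLs for the organizations and
#     keywords stored in MongoDB onto the shared task scheduler.
#   * SearchDomain  - consumes scheduled tasks, downloads and validates each page,
#     persists accepted sites, and feeds newly discovered URLs back to the
#     scheduler.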


class BasicScheduler:

    def __init__(self, scheduler=None, **kwargs):
        self._scheduler = scheduler or Scheduler()


class SearchEngine(BasicScheduler):

    def __init__(self, **kwargs):
        super(SearchEngine, self).__init__(scheduler=kwargs.get('scheduler'))
        self._engines = []

    def set_search_engine(self, engine=None):
        if isinstance(engine, JySearchEngine):
            self._engines.append(engine)
            logger.info(f'[search engine - added] {engine.__class__.__name__}')
        return self

    def set_search_engines(self, engines):
        for engine in engines:
            self.set_search_engine(engine)
        return self

    def search_organizations(self, engine, items: List[dict]):
        logger.info(f'[search organizations] {len(items)} items in total')
        for item in items:
            name = item.get('name')
            if name in ['', None]:
                logger.warning(f'[organization search - invalid item] {item}')
                continue
            word = str(name).replace(' ', '').strip()
            logger.info(f"[search - organization] {engine.__class__.__name__} >>> {word}")
            urls = engine.search(word)
            lst = [Task(url=url, groups='organization') for url in urls]
            self._scheduler.insert_tasks(lst)
            MGO_VISIT_ORGANIZATION.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )

    def search_words(self, engine, words):
        logger.info(f'[search keywords] {len(words)} words in total')
        for word in words:
            word = str(word).replace(' ', '').strip()
            logger.info(f"[search - keyword] {engine.__class__.__name__} >>> {word}")
            urls = engine.search(word)
            lst = [Task(url=url, groups='keyword') for url in urls]
            self._scheduler.insert_tasks(lst)

    def search_engines(self):
        with ThreadPoolExecutor(max_workers=2, thread_name_prefix='SearchEngine') as executor:
            # organizations that have not been queued yet
            search_organizations = []
            projection = {'name': 1}
            _q = {'enable_added': {'$exists': False}}
            for item in MGO_VISIT_ORGANIZATION.find(_q, projection=projection):
                search_organizations.append(item)
            # search keywords
            search_words = []
            for item in MGO_VISIT_KEYWORDS.find(projection=projection):
                search_words.append(item['name'])

            futures = []
            for engine in self._engines:
                logger.info(f"[search engine - {engine.__class__.__name__}] started")
                f = executor.submit(self.search_words, engine, search_words)
                f.add_done_callback(err_details)
                futures.append(f)
                f = executor.submit(self.search_organizations, engine, search_organizations)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)


class SearchDomain(BasicScheduler):

    def __init__(
            self,
            downloader=None,
            parser=None,
            validator=None,
            allow_load_filter=False,
            **kwargs,
    ):
        super(SearchDomain, self).__init__(scheduler=kwargs.get('scheduler'))
        self._downloader = downloader or Downloader()
        self._parser = parser or parse_urls
        self._validator = validator or Validator()
        if allow_load_filter:
            self._validator.load_filter()

    def verify(self, task):
        valid_words = self._validator.words(task['name'], task)
        if valid_words:
            # accepted site: persist the record
            insert_one(MGO_RECORDS, task)
            # add the domain to the filter
            self._validator.add_filter_feature(task['domain'])
            # add the domain to the de-duplication collection
            duplicate_site = {'url': task['domain'], 'time': int(time.time())}
            insert_one(MGO_REPETITION, duplicate_site)
            logger.info(f"[retrieved] {task['domain']} - {task['name']}")
        elif any([task['sensitive'], task['duplication']]):
            raise ValidatorError(f"feature check failed: {task['name']}")

    def crawl_spider(self):
        while True:
            tasks = self._scheduler.get_task()
            if len(tasks) == 0:
                logger.info('shutting down the sourcing spider')
                break
            task_key, task = tasks
            groups = task['groups']
            domain = extract_domain(task['url'])
            allow_visit_domain = self._validator.url(domain)
            if not allow_visit_domain:
                continue

            logger.info(f'request web site -> {task["url"]}')
            response = self._downloader.get(task['url'])
            logger.debug(f'{response} {len(response.text)}')
            if response.status_code != 200 or response.text in ['', None]:
                continue
            response.encoding = response.apparent_encoding
            if response.encoding in SPECIAL_ENCODINGS:
                response.encoding = 'utf-8'

            task['domain'] = domain
            base_url = extract_base_url(task['url'])
            task['base_url'] = base_url
            page_source = response.text
            title = extract_page_title(page_source)
            logger.debug(f'page title -> {title}')
            task['name'] = title
            try:
                self.verify(task)
                urls = self._parser(page_source, base_url)
                new_tasks = [Task(url=url, groups=groups) for url in urls]
                self._scheduler.insert_tasks(new_tasks)
            except (ValidatorError, HostsRetrieveError):
                pass
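

# Minimal usage sketch: it assumes the default Scheduler/Downloader/Validator
# are usable as-is and that concrete JySearchEngine subclasses are supplied
# elsewhere in the project; `engines` below is left empty as a placeholder.
if __name__ == '__main__':
    engines = []  # e.g. instances of project-specific JySearchEngine subclasses

    searcher = SearchEngine()
    searcher.set_search_engines(engines)
    searcher.search_engines()  # queue search-result URLs as tasks

    spider = SearchDomain(allow_load_filter=True)
    spider.crawl_spider()  # consume queued tasks until the scheduler is empty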