import threading
import time
from typing import List, Mapping

from common.databases import insert_one, int2long
from common.execptions import ValidatorError, HostsRetrieveError
from common.log import logger
from common.tools import delay_by
from crawler.Task import Task
from crawler.analysis import parse_urls
from crawler.download import Downloader
from crawler.retrieve import Validator
from crawler.schedule import Scheduler
from crawler.search_engine import JySearchEngine
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    extract_domain,
    valid_url
)
from settings import (
    SPECIAL_ENCODINGS,
    MGO_REPETITION,
    MGO_DOMAIN,
    MGO_SEED_URLS,
    MGO_SEED_ORGS,
    MGO_SEED_KEYWORDS,
)


class BasicSearch:

    def __init__(self, scheduler=None, **kwargs):
        self._scheduler = (scheduler or Scheduler())
        self.query = {'enable_added': {'$exists': False}}
        self.projection = {'name': 1}
        self.sort = [('_id', -1)]
        # task weights
        self.org_weight = kwargs.get('org_weight', 9)
        self.url_weight = kwargs.get('url_weight', 8)
        self.keyword_weight = kwargs.get('keyword_weight', 5)
        # task classifications
        self.visit_classify = 'visit'
        self.query_classify = 'query'
        # task groups
        self.org_groups = 'organization'
        self.keyword_groups = 'keyword'
        self.url_groups = 'seed_url'
        self._init()

    def _init(self):
        self.sync_data()

    @staticmethod
    def loops_interval(label, interval):
        # Fall back to 300 seconds so time.sleep() never receives None.
        interval = interval or 300
        next_run_time = delay_by(interval)
        logger.info(f'Executed <{label}>; next run at {next_run_time}')
        time.sleep(interval)

    @staticmethod
    def make_task(**kwargs):
        """Create a Task object."""
        return Task(**kwargs)

    def seed_orgs(self) -> List[Mapping]:
        """Seed organizations."""
        cursor = MGO_SEED_ORGS.find(self.query, projection=self.projection)
        return list(cursor.sort(self.sort))

    def seed_keywords(self) -> List[str]:
        """Seed keywords."""
        cursor = MGO_SEED_KEYWORDS.find(projection=self.projection)
        return [item['name'] for item in cursor.sort(self.sort)]

    def seed_urls(self) -> List[Mapping]:
        """Seed urls."""
        cursor = MGO_SEED_URLS.find(self.query, projection=self.projection)
        return list(cursor.sort(self.sort))

    def sync_data_urls(self):
        """Sync seed url data."""
        _interval = 7200
        while True:
            # Keep only records whose url passes validation; filtering up
            # front avoids mutating the list while iterating over it.
            items = [item for item in self.seed_urls()
                     if valid_url(item['name'])]
            lst = []
            for item in items:
                lst.append(self.make_task(
                    url=item['name'],
                    groups=self.url_groups,
                    classify=self.visit_classify,
                    weight=self.url_weight))
            self._scheduler.insert_tasks(lst, level=self.url_weight)
            # Mark queued urls so they are not pushed to the task queue again.
            for item in items:
                MGO_SEED_URLS.update_one(
                    {'_id': item['_id']},
                    {'$set': {'enable_added': True}}
                )
            logger.info(f'Synced {len(items)} seed urls')
            self.loops_interval(self.sync_data_urls.__name__, _interval)

    def sync_data_keywords(self):
        """Sync seed keyword data."""
        _interval = 1800
        while True:
            words = self.seed_keywords()
            # Normalize the keywords and push them onto the task queue.
            words = [str(word).replace(' ', '').strip() for word in words]
            lst = []
            for word in words:
                lst.append(self.make_task(
                    search=word,
                    groups=self.keyword_groups,
                    classify=self.query_classify,
                    weight=self.keyword_weight))
            self._scheduler.insert_tasks(lst, level=self.keyword_weight)
            logger.info(f'Synced {len(words)} keywords')
            self.loops_interval(self.sync_data_keywords.__name__, _interval)

    def sync_data_orgs(self):
        """Sync seed organization data."""
        _interval = 3600
        while True:
            items = self.seed_orgs()
            # Normalize organization names and push them onto the task queue.
            orgs = []
            for item in items:
                name = item.get('name')
                if name in ['', None]:
                    logger.warning(f'Malformed organization record: {item}')
                    continue
                word = str(name).replace(' ', '').strip()
                orgs.append(word)
            lst = []
            for word in orgs:
                lst.append(self.make_task(
                    search=word,
                    groups=self.org_groups,
                    classify=self.query_classify,
                    weight=self.org_weight))
            self._scheduler.insert_tasks(lst, level=self.org_weight)
            # Mark queued organizations so they are not pushed to the task
            # queue again.
            for item in items:
                MGO_SEED_ORGS.update_one(
                    {'_id': item['_id']},
                    {'$set': {'enable_added': True}}
                )
            logger.info(f'Synced {len(items)} organizations')
            self.loops_interval(self.sync_data_orgs.__name__, _interval)

    def sync_data(self):
        """Start the seed data sync threads."""
        logger.info('[Seed Search] starting data sync')
        threading.Thread(
            target=self.sync_data_urls, name='LoadingSeedUrls'
        ).start()
        threading.Thread(
            target=self.sync_data_keywords, name='LoadingSeedKeyWords'
        ).start()
        threading.Thread(
            target=self.sync_data_orgs, name='LoadingSeedOrgs'
        ).start()
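
# For reference, a hedged sketch of the task payloads the three sync loops
# above enqueue, assuming Task is dict-like (an assumption; Task is defined
# in crawler.Task). Field names come straight from the make_task calls:
#
#   {'search': '<org name>', 'groups': 'organization', 'classify': 'query', 'weight': 9}
#   {'url': '<seed url>', 'groups': 'seed_url', 'classify': 'visit', 'weight': 8}
#   {'search': '<keyword>', 'groups': 'keyword', 'classify': 'query', 'weight': 5}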


class SearchEngine(BasicSearch):

    def __init__(self, **kwargs):
        super(SearchEngine, self).__init__(scheduler=kwargs.get('scheduler'))
        self._engines = []

    def set_search_engine(self, engine=None):
        if isinstance(engine, JySearchEngine):
            self._engines.append(engine)
            logger.info(f'Added search engine <{engine.__class__.__name__}>')
        return self

    def set_search_engines(self, engines):
        for engine in engines:
            self.set_search_engine(engine)
        return self

    def start_search(self, engine):
        while True:
            tasks = self._scheduler.get_task()
            if len(tasks) == 0:
                # Nothing queued; wait out the idle interval, then poll again.
                self.loops_interval(self.start_search.__name__, 5)
                continue
            task_key, task = tasks
            task['update_at'] = int2long(int(time.time()))
            if task['classify'] == self.visit_classify:
                # Visit tasks belong to the domain crawler; requeue them.
                self._scheduler.insert_task(task, level=task['weight'])
            else:
                word = task['search']
                logger.info(f"<{engine.__class__.__name__}> {task['groups']} >>> {word}")
                urls = engine.search(word)
                lst = []
                for url in urls:
                    lst.append(self.make_task(
                        url=url,
                        groups=task['groups'],
                        classify=self.visit_classify,
                        weight=self.url_weight
                    ))
                self._scheduler.insert_tasks(lst, level=self.url_weight)

    def search_engines(self):
        logger.info('[Search Engines] initial load')
        for engine in self._engines:
            threading.Thread(
                target=self.start_search, name='SearchEngine', args=(engine,)
            ).start()
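
# Usage sketch, hedged: engines must subclass JySearchEngine to pass the
# isinstance check in set_search_engine. "BaiduEngine" is a hypothetical
# name; the real implementations live in crawler.search_engine.
#
#   engine = SearchEngine(scheduler=Scheduler())
#   engine.set_search_engines([BaiduEngine()])
#   engine.search_engines()  # one search thread per registered engine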


class VisitDomain(BasicSearch):

    def __init__(
            self,
            downloader=None,
            parser=None,
            validator=None,
            allow_load_filter=False,
            **kwargs,
    ):
        super(VisitDomain, self).__init__(scheduler=kwargs.get('scheduler'))
        self._downloader = (downloader or Downloader())
        self._parser = (parser or parse_urls)
        self._validator = (validator or Validator())
        if allow_load_filter:
            self._validator.load_filter()

    def push_new_domain(self, task: Task):
        # Save the new source.
        insert_one(MGO_DOMAIN, task)
        # Register it with the filter.
        self._validator.add_filter_feature(task['domain'])
        # Register it with the deduplication collection.
        remove_duplicate = {'url': task['domain'], 'time': task['update_at']}
        insert_one(MGO_REPETITION, remove_duplicate)
        logger.info(f"[New Domain] {task['domain']} - {task['name']}")

    def verify(self, task: Task):
        valid_words = self._validator.words(task['name'], task)
        if valid_words:
            self.push_new_domain(task)
        elif task['sensitive'] or task['duplication']:
            raise ValidatorError(f"Feature check failed: {task['name']}")

    def search_domains(self):
        while True:
            tasks = self._scheduler.get_task()
            if len(tasks) == 0:
                logger.info('Shutting down the seed-search crawler')
                break
            task_key, task = tasks
            task['update_at'] = int2long(int(time.time()))
            if task['classify'] != self.visit_classify:
                # Query tasks belong to the search engines; requeue them.
                self._scheduler.insert_task(task, level=task['weight'])
            else:
                domain = extract_domain(task['url'])
                allow_visit_domain = self._validator.url(domain)
                if not allow_visit_domain:
                    continue
                logger.info(f'request web site -> {task["url"]}')
                response = self._downloader.get(task['url'])
                if response.status_code != 200 or not response.text:
                    continue
                response.encoding = response.apparent_encoding
                if response.encoding in SPECIAL_ENCODINGS:
                    response.encoding = 'utf-8'
                task['domain'] = domain
                base_url = extract_base_url(task['url'])
                task['base_url'] = base_url
                page_source = response.text
                title = extract_page_title(page_source)
                task['name'] = title
                try:
                    self.verify(task)
                    # Extract outgoing links and queue them as visit tasks.
                    urls = self._parser(page_source, base_url)
                    new_tasks = []
                    for url in urls:
                        new_tasks.append(self.make_task(
                            url=url,
                            groups=task['groups'],
                            classify=self.visit_classify,
                            weight=task['weight']
                        ))
                    self._scheduler.insert_tasks(new_tasks, level=self.url_weight)
                except (ValidatorError, HostsRetrieveError):
                    # Skip pages that fail verification.
                    pass
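
# A minimal end-to-end wiring sketch, hedged: it assumes the two workers
# share one Scheduler so query tasks and visit tasks can be handed back and
# forth. Note that BasicSearch.__init__ starts the seed-sync threads, so
# each instantiation below spawns its own set of sync loops.
#
#   if __name__ == '__main__':
#       scheduler = Scheduler()
#       SearchEngine(scheduler=scheduler) \
#           .set_search_engines([...]) \
#           .search_engines()
#       VisitDomain(scheduler=scheduler, allow_load_filter=True).search_domains()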