import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait
from typing import List, Mapping

from common.databases import insert_one, int2long
from common.execptions import ValidatorError, HostsRetrieveError
from common.log import logger
from common.tools import delay_by
from crawler.Task import Task
from crawler.analysis import parse_urls
from crawler.download import Downloader
from crawler.retrieve import Validator
from crawler.schedule import Scheduler
from crawler.search_engine import JySearchEngine
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    extract_domain,
    is_url,
    err_details
)
from settings import (
    REDIS_KEY,
    SPECIAL_ENCODINGS,
    MGO_LUA_SPIDERS,
    MGO_DOMAIN,
    MGO_URLS,
    MGO_ORGS,
    MGO_KEYWORDS,
    MGO_REMOVAL_DUPLICATE
)


class BasicSearch:

    def __init__(
            self,
            scheduler=None,
            validator=None,
            downloader=None,
            parser=None,
            org_weight=9,
            url_weight=8,
            keyword_weight=7,
            **kwargs
    ):
        self.scheduler = (scheduler or Scheduler())
        self.validator = (validator or Validator())
        self.downloader = (downloader or Downloader())
        self.parser = (parser or parse_urls)
        # mongo query parameters
        self.query = {'enable_added': {'$exists': False}}
        self.projection = {'name': 1}
        self.sort = [('_id', -1)]
        # task classification
        self.visit_classify = 'visit'
        self.query_classify = 'query'
        # task weights
        self.org_weight = org_weight
        self.url_weight = url_weight
        self.keyword_weight = keyword_weight
        # task groups
        self.org_groups = 'organization'
        self.keyword_groups = 'keyword'
        self.url_groups = 'seed_url'
        self.competing_groups = 'competing_goods'

    @staticmethod
    def loops_interval(label, interval):
        # fall back to 300 seconds so both the log message and the sleep use the same value
        interval = interval or 300
        next_run_time = delay_by(interval)
        logger.info(f'Executed <{label}>, next run at {next_run_time}')
        time.sleep(interval)

    @staticmethod
    def make_task(**kwargs):
        """Create a Task object"""
        return Task(**kwargs)

    def seed_orgs(self) -> List[Mapping]:
        """Organizations / institutions"""
        search_orgs = []
        cursor = MGO_ORGS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            search_orgs.append(item)
        return search_orgs

    def seed_keywords(self):
        """Keywords"""
        search_keywords = []
        cursor = MGO_KEYWORDS.find(projection=self.projection)
        for item in cursor.sort(self.sort):
            search_keywords.append(item['name'])
        return search_keywords

    def seed_urls(self) -> List[Mapping]:
        """Seed urls"""
        search_urls = []
        cursor = MGO_URLS.find(self.query, projection=self.projection)
        for item in cursor.sort(self.sort):
            search_urls.append(item)
        return search_urls

    def lua_common_domains(self):
        """Read site names and their domains from the lua spider config table and sync them to the de-duplication store"""
        parm_commons = []
        projection = {'param_common': 1}
        cursor = MGO_LUA_SPIDERS.find(projection=projection)
        for item in cursor.sort(self.sort):
            name = item['param_common'][1]
            try:
                url = item['param_common'][11]
                if not is_url(url):
                    continue
                domain = extract_domain(url)
            except IndexError:
                continue
            exists_domain = self.validator.url(domain)
            if not exists_domain:
                parm_commons.append({'name': name, 'domain': domain})
                self.validator.add_url(domain)
        return parm_commons


class SyncData(BasicSearch):

    def __init__(self, allow_load_filter=False, **kwargs):
        super(SyncData, self).__init__(**kwargs)
        self._init(allow_load_filter)

    def _init(self, allow_load_filter=False):
        threading.Thread(
            target=self.sync_data,
            name='SyncData'
        ).start()
        if allow_load_filter:
            self.validator.load_filter()

    def sync_data_urls(self):
        """Sync seed url data"""
        # _interval = 7200
        # while True:
        items = self.seed_urls()
        lst = []
        valid_items = []
        for item in items:
            # skip malformed seed urls rather than mutating the list mid-iteration
            if not is_url(item['name']):
                continue
            valid_items.append(item)
            lst.append(self.make_task(
                url=item['name'],
                groups=self.url_groups,
                classify=self.visit_classify,
                weight=self.url_weight))
        self.scheduler.insert_tasks(lst, level=self.url_weight)
        for item in valid_items:
            MGO_URLS.update_many(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[SyncData] updated {len(valid_items)} url records')
        # self.loops_interval(self.sync_data_urls.__name__, _interval)

    def sync_data_keywords(self):
        """Sync keyword data"""
        # _interval = 1800
        # while True:
        words = self.seed_keywords()
        # normalize keywords and push them onto the task queue
        words = [str(word).replace(' ', '').strip() for word in words]
        lst = []
        for word in words:
            lst.append(self.make_task(
                search=word,
                groups=self.keyword_groups,
                classify=self.query_classify,
                weight=self.keyword_weight))
        self.scheduler.insert_tasks(lst, level=self.keyword_weight)
        logger.info(f'[SyncData] updated {len(words)} keyword records')
        # self.loops_interval(self.sync_data_keywords.__name__, _interval)

    def sync_data_orgs(self):
        """Sync organization data"""
        # _interval = 3600
        # while True:
        items = self.seed_orgs()
        # normalize organization names and push them onto the task queue
        orgs = []
        for item in items:
            name = item.get('name')
            if name in ['', None]:
                logger.warning(f'[invalid organization record] {item}')
                continue
            word = str(name).replace(' ', '').strip()
            orgs.append(word)
        lst = []
        for word in orgs:
            lst.append(self.make_task(
                search=word,
                groups=self.org_groups,
                classify=self.query_classify,
                weight=self.org_weight))
        self.scheduler.insert_tasks(lst, level=self.org_weight)
        # mark organizations that were pushed so they are not queued again
        for item in items:
            MGO_ORGS.update_one(
                {'_id': item['_id']},
                {'$set': {'enable_added': True}}
            )
        logger.info(f'[SyncData] updated {len(items)} organization records')
        # self.loops_interval(self.sync_data_orgs.__name__, _interval)

    def sync_lua_commons(self):
        """Sync site names and urls configured for the lua spiders"""
        # _interval = 3600
        # while True:
        items = self.lua_common_domains()
        for item in items:
            item['create_at'] = int2long(int(time.time()))
            MGO_REMOVAL_DUPLICATE.insert_one(item)
        logger.info(f'[SyncData] updated {len(items)} site domain records')
        # self.loops_interval(self.sync_lua_commons.__name__, _interval)

    def sync_data(self):
        """Sync all seed data"""
        # threading.Thread(
        #     target=self.sync_data_urls,
        #     name='LoadingSeedUrls'
        # ).start()
        # threading.Thread(
        #     target=self.sync_data_keywords,
        #     name='LoadingSeedKeyWords'
        # ).start()
        # threading.Thread(
        #     target=self.sync_data_orgs,
        #     name='LoadingSeedOrgs'
        # ).start()
        # threading.Thread(
        #     target=self.sync_lua_commons,
        #     name='LoadingLuaCommons'
        # ).start()
        logger.info('[SyncData] initial load')
        _interval = 600
        while True:
            self.sync_lua_commons()
            self.sync_data_orgs()
            self.sync_data_urls()
            self.sync_data_keywords()
            self.loops_interval(self.sync_data.__name__, _interval)


class SearchEngine(BasicSearch):

    def __init__(self, wait_task_interval=20, **kwargs):
        super(SearchEngine, self).__init__(**kwargs)
        self._wait_task_interval = wait_task_interval
        self._engines = []

    def set_search_engine(self, engine=None):
        if isinstance(engine, JySearchEngine):
            self._engines.append(engine)
            logger.info(f'Added search engine <{engine.__class__.__name__}>')
        return self

    def set_search_engines(self, engines):
        for engine in engines:
            self.set_search_engine(engine)
        return self

    def enable_engine(self, engine):
        fname = self.enable_engine.__name__
        ename = engine.__class__.__name__
        logger.info(f'[SearchEngine] started - <{ename}>')
        while True:
            tasks = self.scheduler.get_task()
            if len(tasks) == 0:
                # nothing queued: wait before polling again
                self.loops_interval(fname, self._wait_task_interval)
                continue
            task_key, task = tasks
            task['update_at'] = int2long(int(time.time()))
            if task['classify'] == self.visit_classify:
                self.scheduler.insert_task(task, level=task['weight'])
            else:
                word = task['search']
                logger.info(f"<{ename}> {task['groups']} >>> {word}")
                urls = engine.search(word)
                lst = []
                for url in urls:
                    lst.append(self.make_task(
                        url=url,
                        groups=task['groups'],
                        classify=self.visit_classify,
                        weight=self.url_weight
                    ))
                self.scheduler.insert_tasks(lst, level=self.url_weight)

    def load_engines(self):
        logger.info('[SearchEngine] initial load')
        if not self._engines:
            # guard against ThreadPoolExecutor(max_workers=0), which raises ValueError
            logger.warning('[SearchEngine] no search engines registered')
            return
        # size the thread pool to the number of registered search engines
        max_workers = len(self._engines)
        with ThreadPoolExecutor(max_workers=max_workers,
                                thread_name_prefix='SearchEngine') as executor:
            futures = []
            for engine in self._engines:
                f = executor.submit(self.enable_engine, engine)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)


class VisitDomain(BasicSearch):

    def __init__(self, **kwargs):
        super(VisitDomain, self).__init__(**kwargs)

    def push_new_domain(self, task: Task):
        # record the new source site
        insert_one(MGO_DOMAIN, task)
        # add the domain to the url filter
        self.validator.add_url(task['domain'])
        # add the domain to the de-duplication collection
        item = dict(
            domain=task['domain'],
            create_at=task['update_at']
        )
        insert_one(MGO_REMOVAL_DUPLICATE, item)
        logger.info(f"[new domain recorded] {task['domain']} - {task['name']}")

    def verify(self, task: Task):
        valid_words = self.validator.words(task['name'], task)
        if valid_words:
            self.push_new_domain(task)
        else:
            if any([task['sensitive'], task['duplication']]):
                raise ValidatorError(f"feature check failed: {task['name']}")

    def search_domains(self):
        while True:
            _redis_key = REDIS_KEY + '-' + str(self.url_weight)
            tasks = self.scheduler.get_task(_redis_key)
            if len(tasks) == 0:
                logger.info('Closing the source-discovery crawler')
                break
            task_key, task = tasks
            task['update_at'] = int2long(int(time.time()))
            if task['classify'] != self.visit_classify:
                self.scheduler.insert_task(task, level=task['weight'])
            else:
                domain = extract_domain(task['url'])
                exists_domain = self.validator.url(domain)
                if exists_domain:
                    continue
                logger.info(f'request web site -> {task["url"]}')
                response = self.downloader.get(task['url'])
                if response.status_code != 200 or response.text in ['', None]:
                    continue
                response.encoding = response.apparent_encoding
                if response.encoding in SPECIAL_ENCODINGS:
                    response.encoding = 'utf-8'
                task['domain'] = domain
                base_url = extract_base_url(task['url'])
                task['base_url'] = base_url
                page_source = response.text
                title = extract_page_title(page_source)
                task['name'] = title
                try:
                    self.verify(task)
                    urls = self.parser(page_source, base_url)
                    new_tasks = []
                    for url in urls:
                        new_tasks.append(self.make_task(
                            url=url,
                            groups=task['groups'],
                            classify=self.visit_classify,
                            weight=task['weight']
                        ))
                    self.scheduler.insert_tasks(new_tasks, level=self.url_weight)
                except HostsRetrieveError:
                    pass
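

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module): one way the
# three workers could be wired together. The wait_task_interval value and the
# empty engine list are assumptions; in a real deployment the list would hold
# whichever JySearchEngine implementations crawler.search_engine provides.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    # start the background seed-data synchronisation thread
    SyncData(allow_load_filter=True)

    # register search engines and run them in a thread pool; with an empty
    # list, load_engines simply logs a warning and returns
    engine_runner = SearchEngine(wait_task_interval=20)
    engine_runner.set_search_engines([])
    threading.Thread(
        target=engine_runner.load_engines,
        name='LoadEngines'
    ).start()

    # consume queued url tasks in the main thread
    VisitDomain().search_domains()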