import threading
from concurrent.futures import ThreadPoolExecutor, wait
from typing import List

from common.log import logger
from crawler.Task import Task
from crawler.services.basics import BasicSearch
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    extract_domain,
    split_domain,
    err_details,
)

TLDS = ['com', 'cn', 'net', 'org']


class DataExcavate(BasicSearch):

    def __init__(self, workers=1, loop_interval=60, **kwargs):
        super(DataExcavate, self).__init__(**kwargs)
        self._interval = loop_interval
        self._workers = workers
        self._max_depth = (kwargs.pop('excavate_depth', None) or 3)
        self._default_depth = 1

    def _init_depth(self, task: Task):
        """Stamp the task with the default depth if it has none."""
        if task.get('depth') is None:
            task.update({'depth': self._default_depth})

    def check_depth(self, task: Task, sub_tasks: List[Task]):
        """Assign the next depth to sub-tasks; drop them all once max depth is reached."""
        _results = []
        curr_depth = task['depth']
        if curr_depth < self._max_depth:
            for t in sub_tasks:
                t.setdefault('depth', curr_depth + 1)
                _results.append(t)
        return _results

    def is_rubbish(self, url: str):
        """
        URL filter

        :param url: URL to check
        :return: bool, True if the URL should be discarded
        """
        if self.validator.data(url):
            return True

        domain = extract_domain(url)
        if self.validator.data(domain):
            return True

        if domain.startswith('www.'):
            domain = domain.replace('www.', '')

        '''Domain processing'''
        domain_lst = split_domain(domain)
        domain_lst = [d for d in domain_lst if d not in TLDS]
        text = ".".join(domain_lst)
        if self.validator.data(text):
            return True

        '''Check each fragment contained in the domain'''
        for val in domain_lst:
            if self.validator.data(val):
                return True
        return False

    def process(self, task: Task):
        """Download a page, extract keyword-matching links and queue them for excavation."""
        t_name = threading.current_thread().name
        logger.info(f'<{t_name}> - request - {task["url"]}')
        response = self.downloader.get(task['url'])
        status_code = response.status_code
        page_source = response.text
        reason = response.reason
        task['status_code'] = status_code
        if status_code != 200 or page_source in ['', None]:
            task['err_reason'] = reason
            logger.error(f'<{t_name}> - {reason} - {status_code} - {task["url"]}')
            return False

        task['domain'] = extract_domain(task['url'])
        task['name'] = extract_page_title(page_source)
        task['base_url'] = extract_base_url(task['url'])

        items = self.parser.site_items(page_source, task['base_url'])
        lst = []
        _c = 0  # keyword-hit counter
        for item in items:
            name, url = item['name'], item['host']
            if self.validator.words(name):
                lst.append(self.make_task(
                    url=url,
                    name=name,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=task['weight']
                ))
                _c += 1

        if _c > 1:
            if self.push_domain(task):
                lst = self.check_depth(task, lst)
        else:
            self.push_remove(task)

        self.scheduler.add_excavate(lst, level=task['weight'])
        return True

    def excavate(self):
        """Worker loop: pull tasks from the excavate queue and process them."""
        t_name = threading.current_thread().name
        logger.info(f'start thread - <{t_name}>')
        while True:
            tasks = self.scheduler.get_excavate_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue

            task_key, task = tasks
            '''Initialize the site crawl depth'''
            self._init_depth(task)
            if self.is_rubbish(task['url']):
                logger.debug(f'<{t_name}> - duplicate URL - {task["url"]}')
                continue

            '''Data excavation'''
            try:
                success = self.process(task)
                if not success:
                    '''url - add to the filter'''
                    self.validator.add_data(task['url'])
            except Exception as e:
                logger.exception(e)

            # '''excavation records'''
            # self.push_records(task)

    def start(self):
        with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
            futures = []
            for _ in range(1, self._workers + 1):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
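

# A minimal usage sketch, not part of the module above. Assumptions: the
# BasicSearch constructor wires up the scheduler, downloader, parser and
# validator used by DataExcavate, and seed tasks reach the excavate queue
# elsewhere (e.g. via scheduler.add_excavate). `excavate_depth` caps how many
# levels of links are followed, per the constructor logic above.
if __name__ == '__main__':
    excavator = DataExcavate(workers=4, loop_interval=30, excavate_depth=3)
    excavator.start()  # blocks; each worker thread runs the excavate() loop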