import re
import threading
from concurrent.futures import ThreadPoolExecutor, wait
from typing import List

from common.log import logger
from crawler.Task import Task
from crawler.services.basics import BasicService
from crawler.utils import (
    extract_host,
    extract_page_title,
    extract_domain,
    split_domain,
    err_details,
    is_url,
)

TLDS = ['com', 'cn', 'net', 'org']
URL_SUFFIX = ['pdf', 'xls', 'xlsx', 'docx', 'doc', 'rar', 'zip']
URL_SUFFIX_PATTERN = '.*(' + '|'.join(URL_SUFFIX) + ')$'


class DataExcavate(BasicService):
    """Data excavation (site-mining) service."""

    def __init__(self, **kwargs):
        self._workers = kwargs.pop('workers', None) or 1
        self._max_depth = kwargs.pop('excavate_depth', None) or 1
        self._interval = kwargs.pop('excavate_interval', None) or 60
        super(DataExcavate, self).__init__(**kwargs)
        self._default_depth = 1

    def _init_depth(self, task: Task):
        """Assign the default depth to a task that has none."""
        if task.get('depth') is None:
            task.update({'depth': self._default_depth})

    def check_depth(self, task: Task, sub_tasks: List[Task]):
        """Return the sub tasks that are still within the maximum depth."""
        _results = []
        curr_depth = task['depth']
        if curr_depth < self._max_depth:
            for t in sub_tasks:
                t.setdefault('depth', curr_depth + 1)
                _results.append(t)
        return _results

    def is_rubbish(self, url: str):
        """
        URL filter.

        :param url: url to check
        :return: True = invalid or already recorded (discard), False = new
        """
        if not is_url(url):
            return True

        if self.validator.data(url):
            return True

        domain = extract_domain(url)
        if self.validator.data(domain):
            return True

        if domain.startswith('www.'):
            domain = domain.replace('www.', '')

        # Domain handling: drop common TLD fragments before checking.
        domain_lst = split_domain(domain)
        domain_lst = [d for d in domain_lst if d not in TLDS]
        text = ".".join(domain_lst)
        if self.validator.data(text):
            return True

        # Check every fragment contained in the domain.
        for val in domain_lst:
            if self.validator.data(val):
                return True
        return False

    def process(self, t_name: str, task: Task):
        logger.info(f'<{t_name}> - request - {task["url"]}')
        response = self.downloader.get(task['url'], timeout=5)
        status_code = response.status_code
        page_source = response.text
        reason = response.reason
        task['status_code'] = status_code
        if status_code != 200 or page_source in ['', None]:
            task['err_reason'] = reason
            logger.error(f'<{t_name}> - {reason} - {status_code} - {task["url"]}')
            return False

        task['domain'] = extract_domain(task['url'])
        task['name'] = extract_page_title(page_source)
        task['base_url'] = extract_host(task['url'])

        lst = []
        _history = []
        _c = 0  # counter of links whose title hits the keyword filter
        sub_depth = task['depth'] + 1
        sub_weight = task['weight'] + 1
        items = self.parser.non_origin(page_source, task['url'])
        for item in items:
            name, url = item['title'], item['href']
            if self.validator.words(name):
                if url not in _history:
                    lst.append(self.make_task(
                        url=url,
                        name=name,
                        depth=sub_depth,
                        origin=task['origin'],
                        groups=task['groups'],
                        classify=self.visit_classify,
                        weight=sub_weight
                    ))
                    _history.append(url)
                _c += 1

        if _c > 1:
            save = self.push_domain(task)
            msg = f'<{t_name}> - recorded - {task["url"]}'
            if not save:
                msg = f'<{t_name}> - duplicate record - {task["url"]}'
        else:
            remove = self.push_remove(task)
            msg = f'<{t_name}> - filtered and discarded - {task["url"]}'
            if not remove:
                msg = f'<{t_name}> - duplicate record - {task["url"]}'
        logger.debug(msg)
        # Deeper pages are collected first.
        self.scheduler.add_excavate(lst, level=sub_weight)
        return True

    def excavate(self):
        t_name = threading.current_thread().name
        logger.info(f'thread started - <{t_name}>')
        while True:
            tasks = self.scheduler.get_excavate_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval, enable_debug_log=True)
                continue

            task_key, task = tasks
            # Initialise the page depth.
            self._init_depth(task)
            if self.is_rubbish(task['url']):
                logger.debug(f'<{t_name}> - rubbish data - {task["url"]}')
                continue

            # Depth control.
            if task['depth'] > self._max_depth:
                logger.debug(f'<{t_name}> - depth limit exceeded - {task["url"]}')
                # self.push_records(task)
                continue

            # Skip direct links to document/archive downloads.
            dont_visit = re.match(URL_SUFFIX_PATTERN, task['url']) is not None
            if not dont_visit:
                try:
                    success = self.process(t_name, task)
                    if not success:
                        self.validator.add_data(task['url'])
                except Exception as e:
                    logger.exception(e)

            # excavation records
            # self.push_records(task)

    def start(self):
        with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
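

if __name__ == '__main__':
    # Minimal usage sketch, not part of the original module. It only passes the
    # three keyword arguments consumed by DataExcavate.__init__ above; in the
    # real project BasicService may require additional configuration
    # (downloader, scheduler, validator, parser), which is assumed to be
    # provided by its own defaults here.
    service = DataExcavate(workers=2, excavate_depth=3, excavate_interval=30)
    service.start()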