import re
from concurrent.futures import ThreadPoolExecutor, wait

from common.log import logger
from crawler.Task import Task
from crawler.analysis import TimeExtractor
from crawler.services.basics import BasicService
from crawler.utils import (
    extract_host,
    extract_page_title,
    extract_domain,
    err_details,
    is_url,
    html2element,
    iter_node,
    check_page_by_words
)

TLDS = ['com', 'cn', 'net', 'org']

URL_SUFFIX = ['pdf', 'xls', 'xlsx', 'docx', 'doc', 'rar', 'zip']
# Matches URLs that end with an attachment file extension
URL_SUFFIX_PATTERN = '.*(' + '|'.join(URL_SUFFIX) + ')$'


class DataExcavate(BasicService):
    """Data excavation service."""

    def __init__(self, **kwargs):
        self._workers = (kwargs.pop('workers', None) or 1)
        self._max_depth = (kwargs.pop('excavate_depth', None) or 1)
        self._interval = (kwargs.pop('excavate_interval', None) or 60)
        super(DataExcavate, self).__init__(**kwargs)
        self._default_depth = 1

    def _init_depth(self, task: Task):
        """Initialize the task depth when it has not been set yet."""
        if task.get('depth') is None:
            task.update({'depth': self._default_depth})

    def interceptor(self, task: Task):
        """
        Interceptor: intercept tasks that should be discarded.

        :param task: request task
        :return: True if the task should be dropped, otherwise False
        """
        url = task['url']
        if not is_url(url):
            logger.debug(f'<{self.thread_name}> - junk URL - {url}')
            return True

        curr_depth = task['depth']
        domain = extract_domain(url)
        is_attachment_url = re.match(URL_SUFFIX_PATTERN, url) is not None
        if any([
            curr_depth > self._max_depth,  # task depth exceeds the excavation limit
            is_attachment_url,             # URL points to an attachment download
            self.validator.data(url),      # junk pool - the requested URL is a duplicate
            self.validator.data(domain),   # junk pool - the domain is filtered
            self.collector.data(domain),   # collection pool - the domain is already recorded
        ]):
            logger.debug(f'<{self.thread_name}> - invalid task - {curr_depth} - {url}')
            return True
        return False

    def filter_data(self, lst):
        """Filter out values already rejected by the junk filter."""
        results = []
        for val in lst:
            if not self.validator.data(val):
                results.append(val)
        return results

    def same_origin_strategy(self, source, task: Task):
        """Same-origin strategy."""
        # Scan leaf nodes for date/time text
        hit_date_total = 0
        date_nodes = []
        element = html2element(source)
        for node, _ in iter_node(element):
            pt = TimeExtractor().extractor(node)
            if pt and not node.getchildren() and node not in date_nodes:
                date_nodes.append(node)
                hit_date_total += 1

        # Scan the full page text for search keywords
        hit_text_total = 0
        all_text = ["".join(text.split()) for text in element.itertext()]
        all_text = [text for text in all_text if len(text) > 0]
        for text in all_text:
            if check_page_by_words(text):
                hit_text_total += 1

        # Source-discovery result
        if all([3 < hit_date_total < 50, hit_text_total > 3]):
            self.push_domain(task)
        elif hit_text_total > 5:
            self.push_domain(task)
        else:
            self.push_remove(task)

        # Extract same-origin links from the current page (same-origin mode: mode=1).
        # The excavation depth is carried forward and the weight/priority is raised
        # by 1 so these links are processed first.
        lst = []
        history = []
        sub_depth = task['depth'] + 1
        sub_weight = task['weight'] + 1
        items = self.parser.turls(source, task['url'], mode=1)
        for item in items:
            name, url = item['title'], item['href']
            if not self.validator.data(url) and url not in history:
                lst.append(self.make_task(
                    url=url,
                    name=name,
                    depth=sub_depth,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=sub_weight
                ))
                history.append(url)
        self.scheduler.add_excavate(lst, level=sub_weight)

    def non_origin_strategy(self, source, task: Task):
        """Non-same-origin strategy."""
        lst = []
        history = []
        # Extract non-same-origin links from the current page (non-same-origin mode: mode=2)
        # and add them to the excavation queue; once one site has been fully excavated,
        # processing moves on to the new sites.
        non_origin_urls = self.parser.urls(source, task['url'], mode=2)
        for url in non_origin_urls:
            if not self.validator.data(url) and url not in history:
                lst.append(self.make_task(
                    url=url,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=self.url_weight
                ))
                history.append(url)
        self.scheduler.add_excavate(lst, level=self.url_weight)

    def fetch_page(self, task: Task):
        """Download the task URL and return (status_code, page_source)."""
        url = task['url']
        response = self.downloader.get(url, timeout=5)
        status_code = response.status_code
        page_source = response.text
        reason = response.reason
        if status_code != 200 or page_source in ['', None]:
            task['err_reason'] = reason
            msg = f'<{self.thread_name}> - {url} - {status_code} - {reason}'
            logger.error(msg)
            return status_code, None
        return status_code, page_source

    def process(self, task: Task):
        """Fetch a page, record its metadata and apply both link-extraction strategies."""
        logger.info(f'<{self.thread_name}> - request - {task["url"]}')
        status_code, page_source = self.fetch_page(task)
        task['status_code'] = status_code
        if page_source is None:
            # Should domains that failed to be fetched also be added to the filter?
            self.push_remove(task)
            return False

        task['domain'] = extract_domain(task['url'])
        task['base_url'] = extract_host(task['url'])
        task['name'] = extract_page_title(page_source)
        self.same_origin_strategy(page_source, task)
        self.non_origin_strategy(page_source, task)
        return True

    def excavate(self):
        """Worker loop: pull excavation tasks from the scheduler and process them."""
        logger.info(f'Starting thread - <{self.thread_name}>')
        while True:
            tasks = self.scheduler.get_excavate_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval, enable_debug_log=True)
                continue

            task_key, task = tasks
            self._init_depth(task)  # initialize the task depth
            if not self.interceptor(task):
                try:
                    self.process(task)
                except Exception as e:
                    logger.exception(e)
            # Excavation records
            # self.push_records(task)

    def start(self):
        """Start the excavation worker threads and wait for them to finish."""
        with ThreadPoolExecutor(self._workers, 'dataExcavate') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
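

# A minimal usage sketch, assuming BasicService needs no further required constructor
# arguments and wires up the scheduler, downloader, validator, collector and parser
# itself; adjust the keyword arguments to match the real deployment configuration.
if __name__ == '__main__':
    # Two worker threads, follow links at most 3 levels deep, and wait 30 seconds
    # between polls when the excavation queue is empty.
    service = DataExcavate(workers=2, excavate_depth=3, excavate_interval=30)
    service.start()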