"""Data excavation service.

Pulls tasks from the excavate queue, downloads each page, scores it, and
feeds the links found on the page back into the scheduler.
"""
import re
import time
from concurrent.futures import ThreadPoolExecutor, wait

from common.databases import int2long
from common.log import logger
from crawler.Task import Task
from crawler.analysis import TimeExtractor
from crawler.services.basics import BasicService
from crawler.utils import (
    extract_host,
    extract_page_title,
    extract_domain,
    err_details,
    is_url,
    html2element,
    iter_node,
    check_page_by_words,
    predict_bidding_model,
    is_contains
)
from settings import Dzr

TLDS = ['com', 'cn', 'net', 'org']

# File extensions that mark a URL as an attachment download.
URL_SUFFIX = ['pdf', 'xls', 'xlsx', 'docx', 'doc', 'rar', 'zip']
# The literal dot is required: without it any URL that merely ends with the
# letters "doc", "zip", ... (e.g. ".../products/zip") would be treated as an
# attachment and dropped.
URL_SUFFIX_PATTERN = r'.*\.(' + '|'.join(URL_SUFFIX) + r')$'


def predict_data(html, task: Task):
    """Run the bidding prediction model on a page and record the outcome.

    :param html: page source, or ``None`` when the download failed
    :param task: the request task; ``task['origin']`` and ``task['url']``
        are written to the record
    :return: ``{'html': html, 'predict': 0}`` when ``html`` is ``None``
        (nothing is stored in that case), otherwise the value returned by
        ``predict_bidding_model``
    """
    if html is None:
        return {'html': html, 'predict': 0}

    data_json = predict_bidding_model({'contenthtml': html})
    # The page HTML itself is not part of the stored record.
    Dzr.insert_one({
        'site': task['origin'],
        'url': task['url'],
        'predict': data_json['predict'],
        'comeintime': int2long(int(time.time()))
    })
    return data_json


class DataExcavate(BasicService):
    """Data excavation service."""

    def __init__(self, **kwargs):
        """Read the service options and initialise the base service.

        Recognised keyword arguments (each falls back to its default when
        missing or falsy): ``workers`` (1), ``excavate_depth`` (1) and
        ``excavate_interval`` (60). Remaining kwargs go to ``BasicService``.
        """
        self._workers = (kwargs.pop('workers', None) or 1)
        self._max_depth = (kwargs.pop('excavate_depth', None) or 1)
        self._interval = (kwargs.pop('excavate_interval', None) or 60)
        super(DataExcavate, self).__init__(**kwargs)
        self._default_depth = 1

    def _init_depth(self, task: Task):
        """Give the task the default depth when it does not carry one."""
        if task.get('depth') is None:
            task.update({'depth': self._default_depth})

    def interceptor(self, task: Task):
        """Interceptor: decide whether a task must be discarded.

        :param task: request task (``task['depth']`` must already be set)
        :return: ``True`` when the task should be dropped, else ``False``
        """
        url = task['url']
        if not is_url(url):
            logger.debug(f'<{self.thread_name}> - 垃圾网址 - {url}')
            return True

        curr_depth = task['depth']
        domain = extract_domain(url)
        is_attachment_url = re.match(URL_SUFFIX_PATTERN, url) is not None
        is_login_url = is_contains(url, 'login')
        # The list is built eagerly on purpose: every check below is
        # evaluated for every task, exactly as before.
        if any([
            curr_depth > self._max_depth,  # task depth exceeds the limit
            is_attachment_url,  # attachment download address
            is_login_url,  # login address
            self.validator.data(url),  # garbage pool - duplicate request URL
            self.validator.data(domain),  # garbage pool - filtered first-level domain
            self.collector.data(domain),  # collected pool - duplicate first-level domain
        ]):
            logger.debug(f'<{self.thread_name}> - 无效任务 - {curr_depth} - {url}')
            return True

        return False

    def filter_data(self, lst):
        """Return the items of ``lst`` that the garbage filter does not contain."""
        return [val for val in lst if not self.validator.data(val)]

    def same_origin_strategy(self, source, task: Task):
        """Same-origin strategy.

        Scores the page by counting leaf nodes that contain a date and text
        fragments that hit the search words, pushes the task to the domain
        pool or the removal pool accordingly, then queues the page's
        same-origin links one level deeper with weight + 1.

        :param source: page source of the task URL
        :param task: the task being processed
        """
        # Look for date text: count leaf nodes in which a time is found.
        date_nodes = []
        element = html2element(source)
        for node, _ in iter_node(element):
            pt = TimeExtractor().extractor(node)
            if pt and not node.getchildren() and node not in date_nodes:
                date_nodes.append(node)
        hit_date_total = len(date_nodes)

        # Scan the whole page text for search words (whitespace removed,
        # empty fragments skipped).
        all_text = ["".join(text.split()) for text in element.itertext()]
        hit_text_total = sum(
            1 for text in all_text if text and check_page_by_words(text)
        )

        # Source-seeking result.
        looks_like_listing = 3 < hit_date_total < 50 and hit_text_total > 3
        if looks_like_listing or hit_text_total > 5:
            self.push_domain(task)
        else:
            self.push_remove(task)

        # Extract the same-origin links of the current page (mode=1) so no
        # excavation level is missed; weight + 1 so they are handled first.
        lst = []
        seen = set()  # URLs already queued from this page
        sub_depth = task['depth'] + 1
        sub_weight = task['weight'] + 1
        items = self.parser.turls(source, task['url'], mode=1)
        for item in items:
            name, url = item['title'], item['href']
            if not self.validator.data(url) and url not in seen:
                lst.append(self.make_task(
                    url=url,
                    name=name,
                    depth=sub_depth,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=sub_weight
                ))
                seen.add(url)
        self.scheduler.add_excavate(lst, level=sub_weight)

    def non_origin_strategy(self, source, task: Task):
        """Non-same-origin strategy.

        Queues the page's non-same-origin links (mode=2) with the default
        URL weight, so that new sites are processed after the current one.

        :param source: page source of the task URL
        :param task: the task being processed
        """
        lst = []
        seen = set()  # URLs already queued from this page
        non_origin_urls = self.parser.urls(source, task['url'], mode=2)
        for url in non_origin_urls:
            if not self.validator.data(url) and url not in seen:
                lst.append(self.make_task(
                    url=url,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=self.url_weight
                ))
                seen.add(url)
        self.scheduler.add_excavate(lst, level=self.url_weight)

    def fetch_page(self, task: Task):
        """Download the task URL.

        :param task: the task; ``task['err_reason']`` is set on failure
        :return: ``(status_code, page_source)``; ``page_source`` is ``None``
            when the status is not 200 or the body is empty
        """
        url = task['url']
        response = self.downloader.get(url, timeout=5)
        status_code = response.status_code
        page_source = response.text
        reason = response.reason
        if status_code != 200 or not page_source:
            task['err_reason'] = reason
            msg = f'<{self.thread_name}> - {url} - {status_code} - {reason}'
            logger.error(msg)
            return status_code, None
        return status_code, page_source

    def process(self, task: Task):
        """Fetch one task's page and run both link strategies on it.

        :param task: the task to process
        :return: ``True`` when the page was fetched and processed, ``False``
            when the fetch failed (the task is then pushed for removal)
        """
        logger.info(f'<{self.thread_name}> - 请求 - {task["url"]}')
        status_code, page_source = self.fetch_page(task)
        task['status_code'] = status_code
        predict_data(page_source, task)
        if page_source is None:
            # Open question: should domains that failed to load be added to
            # the filter?
            self.push_remove(task)
            return False

        task['domain'] = extract_domain(task['url'])
        task['base_url'] = extract_host(task['url'])
        task['name'] = extract_page_title(page_source)
        self.same_origin_strategy(page_source, task)
        self.non_origin_strategy(page_source, task)
        return True

    def excavate(self):
        """Worker loop: take excavate tasks forever and process them."""
        logger.info(f'开启线程 - <{self.thread_name}>')
        while True:
            tasks = self.scheduler.get_excavate_task()
            if not tasks:
                # Nothing queued: wait for the configured interval.
                self.loops_interval(self._interval, enable_debug_log=True)
                continue

            _task_key, task = tasks
            self._init_depth(task)  # initialise the site depth
            if not self.interceptor(task):
                try:
                    self.process(task)
                except Exception as e:
                    # Worker boundary: log and keep the loop alive.
                    logger.exception(e)
                # Excavation records (disabled):
                # self.push_records(task)

    def start(self):
        """Run ``excavate`` on ``self._workers`` threads and wait for them."""
        with ThreadPoolExecutor(self._workers, 'dataExcavate') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)