import re
import time
from concurrent.futures import ThreadPoolExecutor, wait

from common.databases import int2long
from common.log import logger
from crawler.Task import Task
from crawler.analysis import TimeExtractor
from crawler.services.basics import BasicService
from crawler.utils import (
    extract_host,
    extract_page_title,
    extract_domain,
    err_details,
    is_url,
    html2element,
    iter_node,
    check_page_by_words,
    predict_bidding_model,
    is_contains
)
from settings import Dzr

TLDS = ['com', 'cn', 'net', 'org']

URL_SUFFIX = ['pdf', 'xls', 'xlsx', 'docx', 'doc', 'rar', 'zip']
URL_SUFFIX_PATTERN = '.*(' + '|'.join(URL_SUFFIX) + ')$'
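# URL_SUFFIX_PATTERN matches request urls that end with one of the attachment
# extensions above; the interceptor treats such urls as file downloads and
# drops them.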


def predict_data(html, task: Task):
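    """
    Run the bidding-classification model over a page and record the prediction
    for the task's site and url in the Dzr collection.

    Returns the model output; a zero prediction is returned when html is None.
    """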
    data_json = {'html': html, 'predict': 0}
    if html is None:
        return data_json

    data = {'contenthtml': html}
    data_json = predict_bidding_model(data)
    Dzr.insert_one({
        'site': task['origin'],
        # 'html': compress_str(html),
        'url': task['url'],
        'predict': data_json['predict'],
        'comeintime': int2long(int(time.time()))
    })
    return data_json


class DataExcavate(BasicService):
    """Data mining service."""

    def __init__(self, **kwargs):
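        """
        :param workers: number of worker threads (default: 1)
        :param excavate_depth: maximum excavation depth (default: 1)
        :param excavate_interval: sleep interval, in seconds, when the
            excavation queue is empty (default: 60)
        """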
        self._workers = (kwargs.pop('workers', None) or 1)
        self._max_depth = (kwargs.pop('excavate_depth', None) or 1)
        self._interval = (kwargs.pop('excavate_interval', None) or 60)
        super(DataExcavate, self).__init__(**kwargs)
        self._default_depth = 1

    def _init_depth(self, task: Task):
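        """Assign the default depth to tasks that do not carry one yet."""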
        if task.get('depth') is None:
            task.update({'depth': self._default_depth})

    def interceptor(self, task: Task):
        """
        Interceptor: reject tasks that should be discarded.

        :param task: request task
        :return: True if the task must be dropped, otherwise False
        """
        url = task['url']
        if not is_url(url):
            logger.debug(f'<{self.thread_name}> - junk url - {url}')
            return True

        curr_depth = task['depth']
        domain = extract_domain(url)
        is_attachment_url = re.match(URL_SUFFIX_PATTERN, url) is not None
        is_login_url = is_contains(url, 'login')
        if any([
            curr_depth > self._max_depth,  # task depth exceeds the limit
            is_attachment_url,  # attachment download url
            is_login_url,  # login url
            self.validator.data(url),  # junk pool - request url already seen
            self.validator.data(domain),  # junk pool - filtered domain (first level)
            self.collector.data(domain),  # collected pool - domain already collected (first level)
        ]):
            logger.debug(f'<{self.thread_name}> - invalid task - {curr_depth} - {url}')
            return True
        return False

    def filter_data(self, lst):
        """Drop values that are already present in the junk validator."""
        results = []
        for val in lst:
            if not self.validator.data(val):
                results.append(val)
        return results

    def same_origin_strategy(self, source, task: Task):
        """Same-origin strategy."""
        # scan for date/time text
        hit_date_total = 0
        date_nodes = []
        element = html2element(source)
        for node, _ in iter_node(element):
            pt = TimeExtractor().extractor(node)
            if pt and not node.getchildren() and node not in date_nodes:
                date_nodes.append(node)
                hit_date_total += 1

        # scan the full text for search keywords
        hit_text_total = 0
        all_text = ["".join(text.split()) for text in element.itertext()]
        all_text = [text for text in all_text if len(text) > 0]
        for text in all_text:
            if check_page_by_words(text):
                hit_text_total += 1

        # source-discovery decision
        if all([3 < hit_date_total < 50, hit_text_total > 3]):
            self.push_domain(task)
        elif hit_text_total > 5:
            self.push_domain(task)
        else:
            self.push_remove(task)

        # Extract same-origin links from the current page (same-origin mode: mode=1),
        # keep the excavation depth chain intact and raise the weight/priority by 1
        # so these tasks are processed first.
        lst = []
        history = []
        sub_depth = task['depth'] + 1
        sub_weight = task['weight'] + 1
        items = self.parser.turls(source, task['url'], mode=1)
        for item in items:
            name, url = item['title'], item['href']
            if not self.validator.data(url) and url not in history:
                lst.append(self.make_task(
                    url=url,
                    name=name,
                    depth=sub_depth,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=sub_weight
                ))
                history.append(url)
        self.scheduler.add_excavate(lst, level=sub_weight)

    def non_origin_strategy(self, source, task: Task):
        """Non-same-origin strategy."""
        lst = []
        history = []
        # Extract non-same-origin links from the current page (non-same-origin
        # mode: mode=2) and add them to the excavation queue; once one site has
        # been fully excavated, a new site is picked up.
        non_origin_urls = self.parser.urls(source, task['url'], mode=2)
        for url in non_origin_urls:
            if not self.validator.data(url) and url not in history:
                lst.append(self.make_task(
                    url=url,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=self.url_weight
                ))
                history.append(url)
        self.scheduler.add_excavate(lst, level=self.url_weight)

    def fetch_page(self, task: Task):
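        """
        Download the page for a task.

        Returns a (status_code, page_source) tuple; page_source is None when
        the response status is not 200 or the body is empty, in which case the
        failure reason is stored on the task as 'err_reason'.
        """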
        url = task['url']
        response = self.downloader.get(url, timeout=5)
        status_code = response.status_code
        page_source = response.text
        reason = response.reason
        if status_code != 200 or page_source in ['', None]:
            task['err_reason'] = reason
            msg = f'<{self.thread_name}> - {url} - {status_code} - {reason}'
            logger.error(msg)
            return status_code, None
        return status_code, page_source

    def process(self, task: Task):
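        """
        Process a single excavation task: fetch the page, record the model
        prediction, then run the same-origin and non-same-origin strategies.

        :return: True when the page was fetched and analysed, otherwise False
        """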
        logger.info(f'<{self.thread_name}> - request - {task["url"]}')
        status_code, page_source = self.fetch_page(task)
        task['status_code'] = status_code
        predict_data(page_source, task)
        if page_source is None:
            # should domains that fail to load also be added to the filter?
            self.push_remove(task)
            return False

        task['domain'] = extract_domain(task['url'])
        task['base_url'] = extract_host(task['url'])
        task['name'] = extract_page_title(page_source)
        self.same_origin_strategy(page_source, task)
        self.non_origin_strategy(page_source, task)
        return True

    def excavate(self):
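        """
        Worker loop: pull tasks from the excavation queue, initialise their
        depth, filter them through the interceptor and process the rest.
        Sleeps for the configured interval when the queue is empty.
        """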
        logger.info(f'starting thread - <{self.thread_name}>')
        while True:
            tasks = self.scheduler.get_excavate_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval, enable_debug_log=True)
                continue

            task_key, task = tasks
            self._init_depth(task)  # initialise the site depth
            if not self.interceptor(task):
                try:
                    self.process(task)
                except Exception as e:
                    logger.exception(e)
            # excavation records
            # self.push_records(task)

    def start(self):
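        """Spawn the worker threads and block until they all finish."""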
        with ThreadPoolExecutor(self._workers, 'dataExcavate') as executor:
            futures = []
            for _ in range(1, self._workers + 1):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
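

if __name__ == '__main__':
    # Minimal usage sketch, assuming BasicService pulls the rest of its
    # configuration (scheduler, downloader, validator, ...) from settings and
    # that the keyword arguments consumed in __init__ are enough to start the
    # service; adjust to the project's actual wiring.
    excavator = DataExcavate(workers=4, excavate_depth=3, excavate_interval=30)
    excavator.start()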