import re
import threading
from concurrent.futures import ThreadPoolExecutor, wait
from typing import List

from common.log import logger
from crawler.Task import Task
from crawler.services.basics import BasicService
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    extract_domain,
    split_domain,
    err_details,
    is_url,
)
TLDS = ['com', 'cn', 'net', 'org']
URL_SUFFIX = ['pdf', 'xls', 'xlsx', 'docx', 'doc', 'rar', 'zip']
URL_SUFFIX_PATTERN = '.*(' + '|'.join(URL_SUFFIX) + ')$'
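# Hypothetical examples of what the pattern screens out: URLs ending with a document
# or archive suffix are never visited, e.g.
#   re.match(URL_SUFFIX_PATTERN, 'http://example.com/files/report.pdf')  -> match (skip)
#   re.match(URL_SUFFIX_PATTERN, 'http://example.com/news/index.html')   -> None (visit)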


class DataExcavate(BasicService):
    """Data excavation service."""

    def __init__(self, **kwargs):
        self._workers = (kwargs.pop('workers', None) or 1)
        self._max_depth = (kwargs.pop('excavate_depth', None) or 1)
        self._interval = (kwargs.pop('excavate_interval', None) or 60)
        super().__init__(**kwargs)
        self._default_depth = 1

    def _init_depth(self, task: Task):
        if task.get('depth') is None:
            task.update({'depth': self._default_depth})

    def check_depth(self, task: Task, sub_tasks: List[Task]):
        _results = []
        curr_depth = task['depth']
        if curr_depth < self._max_depth:
            for t in sub_tasks:
                t.setdefault('depth', curr_depth + 1)
                _results.append(t)
        return _results

    def is_rubbish(self, url: str):
        """
        URL filter.

        :param url: URL to check
        :return: True if the URL is invalid or already recorded (discard), False otherwise
        """
        if not is_url(url):
            return True
        if self.validator.data(url):
            return True
        domain = extract_domain(url)
        if self.validator.data(domain):
            return True
        if domain.startswith('www.'):
            domain = domain[len('www.'):]
        # Domain handling: drop common TLD fragments before validation
        domain_lst = split_domain(domain)
        domain_lst = [d for d in domain_lst if d not in TLDS]
        text = ".".join(domain_lst)
        if self.validator.data(text):
            return True
        # Check each fragment contained in the domain
        for val in domain_lst:
            if self.validator.data(val):
                return True
        return False

    def process(self, t_name: str, task: Task):
        logger.info(f'<{t_name}> - request - {task["url"]}')
        response = self.downloader.get(task['url'], disable_debug_log=False)
        status_code = response.status_code
        page_source = response.text
        reason = response.reason
        task['status_code'] = status_code
        if status_code != 200 or page_source in ['', None]:
            task['err_reason'] = reason
            logger.error(f'<{t_name}> - {reason} - {status_code} - {task["url"]}')
            return False
        task['domain'] = extract_domain(task['url'])
        task['name'] = extract_page_title(page_source)
        task['base_url'] = extract_base_url(task['url'])
        items = self.parser.site_items(page_source, task['base_url'])
        lst = []
        _c = 0  # counter of links whose title passes the word filter
        sub_depth = task['depth'] + 1
        sub_weight = task['weight'] + 1
        for item in items:
            name, url = item['name'], item['host']
            if self.validator.words(name):
                lst.append(self.make_task(
                    url=url,
                    name=name,
                    depth=sub_depth,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=sub_weight
                ))
                _c += 1
        if _c > 1:
            save = self.push_domain(task)
        else:
            save = self.push_remove(task)
        msg = f'<{t_name}> - new URL - {task["url"]}'
        if not save:
            msg = f'<{t_name}> - duplicate URL - {task["url"]}'
        logger.debug(msg)
        # Deeper pages are collected first
        self.scheduler.add_excavate(lst, level=sub_weight)
        return True

    def excavate(self):
        t_name = threading.current_thread().name
        logger.info(f'start thread - <{t_name}>')
        while True:
            tasks = self.scheduler.get_excavate_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            # Initialize the crawl depth of the site
            self._init_depth(task)
            if self.is_rubbish(task['url']):
                logger.debug(f'<{t_name}> - rubbish data - {task["url"]}')
                continue
            # Depth control
            if task['depth'] > self._max_depth:
                logger.debug(f'<{t_name}> - depth limit exceeded - {task["url"]}')
                self.push_records(task)
                continue
            dont_visit = re.match(URL_SUFFIX_PATTERN, task['url']) is not None
            if not dont_visit:
                try:
                    success = self.process(t_name, task)
                    if not success:
                        self.validator.add_data(task['url'])
                except Exception as e:
                    logger.exception(e)
            # '''excavation record'''
            # self.push_records(task)

    def start(self):
        with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
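
# Usage sketch, assuming BasicService wires up the downloader, parser, validator and
# scheduler from the remaining keyword arguments (the exact constructor signature is
# defined in crawler.services.basics, not shown here):
#
#   service = DataExcavate(workers=4, excavate_depth=3, excavate_interval=30)
#   service.start()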