import threading
from concurrent.futures import ThreadPoolExecutor, wait
from typing import List

from common.log import logger
from crawler.Task import Task
from crawler.services.basics import BasicSearch
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    extract_domain,
    split_domain,
    err_details,
)

TLDS = ['com', 'cn', 'net', 'org']


class DataExcavate(BasicSearch):

    def __init__(self, workers=1, loop_interval=60, **kwargs):
        super(DataExcavate, self).__init__(**kwargs)
        self._interval = loop_interval
        self._workers = workers
        self._max_depth = (kwargs.pop('excavate_depth', None) or 3)
        self._default_depth = 1

    def _init_depth(self, task: Task):
        """Give a task the default depth when it does not carry one yet."""
        if task.get('depth') is None:
            task.update({'depth': self._default_depth})

    def check_depth(self, task: Task, sub_tasks: List[Task]):
        """Return the sub tasks that stay within the excavation depth limit.

        Sub tasks inherit the parent depth plus one; once the parent has
        reached ``self._max_depth``, no sub tasks are kept.
        """
        _results = []
        curr_depth = task['depth']
        if curr_depth < self._max_depth:
            for t in sub_tasks:
                t.setdefault('depth', curr_depth + 1)
                _results.append(t)
        return _results

    def is_rubbish(self, url: str):
        """
        URL filter.

        :param url: URL to check
        :return: bool - True when the URL or its domain is filtered out
        """
        if self.validator.data(url):
            return True

        domain = extract_domain(url)
        if self.validator.data(domain):
            return True

        if domain.startswith('www.'):
            domain = domain.replace('www.', '')

        '''domain handling'''
        domain_lst = split_domain(domain)
        domain_lst = [d for d in domain_lst if d not in TLDS]
        text = ".".join(domain_lst)
        if self.validator.data(text):
            return True

        '''check the individual labels contained in the domain'''
        for val in domain_lst:
            if self.validator.data(val):
                return True
        return False
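
    # Illustrative note (an assumption about split_domain, which is expected
    # to break a domain into its dot-separated labels): for a host such as
    # 'www.example.com.cn' the 'www.' prefix is stripped first, the labels
    # listed in TLDS are dropped, and the remaining labels are checked against
    # the validator both re-joined and one by one.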

    def process(self, task: Task):
        t_name = threading.current_thread().name
        logger.info(f'<{t_name}> - request - {task["url"]}')
        response = self.downloader.get(task['url'])
        status_code = response.status_code
        page_source = response.text
        reason = response.reason
        task['status_code'] = status_code
        if status_code != 200 or page_source in ['', None]:
            task['err_reason'] = reason
            logger.error(f'<{t_name}> - {reason} - {status_code} - {task["url"]}')
            return False

        task['domain'] = extract_domain(task['url'])
        task['name'] = extract_page_title(page_source)
        task['base_url'] = extract_base_url(task['url'])

        items = self.parser.site_items(page_source, task['base_url'])
        lst = []
        _c = 0  # counter of links whose titles pass the word filter
        for item in items:
            name, url = item['name'], item['host']
            if self.validator.words(name):
                lst.append(self.make_task(
                    url=url,
                    name=name,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=task['weight']
                ))
                _c += 1

        if _c > 1:
            if self.push_domain(task):
                lst = self.check_depth(task, lst)
        else:
            self.push_remove(task)
        self.scheduler.add_excavate(lst, level=task['weight'])
        return True

    def excavate(self):
        t_name = threading.current_thread().name
        logger.info(f'start thread - <{t_name}>')
        while True:
            tasks = self.scheduler.get_excavate_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue

            task_key, task = tasks
            '''initialise the site depth'''
            self._init_depth(task)
            if self.is_rubbish(task['url']):
                logger.debug(f'<{t_name}> - duplicate url - {task["url"]}')
                continue

            '''data excavation'''
            try:
                success = self.process(task)
                if not success:
                    '''url - add to the filter'''
                    self.validator.add_data(task['url'])
            except Exception as e:
                logger.exception(e)

            # '''excavation record'''
            # self.push_records(task)

    def start(self):
        with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
            futures = []
            for _ in range(1, self._workers + 1):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
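

# Minimal launch sketch (an illustrative assumption, not part of the original
# module): it presumes BasicSearch wires up the downloader, parser, scheduler
# and validator from the keyword arguments passed through **kwargs.
if __name__ == '__main__':
    excavator = DataExcavate(workers=4, loop_interval=30, excavate_depth=3)
    excavator.start()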