import threading
from concurrent.futures import ThreadPoolExecutor, wait

from common.log import logger
from crawler.Task import Task
from crawler.services.basics import BasicSearch
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    extract_domain,
    err_details,
)


class DataExcavate(BasicSearch):

    def __init__(self, workers=1, loop_interval=60, **kwargs):
        super(DataExcavate, self).__init__(**kwargs)
        self._interval = loop_interval
        self._workers = workers

    def retrieve_site(self, task: Task):
        t_name = threading.current_thread().name
        logger.info(f'[{t_name}]requesting - {task["url"]}')
        response = self.downloader.get(task['url'])
        task['status_code'] = response.status_code
        if response.status_code != 200 or response.text in ['', None]:
            task['err_reason'] = response.reason
            logger.error(f'[{t_name}]bad url - {task["url"]} - {response.reason}')
            return False

        task['domain'] = extract_domain(task['url'])
        page_source = response.text
        task['name'] = extract_page_title(page_source)
        task['base_url'] = extract_base_url(task['url'])

        items = self.parser.site_items(page_source, task['base_url'])
        lst = []
        _c = 0  # counts items whose name passes the word filter
        for item in items:
            name, url = item['name'], item['host']
            if self.validator.words(name):
                lst.append(self.make_task(
                    url=url,
                    name=name,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=task['weight']
                ))
                _c += 1

        # keep the domain only if more than one item passed the filter,
        # otherwise mark it for removal
        if _c > 1:
            self.push_domain(task)
        else:
            self.push_remove(task)
        self.scheduler.add_excavate(lst, level=task['weight'])
        return True

    def excavate(self):
        t_name = threading.current_thread().name
        logger.info(f'[{t_name}]data excavation - started')
        while True:
            tasks = self.scheduler.get_excavate_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue

            task_key, task = tasks
            # skip urls that were already filtered
            if self.validator.data(task['url']):
                continue

            # excavate the site
            success = self.retrieve_site(task)
            if not success:
                # url - add to the filter
                self.validator.add_data(task['url'])

            # # excavation record
            # self.push_records(task)

    def start(self):
        logger.info('[data excavation] initializing')
        with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
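

if __name__ == '__main__':
    # Minimal launch sketch, illustrative only. Whether DataExcavate can be
    # constructed without extra kwargs depends on BasicSearch, whose signature
    # is not shown here; the downloader/parser/scheduler/validator
    # collaborators are assumed to be wired up by that base class.
    excavator = DataExcavate(workers=4, loop_interval=30)
    excavator.start()  # blocks: each worker thread loops in excavate() forever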