import threading
from concurrent.futures import ThreadPoolExecutor, wait

from common.log import logger
from crawler.Task import Task
from crawler.services.basics import BasicSearch
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    extract_domain,
    err_details,
)


class DataExcavate(BasicSearch):
    """Multi-threaded site excavator: fetches queued URLs, harvests
    keyword-matching links, and feeds them back into the scheduler."""

    def __init__(self, excavate_workers=1, loop_excavate_interval=60, **kwargs):
        super(DataExcavate, self).__init__(**kwargs)
        self._interval = loop_excavate_interval  # idle sleep when the queue is empty
        self._workers = excavate_workers  # number of excavation threads

    def retrieve_site(self, task: Task):
        logger.info(f'[DataExcavate] requesting - {task["url"]}')
        response = self.downloader.get(task['url'])
        task['status_code'] = response.status_code
        if response.status_code != 200 or not response.text:
            task['err_reason'] = response.reason
            logger.error(f'[DataExcavate] bad url - {task["url"]}')
            return

        task['domain'] = extract_domain(task['url'])
        page_source = response.text
        task['name'] = extract_page_title(page_source)
        task['base_url'] = extract_base_url(task['url'])

        items = self.parser.site_items(page_source, task['base_url'])
        new_tasks = []
        hits = 0  # links on the page whose text matches a monitored phrase
        for item in items:
            name, url = item['name'], item['host']
            if self.validator.phrase(name):
                new_tasks.append(self.make_task(
                    url=url,
                    name=name,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=task['weight']
                ))
                hits += 1

        if hits > 1:
            # Page yielded multiple matches: keep its domain for future crawling.
            self.push_domain(task)
        elif not self.validator.data(task['domain']):
            # Too few matches and the domain is not yet filtered: mark for removal.
            self.push_remove(task)
        self.scheduler.add_excavate(new_tasks, level=task['weight'])

    def excavate(self):
        t_name = threading.current_thread().name
        logger.info(f'[DataExcavate] started - {t_name}')
        while True:
            tasks = self.scheduler.get_excavate_task()
            if not tasks:
                self.loops_interval(self._interval)
                continue

            task_key, task = tasks
            if self.validator.data(task['url']):
                continue  # url already seen

            self.retrieve_site(task)
            # Add the url to the de-duplication filter.
            self.validator.add_data(task['url'])
            # Add the domain to the filter; retrieve_site() may have returned
            # early on failure, in which case 'domain' was never set.
            if task.get('domain'):
                self.validator.add_data(task['domain'])
            # Record the visit.
            self.push_records(task)

    def start(self):
        logger.info('[DataExcavate] initializing')
        with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
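

# --- Usage sketch (illustrative, not part of the module) ---------------------
# A minimal example of wiring up the excavator. It assumes BasicSearch accepts
# or constructs its collaborators (downloader, parser, validator, scheduler)
# via **kwargs; the worker count and polling interval below are hypothetical.
# Adjust to the project's actual bootstrap code.

if __name__ == '__main__':
    # Four worker threads, 30-second idle polling when the queue is empty.
    miner = DataExcavate(excavate_workers=4, loop_excavate_interval=30)
    miner.start()  # blocks; each worker loops forever in DataExcavate.excavate()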