import threading
from concurrent.futures import ThreadPoolExecutor, wait

from common.log import logger
from crawler.Task import Task
from crawler.services.basics import BasicSearch
from crawler.utils import (
    extract_base_url,
    extract_page_title,
    extract_domain,
    err_details,
)


class DataExcavate(BasicSearch):
    """Pulls excavation tasks from the scheduler, fetches each site and
    extracts candidate links to feed back into the crawl pipeline."""

    def __init__(self, workers=1, loop_interval=60, **kwargs):
        super().__init__(**kwargs)
        self._interval = loop_interval  # sleep time when the task queue is empty
        self._workers = workers  # number of excavation worker threads

    def retrieve_site(self, task: Task):
        """Fetch a single site and collect new link tasks; return False on a failed request."""
        t_name = threading.current_thread().name
        logger.info(f'[{t_name}]requesting - {task["url"]}')
        response = self.downloader.get(task['url'])
        task['status_code'] = response.status_code
        if response.status_code != 200 or not response.text:
            task['err_reason'] = response.reason
            logger.error(f'[{t_name}]bad url - {task["url"]} - {response.reason}')
            return False

        task['domain'] = extract_domain(task['url'])
        page_source = response.text
        task['name'] = extract_page_title(page_source)
        task['base_url'] = extract_base_url(task['url'])

        items = self.parser.site_items(page_source, task['base_url'])
        lst = []
        _c = 0  # counter of link names that pass the word filter
        for item in items:
            name, url = item['name'], item['host']
            if self.validator.words(name):
                lst.append(self.make_task(
                    url=url,
                    name=name,
                    origin=task['origin'],
                    groups=task['groups'],
                    classify=self.visit_classify,
                    weight=task['weight']
                ))
                _c += 1

        # keep the domain only when more than one link passed the filter
        if _c > 1:
            self.push_domain(task)
        else:
            self.push_remove(task)
        self.scheduler.add_excavate(lst, level=task['weight'])
        return True

    def excavate(self):
        """Worker loop: poll the scheduler for excavation tasks and process them."""
        t_name = threading.current_thread().name
        logger.info(f'[{t_name}]data excavation - started')
        while True:
            tasks = self.scheduler.get_excavate_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue

            task_key, task = tasks
            # skip urls already recorded by the filter
            if self.validator.data(task['url']):
                continue

            # excavate the site
            success = self.retrieve_site(task)
            if not success:
                # url - add to the filter
                self.validator.add_data(task['url'])
            # excavation record
            # self.push_records(task)

    def start(self):
        """Start the worker threads and block until they finish."""
        logger.info('[DataExcavate]initial loading')
        with ThreadPoolExecutor(self._workers, 'DataExcavate') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.excavate)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
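

if __name__ == '__main__':
    # Minimal usage sketch (assumption: BasicSearch wires up its own
    # downloader/parser/validator/scheduler without extra kwargs; the
    # worker count and interval below are illustrative, not values
    # taken from the project).
    DataExcavate(workers=4, loop_interval=30).start()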