from concurrent.futures import ThreadPoolExecutor, wait

from common.execptions import ExploreDataError
from common.log import logger
from crawler.services.basics import BasicService
from crawler.utils import (
    extract_host,
    extract_domain,
    is_url,
    err_details,
)


class DataQuery(BasicService):
    """Data query service.

    Polls the query queue for keyword / organization search tasks, runs them
    through the configured search engine, and pushes the resulting URLs to the
    data-mining (excavate) queue.
    """

    def __init__(self, engine, **kwargs):
        self._workers = kwargs.pop('query_workers', None) or 1
        self._interval = kwargs.pop('query_interval', None) or 60
        super(DataQuery, self).__init__(**kwargs)
        self._init(engine)
        self.kwargs = kwargs

    def _init(self, engine):
        # Pick the worker routine according to what the engine is used for.
        _app_items = {
            self.keyword_groups: self._keywords,
            self.org_groups: self._organization,
        }
        self._engine = engine
        self._name = engine.__class__.__name__
        self._app = _app_items[engine.usage]
        self._app_name = f'dataQuery_{engine.usage}'

    def _keywords(self):
        logger.info(f'Thread started - <{self.thread_name}>')
        # Read (rather than pop) so every worker thread sees the same page limit.
        _max_pages = self.kwargs.get('max_pages') or 1
        while True:
            tasks = self.scheduler.get_query_task(self.keyword_groups)
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue

            task_key, task = tasks
            logger.info(f"<{self.thread_name}> - {self._name} - {task['search']}")
            cur_page = 0
            while cur_page < _max_pages:
                cur_page += 1
                # Turn the items on the result page into data-mining tasks.
                lst = []
                urls = self._engine.search(task['search'], cur_page)
                for url in urls:
                    host = extract_host(url)
                    if not self.validator.data(host):
                        lst.append(self.make_task(
                            url=host,
                            origin=task['origin'],
                            groups=task['groups'],
                            classify=self.visit_classify,
                            weight=task['weight'],
                        ))
                # Push the batch onto the data-mining queue.
                self.scheduler.add_excavate(lst, level=task['weight'])
                logger.info(
                    "<{}> - {} - {} - page {}, {} items".format(
                        self.thread_name, self._name, task['search'], cur_page, len(lst)
                    )
                )

            # # Query record
            # self.push_records(task)

    def _organization(self):
        logger.info(f'Thread started - <{self.thread_name}>')
        while True:
            tasks = self.scheduler.get_query_task(self.org_groups)
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue

            task_key, task = tasks
            word = task['search']
            logger.info(f"<{self.thread_name}> - {self._name} - {word}")
            try:
                url = self._engine.search(word)
                task['url'] = url
                task['name'] = word
                task['domain'] = extract_domain(url)
                # Save the query result.
                self.push_query(task)
                if not is_url(url):
                    continue
                # Use the collector to check whether the site is already indexed,
                # then decide whether to push it to the data-mining queue.
                if self.collector.data(task['domain']):
                    continue
                # Mark the task as a data-mining (visit) task.
                task['classify'] = self.visit_classify
                # Push it onto the data-mining queue.
                self.scheduler.add_excavate(task, level=task['weight'])
            except ExploreDataError as e:
                task['status_code'] = e.code
                task['err_reason'] = e.reason
                logger.exception(e)
                # Put the task back onto the query queue for a retry.
                self.scheduler.add_query(self.org_groups, task, level=task['weight'])

            # # Query record
            # self.push_records(task)

    def start(self):
        # Run one worker routine per configured thread and wait for all of them.
        with ThreadPoolExecutor(self._workers, self._app_name) as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self._app)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
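

# Hypothetical usage sketch (illustration only): how a DataQuery worker might
# be wired up, assuming an engine object that exposes `usage` and `search()`
# as consumed above. `KeywordEngine` and the keyword arguments shown here are
# placeholders, not part of this module.
#
#   engine = KeywordEngine()          # engine whose usage matches keyword_groups
#   service = DataQuery(
#       engine,
#       query_workers=4,              # number of concurrent query threads
#       query_interval=30,            # idle poll interval (seconds)
#       max_pages=3,                  # result pages to crawl per keyword
#   )
#   service.start()                   # blocks until all worker futures finish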