import threading
from concurrent.futures import ThreadPoolExecutor, wait

from common.execptions import HostsRetrieveError
from common.log import logger
from crawler.engines import BingSearchEngine, QccSearchEngine
from crawler.services.basics import BasicSearch
from crawler.utils import (
    extract_base_url,
    extract_domain,
    is_url,
    err_details,
)


class QueryKeyWord(BasicSearch):
    """Polls keyword query tasks and turns search results into mining tasks."""

    def __init__(
            self,
            engine=None,
            query_workers=1,
            max_query_page=1,
            loop_query_interval=60,
            **kwargs
    ):
        super(QueryKeyWord, self).__init__(**kwargs)
        self.engine = engine or BingSearchEngine()
        # Read the name from self.engine, so the default engine is covered too.
        self._name = self.engine.__class__.__name__
        self._workers = query_workers
        self._max_pages = max_query_page
        self._interval = loop_query_interval

    def query_keyword(self):
        t_name = threading.current_thread().name
        logger.info(f'[keyword query] started - <{t_name} - {self._name}>')
        while True:
            tasks = self.scheduler.get_query_task(self.keyword_groups)
            if not tasks:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            logger.info(f"<{t_name} - {self._name}> {task['groups']} >> {task['search']}")
            cur_page = 0
            while cur_page < self._max_pages:
                cur_page += 1
                # Build data-mining tasks from the retrieved result pages.
                lst = []
                urls = self.engine.search(task['search'], cur_page)
                for url in urls:
                    base_url = extract_base_url(url)
                    # Skip URLs the validator already knows about.
                    if self.validator.data(base_url):
                        continue
                    lst.append(self.make_task(
                        url=base_url,
                        origin=task['origin'],
                        groups=task['groups'],
                        classify=self.visit_classify,
                        weight=task['weight'],
                    ))
                # Push the batch onto the data-mining queue.
                self.scheduler.add_excavate(lst, level=task['weight'])
                logger.info(f'<{self._name}> {task["search"]} - page {cur_page} - {len(lst)} items')
            # # Query record
            # self.push_records(task)

    def start(self):
        logger.info('[keyword query] initializing')
        with ThreadPoolExecutor(self._workers, 'QueryKeyWord') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.query_keyword)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)


class QueryOrganization(BasicSearch):
    """Polls organization query tasks and resolves each name to a site URL."""

    def __init__(
            self,
            engine=None,
            query_workers=1,
            loop_query_interval=60,
            **kwargs
    ):
        super(QueryOrganization, self).__init__(**kwargs)
        self.engine = engine or QccSearchEngine()
        # Read the name from self.engine, so the default engine is covered too.
        self._name = self.engine.__class__.__name__
        self._workers = query_workers
        self._interval = loop_query_interval

    def query_org(self):
        t_name = threading.current_thread().name
        logger.info(f'[organization query] started - <{t_name} - {self._name}>')
        while True:
            tasks = self.scheduler.get_query_task(self.org_groups)
            if not tasks:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            word = task['search']
            logger.info(f"<{t_name} - {self._name}> {task['groups']} >> {word}")
            try:
                url = self.engine.search(word)
                task['url'] = url
                task['name'] = word
                task['domain'] = extract_domain(url)
                # Save the query result.
                self.push_query(task)
                if not is_url(url):
                    continue
                # Ask the collector whether the site is already indexed before
                # deciding to push it onto the data-mining queue.
                if self.collector.data(task['domain']):
                    continue
                # Mark the task as a data-mining visit.
                task['classify'] = self.visit_classify
                # Push it onto the data-mining queue.
                self.scheduler.add_excavate(task, level=task['weight'])
            except HostsRetrieveError as e:
                task['status_code'] = e.code
                task['err_reason'] = e.reason
                logger.exception(e)
                # Put the task back on the query queue for a retry.
                self.scheduler.add_query(self.org_groups, task, level=task['weight'])
            # # Query record
            # self.push_records(task)

    def start(self):
        logger.info('[organization query] initializing')
        with ThreadPoolExecutor(self._workers, 'QueryOrganization') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.query_org)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
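
# ---------------------------------------------------------------------------
# A minimal wiring sketch showing how the two services above might be started
# together. It assumes BasicSearch can be constructed without extra keyword
# arguments (its signature lives elsewhere; scheduler/validator/collector are
# presumably injected there). The engine classes and the constructor
# parameters used below all come from this module; everything else here is
# illustrative, not the project's actual entry point.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    keyword_service = QueryKeyWord(
        engine=BingSearchEngine(),
        query_workers=2,          # number of polling threads
        max_query_page=3,         # result pages crawled per keyword
        loop_query_interval=30,   # seconds to sleep when the queue is empty
    )
    org_service = QueryOrganization(engine=QccSearchEngine())

    # start() blocks on its thread pool, so run each service on its own thread.
    threads = [
        threading.Thread(target=keyword_service.start, name='QueryKeyWordMain'),
        threading.Thread(target=org_service.start, name='QueryOrganizationMain'),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()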