import threading
from concurrent.futures import ThreadPoolExecutor, wait

from common.execptions import HostsRetrieveError
from common.log import logger
from crawler.engines import JySearchEngine
from crawler.services.basics import BasicSearch
from crawler.utils import (
    extract_base_url,
    extract_domain,
    is_url,
    err_details,
)


class DataQuery(BasicSearch):

    def __init__(self, engines=None, max_query_page=1, loop_query_interval=60, **kwargs):
        super(DataQuery, self).__init__(**kwargs)
        self._max_pages = max_query_page
        self._interval = loop_query_interval
        self._engines = []
        self.set_engines(engines)

    def _init(self):
        # Re-register the engines collected so far; swap the list out first so
        # _set_engine does not append to the list it is iterating over.
        engines, self._engines = self._engines, []
        self.set_engines(engines)

    def _set_engine(self, engine):
        if isinstance(engine, JySearchEngine):
            self._engines.append(engine)
            logger.info(f'[DataQuery] engine added - <{engine.__class__.__name__}>')
        return self

    def set_engines(self, engines):
        if isinstance(engines, list):
            for engine in engines:
                self._set_engine(engine)
        else:
            self._set_engine(engines)
        return self

    def search(self, engine):
        """Consume query tasks with one engine and feed the excavation queue."""
        ename = engine.__class__.__name__
        threading.current_thread().name = ename
        logger.info(f'[DataQuery] engine started - <{ename}>')
        while True:
            tasks = self.scheduler.get_query_task()
            if len(tasks) == 0:
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            word = task['search']
            if task['groups'] == self.org_groups:
                # Look up the organisation's site via the Qichacha (企查查) service
                logger.info(f" {task['groups']} >>> {word}")
                try:
                    url = engine.by_org_get_site(word)
                    task['url'] = url
                    task['name'] = word
                    task['domain'] = extract_domain(url)
                    # Save the query result
                    self.push_query(task)
                    if not is_url(url):
                        continue
                    if self.validator.data(task['domain']):
                        continue
                    # Register the domain with the de-duplication filter
                    self.validator.add_data(task['domain'])
                    # Push the task onto the excavation (data-mining) queue
                    task['classify'] = self.visit_classify
                    self.scheduler.add_excavate(task, level=task['weight'])
                except HostsRetrieveError as e:
                    task['status_code'] = e.code
                    task['err_reason'] = e.reason
                    logger.exception(e)
                    # Put the task back onto the query queue for a retry
                    self.scheduler.add_query(task, level=task['weight'])
            else:
                # Query the keyword with the search engine
                logger.info(f"<{ename}> {task['groups']} >>> {word}")
                cur_page = 0
                while cur_page < self._max_pages:
                    cur_page += 1
                    # Fetch one page of search results
                    lst = []
                    urls = engine.search(word, cur_page)
                    # Build excavation tasks from the result URLs
                    for url in urls:
                        domain = extract_domain(url)
                        if self.validator.data(domain):
                            continue
                        lst.append(self.make_task(
                            url=extract_base_url(url),
                            origin=task['origin'],
                            groups=task['groups'],
                            classify=self.visit_classify,
                            weight=task['weight'],
                        ))
                    # Push the batch onto the excavation (data-mining) queue
                    self.scheduler.add_excavate(lst, level=task['weight'])
                    logger.info(f'<{ename}> {word} - page {cur_page} - {len(lst)} items')
            # Record the processed task
            self.push_records(task)

    def start(self):
        if len(self._engines) > 0:
            logger.info('[DataQuery] initialising')
            max_workers = len(self._engines)  # one worker thread per search engine
            with ThreadPoolExecutor(max_workers, 'DataQuery') as executor:
                futures = []
                for engine in self._engines:
                    f = executor.submit(self.search, engine)
                    f.add_done_callback(err_details)
                    futures.append(f)
                wait(futures)
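

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): one way a DataQuery worker could be wired
# up. The engine constructor and any extra BasicSearch keyword arguments shown
# here are assumptions for illustration; the actual signatures are defined in
# crawler.engines and crawler.services.basics and may differ.
#
#     engine = JySearchEngine()                # hypothetical no-arg constructor
#     query = DataQuery(
#         engines=[engine],
#         max_query_page=3,                    # crawl up to 3 result pages per keyword
#         loop_query_interval=60,              # sleep 60s when the query queue is empty
#     )
#     query.start()                            # blocks; one thread per registered engine
# ---------------------------------------------------------------------------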