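"""Data-query service.

Pulls query tasks from the scheduler and resolves them either through the
Qcc organisation lookup or through the configured search engines, then
pushes the resulting URLs onto the data-excavation queue. One worker
thread is started per engine.
"""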
import threading
from concurrent.futures import ThreadPoolExecutor, wait

from common.execptions import HostsRetrieveError
from common.log import logger
from crawler.engines import JySearchEngine
from crawler.services.basics import BasicSearch
from crawler.utils import (
    extract_base_url,
    extract_domain,
    is_url,
    err_details,
)


class DataQuery(BasicSearch):

    def __init__(
            self,
            engines=None,
            max_query_page=1,
            loop_query_interval=60,
            **kwargs
    ):
        super(DataQuery, self).__init__(**kwargs)
        self._max_pages = max_query_page
        self._interval = loop_query_interval
        self._engines = []
        self.set_engines(engines)

    def _init(self):
        # Re-register the configured engines. Snapshot the list first so
        # set_engines() does not append to the list it is iterating over.
        engines, self._engines = self._engines, []
        self.set_engines(engines)

    def _set_engine(self, engine):
        if isinstance(engine, JySearchEngine):
            self._engines.append(engine)
            logger.info(f'[DataQuery] engine added - <{engine.__class__.__name__}>')
        return self

    def set_engines(self, engines):
        if isinstance(engines, list):
            for engine in engines:
                self._set_engine(engine)
        else:
            self._set_engine(engines)
        return self

    def search(self, engine):
        ename = engine.__class__.__name__
        threading.current_thread().name = ename
        logger.info(f'[DataQuery] starting engine - <{ename}>')
        while True:
            tasks = self.scheduler.get_query_task()
            if len(tasks) == 0:
                # The query queue is empty; sleep before polling again.
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            word = task['search']
            if task['groups'] == self.org_groups:
                # Resolve the organisation's site through the Qcc service.
                logger.info(f"<QccSearch> {task['groups']} >>> {word}")
                try:
                    url = engine.by_org_get_site(word)
                    task['url'] = url
                    task['name'] = word
                    task['domain'] = extract_domain(url)
                    # Save the query result.
                    self.push_query(task)
                    if not is_url(url):
                        continue
                    if self.validator.data(task['domain']):
                        continue
                    # Register the domain with the filter.
                    self.validator.add_data(task['domain'])
                    # Push the task onto the data-excavation queue.
                    task['classify'] = self.visit_classify
                    self.scheduler.add_excavate(task, level=task['weight'])
                except HostsRetrieveError as e:
                    task['status_code'] = e.code
                    task['err_reason'] = e.reason
                    logger.exception(e)
                    # Put the task back onto the query queue for a retry.
                    self.scheduler.add_query(task, level=task['weight'])
            else:
                # Query the keyword through the search engine.
                logger.info(f"<{ename}> {task['groups']} >>> {word}")
                cur_page = 0
                while cur_page < self._max_pages:
                    cur_page += 1
                    # Fetch one page of search results.
                    lst = []
                    urls = engine.search(word, cur_page)
                    # Build data-excavation tasks from the result URLs.
                    for url in urls:
                        domain = extract_domain(url)
                        if self.validator.data(domain):
                            continue
                        lst.append(self.make_task(
                            url=extract_base_url(url),
                            origin=task['origin'],
                            groups=task['groups'],
                            classify=self.visit_classify,
                            weight=task['weight'],
                        ))
                    # Push the batch onto the data-excavation queue.
                    self.scheduler.add_excavate(lst, level=task['weight'])
                    logger.info(f'<{ename}> {word} - page {cur_page} - {len(lst)} items')
                # Record that the keyword has been searched.
                self.push_records(task)

    def start(self):
        if len(self._engines) > 0:
            logger.info('[DataQuery] initialising')
            # One worker thread per configured search engine.
            max_workers = len(self._engines)
            with ThreadPoolExecutor(max_workers, 'DataQuery') as executor:
                futures = []
                for engine in self._engines:
                    f = executor.submit(self.search, engine)
                    f.add_done_callback(err_details)
                    futures.append(f)
                wait(futures)
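

# Minimal usage sketch (illustrative only). It assumes a concrete
# JySearchEngine subclass is available - `BingEngine` below is a
# hypothetical name - and that BasicSearch takes its scheduler/validator
# configuration through **kwargs; adapt to the classes actually exported
# by crawler.engines.
#
# if __name__ == '__main__':
#     from crawler.engines import BingEngine  # hypothetical engine class
#
#     query_service = DataQuery(
#         engines=[BingEngine()],
#         max_query_page=3,         # crawl up to 3 result pages per keyword
#         loop_query_interval=30,   # wait 30s when the query queue is empty
#     )
#     query_service.start()         # blocks; one worker thread per engine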