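"""Query services for the crawler.

QueryKeyWord pulls keyword query tasks from the scheduler, runs them through a
web search engine page by page, and feeds the resulting site URLs to the
data-mining (excavation) queue. QueryOrganization looks up organization names,
records the resolved website, and queues newly discovered domains for mining.
"""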
import threading
from concurrent.futures import ThreadPoolExecutor, wait

from common.execptions import HostsRetrieveError
from common.log import logger
from crawler.engines import BingSearchEngine, QccSearchEngine
from crawler.services.basics import BasicSearch
from crawler.utils import (
    extract_base_url,
    extract_domain,
    is_url,
    err_details,
)

class QueryKeyWord(BasicSearch):
    """Search keyword tasks and push discovered site URLs to the mining queue."""

    def __init__(self, engine=None, **kwargs):
        self._workers = kwargs.pop('query_workers', None) or 1
        self._max_pages = kwargs.pop('max_pages', None) or 1
        self._interval = kwargs.pop('query_interval', None) or 60
        super(QueryKeyWord, self).__init__(**kwargs)
        self.engine = engine or BingSearchEngine()
        self._name = self.engine.__class__.__name__

    def query_keyword(self):
        t_name = threading.current_thread().name
        logger.info(f'Thread started - <{t_name}>')
        while True:
            tasks = self.scheduler.get_query_task(self.keyword_groups)
            if len(tasks) == 0:
                # No pending task; sleep for the configured interval and retry.
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            logger.info(f"<{t_name}> - {self._name} - {task['search']}")
            cur_page = 0
            while cur_page < self._max_pages:
                cur_page += 1
                # Search the result page and build data-mining tasks from its URLs.
                lst = []
                urls = self.engine.search(task['search'], cur_page)
                for url in urls:
                    base_url = extract_base_url(url)
                    if not self.validator.data(base_url):
                        lst.append(self.make_task(
                            url=base_url,
                            origin=task['origin'],
                            groups=task['groups'],
                            classify=self.visit_classify,
                            weight=task['weight'],
                        ))
                # Push the batch onto the data-mining (excavation) queue.
                self.scheduler.add_excavate(lst, level=task['weight'])
                logger.info(
                    f'<{t_name}> - {self._name} - {task["search"]} - '
                    f'page {cur_page}, {len(lst)} items'
                )
            # Query record bookkeeping (disabled):
            # self.push_records(task)

    def start(self):
        with ThreadPoolExecutor(self._workers, 'QueryKeyWord') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.query_keyword)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)

class QueryOrganization(BasicSearch):
    """Resolve organization names to websites and queue new domains for mining."""

    def __init__(self, engine=None, **kwargs):
        self._workers = kwargs.pop('query_workers', None) or 1
        self._interval = kwargs.pop('query_interval', None) or 60
        super(QueryOrganization, self).__init__(**kwargs)
        self.engine = engine or QccSearchEngine()
        self._name = self.engine.__class__.__name__

    def query_org(self):
        t_name = threading.current_thread().name
        logger.info(f'Thread started - <{t_name}>')
        while True:
            tasks = self.scheduler.get_query_task(self.org_groups)
            if len(tasks) == 0:
                # No pending task; sleep for the configured interval and retry.
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            word = task['search']
            logger.info(f"<{t_name}> - {self._name} - {word}")
            try:
                url = self.engine.search(word)
                task['url'] = url
                task['name'] = word
                task['domain'] = extract_domain(url)
                # Persist the query result.
                self.push_query(task)
                if not is_url(url):
                    continue
                # Ask the collector whether this site is already recorded before
                # deciding to push it onto the data-mining queue.
                if self.collector.data(task['domain']):
                    continue
                # Mark the task as a data-mining (visit) task.
                task['classify'] = self.visit_classify
                # Push onto the data-mining (excavation) queue.
                self.scheduler.add_excavate(task, level=task['weight'])
            except HostsRetrieveError as e:
                task['status_code'] = e.code
                task['err_reason'] = e.reason
                logger.exception(e)
                # Put the task back onto the query queue for a retry.
                self.scheduler.add_query(self.org_groups, task, level=task['weight'])
            # Query record bookkeeping (disabled):
            # self.push_records(task)

    def start(self):
        with ThreadPoolExecutor(self._workers, 'QueryOrganization') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.query_org)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
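

# Minimal usage sketch (hypothetical wiring, not part of the original module):
# both services block inside start(), so the keyword service is launched on a
# background thread here. The constructor keyword arguments are the ones
# consumed in __init__ above; any other BasicSearch configuration is assumed
# to come from its defaults.
if __name__ == '__main__':
    keyword_service = QueryKeyWord(query_workers=2, max_pages=3, query_interval=30)
    org_service = QueryOrganization(query_workers=2, query_interval=30)
    threading.Thread(target=keyword_service.start, name='QueryKeyWordMain', daemon=True).start()
    org_service.start()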