import threading
from concurrent.futures import ThreadPoolExecutor, wait

from common.execptions import HostsRetrieveError
from common.log import logger
from crawler.engines import BingSearchEngine, QccSearchEngine
from crawler.services.basics import BasicSearch
from crawler.utils import (
    extract_base_url,
    extract_domain,
    is_url,
    err_details,
)


class QueryKeyWord(BasicSearch):
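    """Consume keyword-search query tasks and feed the data-mining queue.

    Each task's search phrase is run through the configured search engine
    (Bing by default); result URLs not already known to the validator are
    wrapped into mining tasks and pushed onto the excavation queue.
    """
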
    def __init__(
        self,
        engine=None,
        query_workers=1,
        max_query_page=1,
        loop_query_interval=60,
        **kwargs
    ):
        super(QueryKeyWord, self).__init__(**kwargs)
        # Fall back to Bing when no engine is supplied
        self.engine = engine or BingSearchEngine()
        # Name the worker after the resolved engine, not the raw argument,
        # so it is correct when the default engine is used
        self._name = self.engine.__class__.__name__
        self._workers = query_workers
        self._max_pages = max_query_page
        self._interval = loop_query_interval
    def query_keyword(self):
        """Worker loop: fetch keyword tasks and schedule result URLs for mining."""
        t_name = threading.current_thread().name
        logger.info(f'[keyword query] started - <{t_name} - {self._name}>')
        while True:
            tasks = self.scheduler.get_query_task(self.keyword_groups)
            if len(tasks) == 0:
                # No pending task; wait before polling the queue again
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            logger.info(f"<{t_name} - {self._name}> {task['groups']} >> {task['search']}")
            cur_page = 0
            while cur_page < self._max_pages:
                cur_page += 1
                # Build data-mining tasks from the search result page
                lst = []
                urls = self.engine.search(task['search'], cur_page)
                for url in urls:
                    base_url = extract_base_url(url)
                    if self.validator.data(base_url):
                        continue
                    lst.append(self.make_task(
                        url=base_url,
                        origin=task['origin'],
                        groups=task['groups'],
                        classify=self.visit_classify,
                        weight=task['weight'],
                    ))
                # Push onto the data-mining (excavation) queue
                self.scheduler.add_excavate(lst, level=task['weight'])
                logger.info(f'<{self._name}> {task["search"]} - page {cur_page} - {len(lst)} items')
            # # Query record
            # self.push_records(task)
    def start(self):
        logger.info('[keyword query] initializing')
        with ThreadPoolExecutor(self._workers, 'QueryKeyWord') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.query_keyword)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)


class QueryOrganization(BasicSearch):
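    """Consume organization-name query tasks and resolve them to websites.

    Each organization name is looked up through the configured engine (Qcc
    by default); the result is saved via push_query, and domains not yet
    recorded by the collector are pushed onto the excavation queue.
    """
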
    def __init__(
        self,
        engine=None,
        query_workers=1,
        loop_query_interval=60,
        **kwargs
    ):
        super(QueryOrganization, self).__init__(**kwargs)
        # Fall back to the Qcc engine when no engine is supplied
        self.engine = engine or QccSearchEngine()
        # Name the worker after the resolved engine, not the raw argument
        self._name = self.engine.__class__.__name__
        self._workers = query_workers
        self._interval = loop_query_interval
    def query_org(self):
        """Worker loop: resolve organization names to URLs and schedule them for mining."""
        t_name = threading.current_thread().name
        logger.info(f'[organization query] started - <{t_name} - {self._name}>')
        while True:
            tasks = self.scheduler.get_query_task(self.org_groups)
            if len(tasks) == 0:
                # No pending task; wait before polling the queue again
                self.loops_interval(self._interval)
                continue
            task_key, task = tasks
            word = task['search']
            logger.info(f"<{t_name} - {self._name}> {task['groups']} >> {word}")
            try:
                url = self.engine.search(word)
                task['url'] = url
                task['name'] = word
                task['domain'] = extract_domain(url)
                # Save the query result
                self.push_query(task)
                if not is_url(url):
                    continue
                # The collector decides whether the site is already recorded;
                # only unseen sites are pushed onto the data-mining queue
                if self.collector.data(task['domain']):
                    continue
                # Mark the task as a data-mining (visit) task
                task['classify'] = self.visit_classify
                # Push onto the data-mining (excavation) queue
                self.scheduler.add_excavate(task, level=task['weight'])
            except HostsRetrieveError as e:
                task['status_code'] = e.code
                task['err_reason'] = e.reason
                logger.exception(e)
                # Put the task back onto the query queue for a retry
                self.scheduler.add_query(self.org_groups, task, level=task['weight'])
            # # Query record
            # self.push_records(task)
    def start(self):
        logger.info('[organization query] initializing')
        with ThreadPoolExecutor(self._workers, 'QueryOrganization') as executor:
            futures = []
            for _ in range(self._workers):
                f = executor.submit(self.query_org)
                f.add_done_callback(err_details)
                futures.append(f)
            wait(futures)
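

# Minimal launch sketch. Assumption: BasicSearch supplies its own defaults for the
# scheduler/validator/collector wiring passed through **kwargs; if it requires
# explicit configuration, those keyword arguments would need to be added here.
# start() blocks on its worker pool, so in a real deployment each service would
# typically run in its own thread or process.
if __name__ == '__main__':
    service = QueryKeyWord(query_workers=2, max_query_page=3, loop_query_interval=30)
    service.start()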