123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051 |
- from constants import ORGANIZATION, KEYWORD
- from crawler.q import RedisQueue
- from settings import REDIS_QUERY_KEYWORD, REDIS_EXCAVATE, REDIS_QUERY_ORGS
class Scheduler:
    """Route crawler tasks to and from the shared task queue.

    Tasks are addressed by a *classify* value (``'query'`` or ``'excavate'``)
    and, for queries, by a *group* (compared against ``KEYWORD`` and
    ``ORGANIZATION``). Each valid combination maps to one Redis key from
    ``settings``; unknown combinations are ignored rather than raising.
    """

    def __init__(self, queue=None):
        """Create a scheduler.

        Args:
            queue: Object exposing ``push_task`` and ``pop_task``. When
                omitted, a new ``RedisQueue`` is created.
        """
        # Compare against None explicitly: an injected queue may be falsy
        # (e.g. it defines __len__ and is currently empty) and must still
        # be used instead of being replaced by a fresh RedisQueue.
        self.mrq = queue if queue is not None else RedisQueue()

    @staticmethod
    def _resolve_key(classify: str, group=''):
        """Return the Redis key for a classify/group pair, or ``None``.

        Matching is case-insensitive on both values. Constants are looked up
        at call time so the single routing table serves both the push and
        the pop paths.
        """
        kind = classify.lower()
        # A non-string group (None included) cannot name a query queue;
        # treat it like any other unknown group instead of raising.
        if kind == 'query' and isinstance(group, str):
            label = group.lower()
            if label == KEYWORD:
                return REDIS_QUERY_KEYWORD
            if label == ORGANIZATION:
                return REDIS_QUERY_ORGS
            return None
        if kind == 'excavate':
            return REDIS_EXCAVATE
        return None

    def _add_tasks(self, classify: str, tasks, level=1, group='', **kwargs):
        """Push one task or a list of tasks onto the matching queue.

        Args:
            classify: ``'query'`` or ``'excavate'`` (case-insensitive).
            tasks: A single task, or a ``list`` of tasks. Anything that is
                not a ``list`` is wrapped into a one-element list.
            level: Forwarded to ``push_task`` as ``level``.
            group: Query group; only consulted when *classify* is a query.
            **kwargs: Only ``disable_debug_log`` is read (default ``False``);
                other keys are ignored.

        Nothing is pushed when the classify/group pair is not recognised.
        """
        redis_key = self._resolve_key(classify, group)
        if redis_key is None:
            return

        batch = tasks if isinstance(tasks, list) else [tasks]
        self.mrq.push_task(
            redis_key,
            batch,
            level=level,
            disable_debug_log=kwargs.get('disable_debug_log', False),
        )

    def add_query(self, group, tasks, level=1, **kwargs):
        """Queue query task(s) for *group*."""
        self._add_tasks('query', tasks, level, group, **kwargs)

    def add_excavate(self, tasks, level=1, **kwargs):
        """Queue excavate task(s)."""
        self._add_tasks('excavate', tasks, level, **kwargs)

    def _get_task(self, classify: str, group=''):
        """Pop a task from the matching queue.

        Returns:
            Whatever ``pop_task`` returns for the resolved key (called with
            ``priority=True``), or ``None`` when the classify/group pair is
            not recognised.
        """
        redis_key = self._resolve_key(classify, group)
        if redis_key is None:
            return None
        return self.mrq.pop_task([redis_key], priority=True)

    def get_excavate_task(self):
        """Pop the next excavate task."""
        return self._get_task('excavate')

    def get_query_task(self, group):
        """Pop the next query task for *group*."""
        return self._get_task('query', group=group)
|