12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849 |
- from crawler.q import RedisQueue
- from settings import REDIS_QUERY, REDIS_EXCAVATE
- class Scheduler:
- def __init__(self, queue=None):
- self.mrq = (queue or RedisQueue())
- def _add_tasks(self, classify: str, tasks, level=1, **kwargs):
- allow_output_log = kwargs.get('allow_output_log', True)
- if isinstance(tasks, list):
- _tasks = tasks
- else:
- _tasks = [tasks]
- if classify.lower() == 'query':
- self.mrq.push_task(
- REDIS_QUERY,
- _tasks,
- level=level,
- allow_output_log=allow_output_log
- )
- elif classify.lower() == 'excavate':
- self.mrq.push_task(
- REDIS_EXCAVATE,
- _tasks,
- level=level,
- allow_output_log=allow_output_log
- )
- def add_query(self, tasks, level=1, **kwargs):
- self._add_tasks('query', tasks, level, **kwargs)
- def add_excavate(self, tasks, level=1, **kwargs):
- self._add_tasks('excavate', tasks, level, **kwargs)
- def _get_task(self, classify: str):
- if classify.lower() == 'query':
- return self.mrq.pop_task([REDIS_QUERY], priority=True)
- elif classify.lower() == 'excavate':
- return self.mrq.pop_task([REDIS_EXCAVATE], priority=True)
- def get_excavate_task(self):
- return self._get_task('excavate')
- def get_query_task(self):
- return self._get_task('query')
|