schedule.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. from crawler.q import RedisQueue
  2. from settings import REDIS_QUERY, REDIS_EXCAVATE
  3. class Scheduler:
  4. def __init__(self, queue=None):
  5. self.mrq = (queue or RedisQueue())
  6. def _add_tasks(self, classify: str, tasks, level=1, **kwargs):
  7. allow_output_log = kwargs.get('allow_output_log', True)
  8. if isinstance(tasks, list):
  9. _tasks = tasks
  10. else:
  11. _tasks = [tasks]
  12. if classify.lower() == 'query':
  13. self.mrq.push_task(
  14. REDIS_QUERY,
  15. _tasks,
  16. level=level,
  17. allow_output_log=allow_output_log
  18. )
  19. elif classify.lower() == 'excavate':
  20. self.mrq.push_task(
  21. REDIS_EXCAVATE,
  22. _tasks,
  23. level=level,
  24. allow_output_log=allow_output_log
  25. )
  26. def add_query(self, tasks, level=1, **kwargs):
  27. self._add_tasks('query', tasks, level, **kwargs)
  28. def add_excavate(self, tasks, level=1, **kwargs):
  29. self._add_tasks('excavate', tasks, level, **kwargs)
  30. def _get_task(self, classify: str):
  31. if classify.lower() == 'query':
  32. return self.mrq.pop_task([REDIS_QUERY], priority=True)
  33. elif classify.lower() == 'excavate':
  34. return self.mrq.pop_task([REDIS_EXCAVATE], priority=True)
  35. def get_excavate_task(self):
  36. return self._get_task('excavate')
  37. def get_query_task(self):
  38. return self._get_task('query')