schedule.py 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. from constants import ORGANIZATION, KEYWORD
  2. from crawler.q import RedisQueue
  3. from settings import REDIS_QUERY_KEYWORD, REDIS_EXCAVATE, REDIS_QUERY_ORGS
  4. class Scheduler:
  5. def __init__(self, queue=None):
  6. self.mrq = (queue or RedisQueue())
  7. def _add_tasks(self, classify: str, tasks, level=1, group='', **kwargs):
  8. allow_output_log = kwargs.get('allow_output_log', True)
  9. if isinstance(tasks, list):
  10. _tasks = tasks
  11. else:
  12. _tasks = [tasks]
  13. if classify.lower() == 'query' and group.lower() == KEYWORD:
  14. redis_key = REDIS_QUERY_KEYWORD
  15. elif classify.lower() == 'query' and group.lower() == ORGANIZATION:
  16. redis_key = REDIS_QUERY_ORGS
  17. elif classify.lower() == 'excavate':
  18. redis_key = REDIS_EXCAVATE
  19. else:
  20. return
  21. self.mrq.push_task(
  22. redis_key,
  23. _tasks,
  24. level=level,
  25. allow_output_log=allow_output_log
  26. )
  27. def add_query(self, group, tasks, level=1, **kwargs):
  28. self._add_tasks('query', tasks, level, group, **kwargs)
  29. def add_excavate(self, tasks, level=1, **kwargs):
  30. self._add_tasks('excavate', tasks, level, **kwargs)
  31. def _get_task(self, classify: str, group=''):
  32. if classify.lower() == 'query' and group.lower() == KEYWORD:
  33. return self.mrq.pop_task([REDIS_QUERY_KEYWORD], priority=True)
  34. elif classify.lower() == 'query' and group.lower() == ORGANIZATION:
  35. return self.mrq.pop_task([REDIS_QUERY_ORGS], priority=True)
  36. elif classify.lower() == 'excavate':
  37. return self.mrq.pop_task([REDIS_EXCAVATE], priority=True)
  38. def get_excavate_task(self):
  39. return self._get_task('excavate')
  40. def get_query_task(self, group):
  41. return self._get_task('query', group=group)