schedule.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. from constants import ORGANIZATION, KEYWORD
  2. from crawler.q import RedisQueue
  3. from settings import REDIS_QUERY_KEYWORD, REDIS_EXCAVATE, REDIS_QUERY_ORGS
  class Scheduler:
      """Route crawler tasks to, and fetch them from, per-category queues.

      The backing queue key is chosen from ``(classify, group)``, both compared
      case-insensitively:

      * ``('query', KEYWORD)``      -> ``REDIS_QUERY_KEYWORD``
      * ``('query', ORGANIZATION)`` -> ``REDIS_QUERY_ORGS``
      * ``('excavate', <any>)``     -> ``REDIS_EXCAVATE``

      Any other combination is unroutable: adds are silently dropped and gets
      return ``None``.
      """

      def __init__(self, queue=None):
          """Use *queue* as the backing queue, or a new ``RedisQueue`` if omitted.

          ``is not None`` is used instead of ``or``. Otherwise a caller-supplied
          queue object that happens to be falsy (e.g. one defining ``__len__``
          that is currently empty) would be silently replaced by a fresh
          ``RedisQueue``.
          """
          self.mrq = queue if queue is not None else RedisQueue()

      @staticmethod
      def _resolve_key(classify, group=''):
          """Return the queue key for ``(classify, group)``, or None if unroutable.

          Single source of truth for the routing table shared by the add and get
          paths. *group* is only consulted for ``'query'``; ``None`` is treated
          like ``''`` so it falls through to "unroutable" instead of raising.
          """
          kind = classify.lower()
          if kind == 'query':
              wanted = (group or '').lower()
              if wanted == KEYWORD:
                  return REDIS_QUERY_KEYWORD
              if wanted == ORGANIZATION:
                  return REDIS_QUERY_ORGS
              return None
          if kind == 'excavate':
              return REDIS_EXCAVATE
          return None

      def _add_tasks(self, classify: str, tasks, level=1, group='', **kwargs):
          """Push *tasks* onto the queue selected by ``(classify, group)``.

          :param classify: task category, ``'query'`` or ``'excavate'``
              (case-insensitive).
          :param tasks: one task or a ``list`` of tasks. Anything that is not a
              ``list`` (including a tuple) is wrapped as a single task.
          :param level: forwarded unchanged to ``push_task`` as ``level``.
          :param group: sub-queue selector for ``'query'`` tasks; ignored for
              ``'excavate'``.
          :param kwargs: only ``disable_debug_log`` (default False) is read and
              forwarded; any other keyword is ignored.

          Unroutable ``(classify, group)`` combinations are dropped without
          error, matching the historical behaviour callers rely on.
          """
          redis_key = self._resolve_key(classify, group)
          if redis_key is None:
              return
          batch = tasks if isinstance(tasks, list) else [tasks]
          self.mrq.push_task(
              redis_key,
              batch,
              level=level,
              disable_debug_log=kwargs.pop('disable_debug_log', False),
          )

      def add_query(self, group, tasks, level=1, **kwargs):
          """Queue query *tasks* under *group* (matched against KEYWORD / ORGANIZATION)."""
          self._add_tasks('query', tasks, level, group, **kwargs)

      def add_excavate(self, tasks, level=1, **kwargs):
          """Queue excavate *tasks*."""
          self._add_tasks('excavate', tasks, level, **kwargs)

      def _get_task(self, classify: str, group=''):
          """Pop one task from the queue selected by ``(classify, group)``.

          Calls ``pop_task([key], priority=True)`` on the backing queue and
          returns its result, or returns ``None`` when the combination is
          unroutable.
          """
          redis_key = self._resolve_key(classify, group)
          if redis_key is None:
              return None
          return self.mrq.pop_task([redis_key], priority=True)

      def get_excavate_task(self):
          """Pop one task from the excavate queue."""
          return self._get_task('excavate')

      def get_query_task(self, group):
          """Pop one query task from the queue for *group*, or None if unroutable."""
          return self._get_task('query', group=group)