q.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. import json
  2. from itertools import chain
  3. from common.log import logger
  4. from crawler.Task import Task
  5. from settings import REDIS
  6. class RedisQueue:
  7. def __init__(self):
  8. self.redis = REDIS
  9. def get_len(self, key):
  10. keys = self.get_keys(key)
  11. # 每个键的任务数量
  12. key_len = [(k, self.redis.llen(k)) for k in keys]
  13. # 所有键的任务数量
  14. task_len = sum(dict(key_len).values())
  15. return task_len, key_len
  16. def get_keys(self, key):
  17. # Redis的键支持模式匹配
  18. keys = [key.decode() for key in self.redis.keys(key + '-[0-9]*')]
  19. # 按优先级将键降序排序
  20. keys = sorted(keys, key=lambda x: x.split('-')[-1], reverse=True)
  21. return keys
  22. def push_task(self, key, tasks, level=1):
  23. """
  24. 双端队列,左边推进任务
  25. :param key: 键名称
  26. :param tasks: 任务列表
  27. :param level: 优先级(int类型),数值越大优先级越高,默认1
  28. :return: 任务队列任务数量
  29. """
  30. # 重新定义优先队列的key
  31. new_key = key + '-' + str(level)
  32. # 序列化任务参数
  33. tasks = [json.dumps(t, default=lambda obj: obj.__dict__['data']) for t in tasks]
  34. logger.info(f'RedisQueue info > the number of push tasks: {len(tasks)}')
  35. if not tasks:
  36. return self.get_len(key)
  37. self.redis.lpush(new_key, *tasks)
  38. return self.get_len(key)
  39. def pop_task(self, keys, priority=False):
  40. """
  41. 双端队列 右边弹出任务
  42. :param keys: 键列表
  43. :param priority: 优先级
  44. :return: 键名称(_task_key)和任务(_task)
  45. """
  46. while True:
  47. results = []
  48. # 避免在while循环中修改参数,将keys参数赋值到临时变量
  49. if not isinstance(keys, list):
  50. raise TypeError
  51. temp_keys = keys
  52. all_keys = list(chain(*[self.get_keys(k) for k in temp_keys]))
  53. # 按优先级高到低弹出任务
  54. if priority:
  55. all_keys = sorted(all_keys, key=lambda x: x.split('-')[-1], reverse=True)
  56. if len(all_keys) > 0:
  57. try:
  58. _task_key, _task = self.redis.brpop(all_keys, timeout=1)
  59. results.extend([
  60. _task_key.decode(),
  61. json.loads(_task.decode(), object_hook=lambda d: Task(**d))
  62. ])
  63. except TypeError:
  64. pass
  65. return results