q.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. import json
  2. from itertools import chain
  3. from common.execptions import TaskError
  4. from common.log import logger
  5. from crawler.Task import Task
  6. from settings import REDIS
  class RedisQueue:
      """Priority task queue stored in Redis lists.

      Every logical queue ``key`` is kept as one Redis list per priority level,
      named ``<key>-<level>``. Tasks are pushed on the left end and popped from
      the right end, so each list is consumed in FIFO order.
      """

      def __init__(self):
          # REDIS comes from the project settings module; it is used here as a
          # Redis client (keys / llen / lpush / rpop / brpop).
          self.redis = REDIS

      @staticmethod
      def _priority_of(queue_key):
          """Return the numeric priority encoded after the last '-' of a key."""
          # Keys are bytes when the client does not decode responses.
          separator = b'-' if isinstance(queue_key, bytes) else '-'
          suffix = queue_key.rsplit(separator, 1)[-1]
          try:
              return int(suffix)
          except ValueError:
              # The glob used by get_keys only demands a leading digit, so a
              # suffix such as "1abc" can slip through; rank it last.
              return -1

      @staticmethod
      def _serialize(tasks):
          """Serialize every task to a JSON string.

          Objects that json cannot encode natively are replaced by the ``data``
          entry of their instance ``__dict__``.
          """
          return [
              json.dumps(task, default=lambda obj: obj.__dict__['data'])
              for task in tasks
          ]

      @staticmethod
      def _deserialize(payload):
          """Decode a JSON payload, building a Task from every JSON object."""
          # object_hook is invoked for each decoded JSON object, nested ones too.
          return json.loads(payload, object_hook=lambda fields: Task(**fields))

      def get_len(self, key):
          """Count the pending tasks of queue ``key``.

          :param key: base queue name (without the priority suffix)
          :return: tuple ``(total, per_key)``; ``per_key`` is a list of
              ``(redis_key, length)`` pairs, highest priority first
          """
          # Number of tasks held by each priority list.
          per_key = [(name, self.redis.llen(name)) for name in self.get_keys(key)]
          # Number of tasks across all priority lists.
          total = sum(count for _, count in per_key)
          return total, per_key

      def get_keys(self, key):
          """List the Redis keys of queue ``key``, highest priority first.

          :param key: base queue name (without the priority suffix)
          :return: list of the keys matching ``<key>-[0-9]*``
          """
          # Redis KEYS supports glob-style pattern matching.
          matched = self.redis.keys(key + '-[0-9]*')
          # Compare priorities as numbers: a plain string sort would rank
          # "9" above "10".
          return sorted(matched, key=self._priority_of, reverse=True)

      def push_task(self, key, tasks, level=1, disable_debug_log=True):
          """Push tasks onto the left end of the list for one priority level.

          :param key: base queue name
          :param tasks: list of tasks
          :param level: priority (int); the larger the value, the higher the
              priority. Defaults to 1
          :param disable_debug_log: when False, log the number of pushed tasks
              at debug level
          :return: ``(total, per_key)`` as returned by ``get_len(key)``
          """
          # Each priority level has its own list named "<key>-<level>".
          level_key = key + '-' + str(level)
          payloads = self._serialize(tasks)
          if not disable_debug_log:
              logger.debug(f'RedisQueue info > the number of push tasks: {len(payloads)}')
          # LPUSH needs at least one value, so an empty batch is skipped.
          if payloads:
              self.redis.lpush(level_key, *payloads)
          return self.get_len(key)

      def pop_task(self, keys: list, priority=False):
          """Pop one task from the right end of the first non-empty list.

          :param keys: list of base queue names
          :param priority: when True, the lists of all queues are ordered
              together by priority (highest first); otherwise the order of
              ``keys`` is kept and priority only orders the lists of each queue
          :return: ``[redis_key, task]``, or an empty list when nothing could be
              popped within the 1 second timeout
          :raises TaskError: if ``keys`` is not a list
          """
          if not isinstance(keys, list):
              raise TaskError(f'keys类型:list,当前类型:{type(keys).__name__}')

          candidates = list(chain.from_iterable(self.get_keys(k) for k in keys))
          if priority:
              # Stable sort: lists with equal priority keep the order of ``keys``.
              candidates.sort(key=self._priority_of, reverse=True)
          if not candidates:
              return []

          # BRPOP checks the lists in the given order; it yields None on timeout.
          popped = self.redis.brpop(candidates, timeout=1)
          if popped is None:
              return []

          source_key, payload = popped
          try:
              task = self._deserialize(payload)
          except TypeError as exc:
              # The payload does not fit Task(**fields). The result stays empty
              # for the caller, but the dropped task is made visible in the log.
              logger.error(f'RedisQueue error > cannot build Task from {source_key}: {exc}')
              return []
          return [source_key, task]

      def push_task_by_key(self, key, tasks, disable_debug_log=True):
          """Push tasks onto the left end of the list stored at exactly ``key``.

          :param key: exact Redis key (no priority suffix is appended)
          :param tasks: tasks to push
          :param disable_debug_log: when False, log the number of pushed tasks
              at debug level
          :return: length of the list at ``key`` after the push
          """
          payloads = self._serialize(tasks)
          if not disable_debug_log:
              logger.debug(f'RedisQueue info > the number of push tasks: {len(payloads)}')
          # LPUSH needs at least one value, so an empty batch is skipped.
          if payloads:
              self.redis.lpush(key, *payloads)
          return self.redis.llen(key)

      def pop_task_by_key(self, key):
          """Pop one task from the right end of the list stored at ``key``.

          :param key: exact Redis key
          :return: ``[key, task]``, or an empty list when nothing was popped
          """
          payload = self.redis.rpop(key)
          if payload is None:
              return []
          return [key, self._deserialize(payload)]