123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- import json
- from itertools import chain
- from common.execptions import TaskError
- from common.log import logger
- from crawler.Task import Task
- from settings import REDIS
class RedisQueue:
    """Priority task queue stored in Redis lists.

    A logical queue named ``key`` is spread over one Redis list per
    priority level, each named ``<key>-<level>``.  Tasks are pushed on the
    left of a list and popped from the right, so every list is FIFO.
    """

    def __init__(self):
        # Client object taken from the project settings module.
        self.redis = REDIS

    @staticmethod
    def _priority_of(key):
        """Return a sort key for a ``<name>-<level>`` Redis key.

        The level is compared numerically so that level 10 outranks level 9
        (a plain string comparison would put ``'10'`` below ``'9'``).  Keys
        whose suffix is not an integer sort after all numeric ones when the
        result is used with ``reverse=True``.
        """
        if isinstance(key, bytes):
            # Tolerate clients that return raw bytes for key names.
            key = key.decode('utf-8', 'replace')
        suffix = key.rsplit('-', 1)[-1]
        try:
            return 1, int(suffix)
        except ValueError:
            # The glob used in get_keys ('[0-9]*') also matches suffixes such
            # as '1abc'; keep those keys but give them the lowest rank.
            return 0, 0

    @staticmethod
    def _serialize(tasks):
        """JSON-encode every task; non-JSON objects are encoded via their ``data`` attribute."""
        return [json.dumps(t, default=lambda obj: obj.__dict__['data']) for t in tasks]

    @staticmethod
    def _deserialize(raw_task):
        """Decode one JSON payload, building a ``Task`` from every JSON object in it."""
        # NOTE(review): object_hook fires for nested JSON objects too, so any
        # nested dict is also passed to Task(**d) -- confirm this is intended.
        return json.loads(raw_task, object_hook=lambda d: Task(**d))

    def get_len(self, key):
        """Count the pending tasks of a logical queue.

        :param key: base key name (without the ``-<level>`` suffix)
        :return: tuple ``(total, per_key)`` where ``per_key`` is a list of
                 ``(redis_key, length)`` pairs ordered by descending priority
        """
        per_key = [(k, self.redis.llen(k)) for k in self.get_keys(key)]
        total = sum(length for _, length in per_key)
        return total, per_key

    def get_keys(self, key):
        """List the Redis keys belonging to a logical queue.

        :param key: base key name (without the ``-<level>`` suffix)
        :return: matching Redis keys, highest priority level first
        """
        # Redis KEYS accepts glob patterns; this one selects '<key>-<digit>...'.
        found = list(self.redis.keys(key + '-[0-9]*'))
        return sorted(found, key=self._priority_of, reverse=True)

    def push_task(self, key, tasks, level=1, disable_debug_log=True):
        """Push tasks onto the left of the list for the given priority level.

        :param key: base key name
        :param tasks: iterable of tasks to enqueue
        :param level: priority (int); a larger value means higher priority, default 1
        :param disable_debug_log: when False, log how many tasks are pushed
        :return: ``(total, per_key)`` as returned by :meth:`get_len` for ``key``
        """
        # Each priority level lives in its own list: '<key>-<level>'.
        level_key = key + '-' + str(level)
        payloads = self._serialize(tasks)
        if not disable_debug_log:
            logger.debug(f'RedisQueue info > the number of push tasks: {len(payloads)}')
        if payloads:
            # LPUSH with no values is an error, so only call it when there is work.
            self.redis.lpush(level_key, *payloads)
        return self.get_len(key)

    def pop_task(self, keys: list, priority=False):
        """Pop one task from the right of the first non-empty list.

        Waits up to one second (BRPOP timeout) for a task to arrive.

        :param keys: list of base key names to poll
        :param priority: when True, order the candidate lists of *all* base
                         keys together by descending priority level; when
                         False, keep the order of ``keys`` (each base key's
                         own lists are still highest-priority first)
        :return: ``[redis_key, task]`` or ``[]`` when nothing was popped
        :raises TaskError: if ``keys`` is not a list
        """
        if not isinstance(keys, list):
            raise TaskError(f'keys类型:list,当前类型:{type(keys).__name__}')

        all_keys = list(chain.from_iterable(self.get_keys(k) for k in keys))
        if priority:
            all_keys.sort(key=self._priority_of, reverse=True)
        if not all_keys:
            return []

        popped = self.redis.brpop(all_keys, timeout=1)
        if not popped:
            # Timed out: every candidate list stayed empty.
            return []

        task_key, raw_task = popped
        try:
            task = self._deserialize(raw_task)
        except TypeError as exc:
            # The payload is already removed from Redis; report it instead of
            # dropping it silently, and keep the "nothing popped" return shape.
            logger.error(
                f'RedisQueue error > dropped task from {task_key!r} that cannot '
                f'be rebuilt as Task: {exc}; payload: {raw_task!r}'
            )
            return []
        return [task_key, task]

    def push_task_by_key(self, key, tasks, disable_debug_log=True):
        """Push tasks onto the left of the list stored at an exact Redis key.

        :param key: exact Redis key name (no priority suffix is added)
        :param tasks: iterable of tasks to enqueue
        :param disable_debug_log: when False, log how many tasks are pushed
        :return: length of the list at ``key`` after the push
        """
        payloads = self._serialize(tasks)
        if not disable_debug_log:
            logger.debug(f'RedisQueue info > the number of push tasks: {len(payloads)}')
        if payloads:
            self.redis.lpush(key, *payloads)
        return self.redis.llen(key)

    def pop_task_by_key(self, key):
        """Pop one task from the right of the list stored at an exact Redis key.

        Does not block.

        :param key: exact Redis key name
        :return: ``[key, task]`` or ``[]`` when the list is empty
        """
        raw_task = self.redis.rpop(key)
        if raw_task is None:
            return []
        return [key, self._deserialize(raw_task)]
|