import json
from itertools import chain

from crawler.Task import Task
from crawler.constants import REDIS


class RedisQueue:
    """Priority task queue built on Redis lists.

    Each priority level of a queue lives in its own Redis list named
    ``<prefix>-<level>``. Tasks are pushed on the left (``LPUSH``) and popped
    from the right (``BRPOP``).
    """

    def __init__(self):
        self.redis = REDIS

    @staticmethod
    def _priority_rank(redis_key):
        """Return a sort key that orders Redis keys by numeric priority.

        The priority is the text after the last ``-`` in the key. Keys whose
        suffix is not an integer rank below every numeric key, so sorting
        never raises on an unexpected key name.
        """
        suffix = redis_key.rsplit('-', 1)[-1]
        try:
            return 1, int(suffix)
        except ValueError:
            return 0, 0

    def get_len(self, key):
        """Count the queued tasks under the prefix ``key``.

        :param key: key prefix of the queue
        :return: ``(total, per_key)`` where ``per_key`` is a list of
            ``(redis_key, length)`` pairs in the order given by ``get_keys``
            and ``total`` is the sum of those lengths
        """
        per_key = [(name, self.redis.llen(name)) for name in self.get_keys(key)]
        total = sum(length for _, length in per_key)
        return total, per_key

    def get_keys(self, key):
        """List the existing Redis keys of a queue, highest priority first.

        :param key: key prefix of the queue
        :return: decoded key names matching ``<key>-[0-9]*``, sorted by
            numeric priority in descending order
        """
        # NOTE(review): KEYS is documented by Redis as scanning the whole
        # keyspace; consider SCAN if the database is large.
        names = [raw.decode() for raw in self.redis.keys(key + '-[0-9]*')]
        # Sort numerically: comparing the suffixes as strings would place
        # priority '9' above priority '10'.
        return sorted(names, key=self._priority_rank, reverse=True)

    def push_task(self, key, tasks, level=1, name=None):
        """Push tasks onto the left end of a queue list.

        :param key: key prefix; the target list is ``<key>-<level>`` unless
            ``name`` is given
        :param tasks: iterable of tasks to enqueue
        :param level: priority (int); a larger value means a higher
            priority, defaults to 1
        :param name: full Redis key to push to, overriding ``key``/``level``
        :return: the result of ``get_len(key)`` after the push
        """
        target = key + '-' + str(level) if name is None else name

        # Objects json cannot encode natively are serialised through the
        # 'data' entry of their instance __dict__.
        tasks = [json.dumps(t, default=lambda obj: obj.__dict__['data']) for t in tasks]
        print('RedisQueue info > the number of push tasks:', len(tasks))

        # LPUSH needs at least one value, so skip the call for an empty batch.
        if tasks:
            self.redis.lpush(target, *tasks)
        return self.get_len(key)

    def pop_task(self, keys, priority=False):
        """Pop one task from the right end of the first non-empty queue list.

        :param keys: iterable of key prefixes to pop from
        :param priority: when true, order the candidate keys of all prefixes
            together by numeric priority (highest first); otherwise keep the
            prefix order of ``keys``, with each prefix's keys sorted by
            priority
        :return: ``[redis_key, task]`` on success, or ``[]`` when no matching
            key exists, the one-second blocking pop times out, or the payload
            cannot be turned into a ``Task``
        """
        all_keys = list(chain.from_iterable(self.get_keys(k) for k in keys))
        if priority:
            all_keys = sorted(all_keys, key=self._priority_rank, reverse=True)
        if not all_keys:
            return []

        popped = self.redis.brpop(all_keys, timeout=1)
        if popped is None:
            # Timed out: every candidate list was empty.
            return []

        task_key, payload = popped
        try:
            task = json.loads(payload.decode(), object_hook=lambda d: Task(**d))
        except TypeError:
            # NOTE(review): kept from the original best-effort handling. The
            # payload has already been removed from Redis at this point, so
            # a task that fails to deserialise here is silently dropped.
            return []
        return [task_key.decode(), task]


# if __name__ == '__main__':
#     mrq = RedisQueue()
#
#     # Push tasks onto the Redis queue.
#     lst = [Task(name=i, loop_times=i) for i in range(0, 40)]
#     print(mrq.push_task('C', lst, level=3))
#
#     # Pop tasks from the Redis queue.
#     while True:
#         item = mrq.pop_task(keys=['A', 'B', 'C', 'D', 'E'], priority=True)
#         if len(item) > 0:
#             redis_key, task = item
#             print(redis_key, task)
#
#     # Inspect the task count and the per-priority breakdown.
#     count, key_len = mrq.get_len('task')
#     print(key_len)