|
@@ -79,3 +79,37 @@ class RedisQueue:
|
|
|
except TypeError:
|
|
|
pass
|
|
|
return results
|
|
|
+
|
|
|
+ def push_task_by_key(self, key, tasks):
|
|
|
+ """
|
|
|
+ 通过键名称从左边推进任务
|
|
|
+
|
|
|
+ :param key: 键名称
|
|
|
+ :param tasks: 推送的任务
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ tasks = [json.dumps(t, default=lambda obj: obj.__dict__['data']) for t in tasks]
|
|
|
+ logger.info(f'RedisQueue info > the number of push tasks: {len(tasks)}')
|
|
|
+ if not tasks:
|
|
|
+ return self.redis.llen(key)
|
|
|
+
|
|
|
+ self.redis.lpush(key, *tasks)
|
|
|
+ return self.redis.llen(key)
|
|
|
+
|
|
|
+ def pop_task_by_key(self, key):
|
|
|
+ """
|
|
|
+ 通过键名从右边取任务
|
|
|
+
|
|
|
+ :param key: 键名称
|
|
|
+ :return: 任务列表
|
|
|
+ """
|
|
|
+ while True:
|
|
|
+ results = []
|
|
|
+ _task_key = key
|
|
|
+ _task = self.redis.rpop(key)
|
|
|
+ if _task is not None:
|
|
|
+ results.extend([
|
|
|
+ _task_key,
|
|
|
+ json.loads(_task.decode(), object_hook=lambda d: Task(**d))
|
|
|
+ ])
|
|
|
+ return results
|