|
@@ -1,8 +1,9 @@
|
|
|
import json
|
|
|
from itertools import chain
|
|
|
|
|
|
+from common.log import logger
|
|
|
from crawler.Task import Task
|
|
|
-from crawler.constants import REDIS
|
|
|
+from settings import REDIS
|
|
|
|
|
|
|
|
|
class RedisQueue:
|
|
@@ -25,24 +26,21 @@ class RedisQueue:
|
|
|
keys = sorted(keys, key=lambda x: x.split('-')[-1], reverse=True)
|
|
|
return keys
|
|
|
|
|
|
- def push_task(self, key, tasks, level=1, name=None):
|
|
|
+ def push_task(self, key, tasks, level=1):
|
|
|
"""
|
|
|
双端队列,左边推进任务
|
|
|
|
|
|
- :param name: 键名称
|
|
|
- :param key: 新键名称的前缀
|
|
|
+ :param key: 键名称
|
|
|
:param tasks: 任务
|
|
|
:param level: 优先级(int类型),数值越大优先级越高,默认1
|
|
|
:return: 任务队列任务数量
|
|
|
"""
|
|
|
# 重新定义优先队列的key
|
|
|
- if name is None:
|
|
|
- new_key = key + '-' + str(level)
|
|
|
- else:
|
|
|
- new_key = name
|
|
|
+ new_key = key + '-' + str(level)
|
|
|
+
|
|
|
# 序列化任务参数
|
|
|
tasks = [json.dumps(t, default=lambda obj: obj.__dict__['data']) for t in tasks]
|
|
|
- print('RedisQueue info > the number of push tasks:', len(tasks))
|
|
|
+ logger.info('RedisQueue info > the number of push tasks:', len(tasks))
|
|
|
|
|
|
if not tasks:
|
|
|
return self.get_len(key)
|
|
@@ -78,23 +76,3 @@ class RedisQueue:
|
|
|
except TypeError:
|
|
|
pass
|
|
|
return results
|
|
|
-
|
|
|
-
|
|
|
-# if __name__ == '__main__':
|
|
|
- # mrq = RedisQueue()
|
|
|
-
|
|
|
- # 把任务推入redis 队列
|
|
|
- # lst = [Task(name=i, loop_times=i) for i in range(0, 40)]
|
|
|
- # print(mrq.push_task('C', lst, level=3))
|
|
|
-
|
|
|
- # 从redis queue取出任务
|
|
|
- # while True:
|
|
|
- # item = mrq.pop_task(keys=['A', 'B', 'C', 'D', 'E'], priority=True)
|
|
|
- # if len(item) > 0:
|
|
|
- # redis_key, task = item
|
|
|
- # print(redis_key, task)
|
|
|
-
|
|
|
- # 查看任务数量以及优先级情况
|
|
|
- # count, key_len = mrq.get_len('task')
|
|
|
- # print(key_len)
|
|
|
-
|