|
@@ -1,26 +1,10 @@
|
|
|
import json
|
|
|
-import re
|
|
|
-import time
|
|
|
-from datetime import datetime, date
|
|
|
from itertools import chain
|
|
|
|
|
|
+from crawler.Task import Task
|
|
|
from crawler.constants import REDIS
|
|
|
|
|
|
|
|
|
-class ExpandJsonEncoder(json.JSONEncoder):
|
|
|
- """
|
|
|
- 采用json方式序列化传入的任务参数,而原生的json.dumps()方法不支持datetime、date,这里做了扩展
|
|
|
- """
|
|
|
-
|
|
|
- def default(self, obj):
|
|
|
- if isinstance(obj, datetime):
|
|
|
- return obj.strftime('%Y-%m-%d %H:%M:%S')
|
|
|
- elif isinstance(obj, date):
|
|
|
- return obj.strftime('%Y-%m-%d')
|
|
|
- else:
|
|
|
- return json.JSONEncoder.default(self, obj)
|
|
|
-
|
|
|
-
|
|
|
class RedisQueue:
|
|
|
|
|
|
def __init__(self):
|
|
@@ -41,19 +25,23 @@ class RedisQueue:
|
|
|
keys = sorted(keys, key=lambda x: x.split('-')[-1], reverse=True)
|
|
|
return keys
|
|
|
|
|
|
- def push_task(self, key, tasks, level=1):
|
|
|
+ def push_task(self, key, tasks, level=1, name=None):
|
|
|
"""
|
|
|
双端队列,左边推进任务
|
|
|
|
|
|
- :param key: 键名称
|
|
|
+ :param name: 键名称
|
|
|
+ :param key: 新键名称的前缀
|
|
|
:param tasks: 任务
|
|
|
:param level: 优先级(int类型),数值越大优先级越高,默认1
|
|
|
:return: 任务队列任务数量
|
|
|
"""
|
|
|
# 重新定义优先队列的key
|
|
|
- new_key = key + '-' + str(level)
|
|
|
+ if name is None:
|
|
|
+ new_key = key + '-' + str(level)
|
|
|
+ else:
|
|
|
+ new_key = name
|
|
|
# 序列化任务参数
|
|
|
- tasks = [json.dumps(t, cls=ExpandJsonEncoder) for t in tasks]
|
|
|
+ tasks = [json.dumps(t, default=lambda obj: obj.__dict__['data']) for t in tasks]
|
|
|
print('RedisQueue info > the number of push tasks:', len(tasks))
|
|
|
|
|
|
if not tasks:
|
|
@@ -62,50 +50,51 @@ class RedisQueue:
|
|
|
self.redis.lpush(new_key, *tasks)
|
|
|
return self.get_len(key)
|
|
|
|
|
|
def pop_task(self, keys, priority=False):
    """
    Double-ended queue: pop a single task from the right end.

    :param keys: list of key prefixes (required); each prefix is expanded
        through ``self.get_keys`` into the concrete queue names to poll
    :param priority: when true, queues are polled in descending order of the
        numeric level suffix (the text after the last ``-`` of the queue name)
    :return: ``[queue_name, task]`` when a task was popped; an empty list when
        no queue matched or nothing arrived within the 1 second timeout
    """
    def _level(queue_name):
        # Levels are integers, so compare them numerically; a plain string
        # comparison would rank level '9' above level '10'.
        suffix = queue_name.split('-')[-1]
        try:
            return int(suffix)
        except ValueError:
            # Queue names without a numeric suffix are polled last.
            return -1

    # Expand every requested prefix into the concrete queue names.
    queue_names = list(chain.from_iterable([self.get_keys(k) for k in keys]))
    if not queue_names:
        return []

    # Poll from the highest priority level down to the lowest.
    if priority:
        queue_names.sort(key=_level, reverse=True)

    popped = self.redis.brpop(queue_names, timeout=1)
    if popped is None:
        # Timed out: every polled queue stayed empty.
        return []

    _task_key, _task = popped
    # NOTE(review): object_hook is applied to every JSON object in the payload,
    # nested ones included - confirm Task payloads never contain nested dicts.
    # Errors raised while rebuilding the task are no longer swallowed, because
    # the task has already been removed from Redis at this point.
    return [
        _task_key.decode(),
        json.loads(_task.decode(), object_hook=lambda d: Task(**d)),
    ]
|
|
|
|
|
|
|
|
|
# if __name__ == '__main__':
|
|
|
-# mrq = RedisQueue()
|
|
|
-#
|
|
|
-# # 把任务推入redis 队列
|
|
|
-# lst = [i for i in range(0, 40)]
|
|
|
-# print(mrq.push_task('C', lst, level=3))
|
|
|
-#
|
|
|
-# # 从redis queue取出任务
|
|
|
-# while True:
|
|
|
-# redis_key, task = mrq.pop_task(keys=['A', 'B', 'C', 'D', 'E'], priority=True)
|
|
|
-# print(redis_key, task)
|
|
|
-# time.sleep(1)
|
|
|
-#
|
|
|
-# # 查看任务数量以及优先级情况
|
|
|
-# count, key_len = mrq.get_len('task')
|
|
|
-# print(key_len)
|
|
|
+ # mrq = RedisQueue()
|
|
|
+
|
|
|
+ # 把任务推入redis 队列
|
|
|
+ # lst = [Task(name=i, loop_times=i) for i in range(0, 40)]
|
|
|
+ # print(mrq.push_task('C', lst, level=3))
|
|
|
+
|
|
|
+ # 从redis queue取出任务
|
|
|
+ # while True:
|
|
|
+ # item = mrq.pop_task(keys=['A', 'B', 'C', 'D', 'E'], priority=True)
|
|
|
+ # if len(item) > 0:
|
|
|
+ # redis_key, task = item
|
|
|
+ # print(redis_key, task)
|
|
|
+
|
|
|
+ # 查看任务数量以及优先级情况
|
|
|
+ # count, key_len = mrq.get_len('task')
|
|
|
+ # print(key_len)
|
|
|
|