Parcourir la source

优先级队列模块

dongzhaorui il y a 3 ans
Parent
commit
d74c5e494f
1 fichiers modifiés avec 109 ajouts et 0 suppressions
  1. 109 0
      find_source/crawler/task_queue.py

+ 109 - 0
find_source/crawler/task_queue.py

@@ -0,0 +1,109 @@
+import json
+import re
+import time
+from datetime import datetime, date
+from itertools import chain
+
+from crawler.constants import REDIS
+
+
+class ExpandJsonEncoder(json.JSONEncoder):
+    """
+    采用json方式序列化传入的任务参数,而原生的json.dumps()方法不支持datetime、date,这里做了扩展
+    """
+
+    def default(self, obj):
+        if isinstance(obj, datetime):
+            return obj.strftime('%Y-%m-%d %H:%M:%S')
+        elif isinstance(obj, date):
+            return obj.strftime('%Y-%m-%d')
+        else:
+            return json.JSONEncoder.default(self, obj)
+
+
+class RedisQueue:
+
+    def __init__(self):
+        self.redis = REDIS
+
+    def get_len(self, key):
+        keys = self.get_keys(key)
+        # 每个键的任务数量
+        key_len = [(k, self.redis.llen(k)) for k in keys]
+        # 所有键的任务数量
+        task_len = sum(dict(key_len).values())
+        return task_len, key_len
+
+    def get_keys(self, key):
+        # Redis的键支持模式匹配
+        keys = [key.decode() for key in self.redis.keys(key + '-[0-9]*')]
+        # 按优先级将键降序排序
+        keys = sorted(keys, key=lambda x: x.split('-')[-1], reverse=True)
+        return keys
+
+    def push_task(self, key, tasks, level=1):
+        """
+        双端队列,左边推进任务
+        :param key: 任务队列名称
+        :param tasks: 任务
+        :param level: 优先级(int类型),数值越大优先级越高,默认1
+        :return: 任务队列任务数量
+        """
+        # 重新定义优先队列的key
+        new_key = key + '-' + str(level)
+        # 序列化任务参数
+        tasks = [json.dumps(t, cls=ExpandJsonEncoder) for t in tasks]
+        print('RedisQueue info > the number of push tasks:', len(tasks))
+
+        if not tasks:
+            return self.get_len(key)
+
+        self.redis.lpush(new_key, *tasks)
+        return self.get_len(key)
+
+    def pop_task(self, keys=None, priority=False):
+        """
+        双端队列 右边弹出任务
+        :param keys: 键列表,默认为None(将获取所有任务的keys)
+        :param priority: 优先级
+        :return:
+        """
+        while True:
+            # 避免在while循环中修改参数,将keys参数赋值到临时变量
+            temp_keys = keys
+
+            # 不指定keys,将获取所有任务
+            if not keys:
+                temp_keys = self.redis.keys()
+                temp_keys = list(set([re.sub('-\d+$', '', k) for k in temp_keys if re.findall('\w+-\d+$', k)]))
+
+            # 根据key作为关键字获取所有的键
+            all_keys = list(chain(*[self.get_keys(k) for k in temp_keys]))
+
+            # 屏蔽任务差异性,只按优先级高到低弹出任务
+            if priority:
+                all_keys = sorted(all_keys, key=lambda x: x.split('-')[-1], reverse=True)
+
+            if all_keys:
+                _task_key, _task = self.redis.brpop(all_keys)
+                return _task_key.decode(), json.loads(_task.decode())
+            time.sleep(2)
+
+
+# if __name__ == '__main__':
+#     mrq = RedisQueue()
+#
+#     # 把任务推入redis 队列
+#     lst = [i for i in range(0, 40)]
+#     print(mrq.push_task('C', lst, level=3))
+#
+#     # 从redis queue取出任务
+#     while True:
+#         redis_key, task = mrq.pop_task(keys=['A', 'B', 'C', 'D', 'E'], priority=True)
+#         print(redis_key, task)
+#         time.sleep(1)
+#
+#     # 查看任务数量以及优先级情况
+#     count, key_len = mrq.get_len('task')
+#     print(key_len)
+