q.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. import json
  2. from itertools import chain
  3. from crawler.Task import Task
  4. from crawler.constants import REDIS
  5. class RedisQueue:
  6. def __init__(self):
  7. self.redis = REDIS
  8. def get_len(self, key):
  9. keys = self.get_keys(key)
  10. # 每个键的任务数量
  11. key_len = [(k, self.redis.llen(k)) for k in keys]
  12. # 所有键的任务数量
  13. task_len = sum(dict(key_len).values())
  14. return task_len, key_len
  15. def get_keys(self, key):
  16. # Redis的键支持模式匹配
  17. keys = [key.decode() for key in self.redis.keys(key + '-[0-9]*')]
  18. # 按优先级将键降序排序
  19. keys = sorted(keys, key=lambda x: x.split('-')[-1], reverse=True)
  20. return keys
  21. def push_task(self, key, tasks, level=1, name=None):
  22. """
  23. 双端队列,左边推进任务
  24. :param name: 键名称
  25. :param key: 新键名称的前缀
  26. :param tasks: 任务
  27. :param level: 优先级(int类型),数值越大优先级越高,默认1
  28. :return: 任务队列任务数量
  29. """
  30. # 重新定义优先队列的key
  31. if name is None:
  32. new_key = key + '-' + str(level)
  33. else:
  34. new_key = name
  35. # 序列化任务参数
  36. tasks = [json.dumps(t, default=lambda obj: obj.__dict__['data']) for t in tasks]
  37. print('RedisQueue info > the number of push tasks:', len(tasks))
  38. if not tasks:
  39. return self.get_len(key)
  40. self.redis.lpush(new_key, *tasks)
  41. return self.get_len(key)
  42. def pop_task(self, keys, priority=False):
  43. """
  44. 双端队列 右边弹出任务
  45. :param keys: 键列表,默认为None(将获取所有任务的keys)
  46. :param priority: 优先级
  47. :return: 键名称(_task_key)和任务(_task)
  48. """
  49. while True:
  50. results = []
  51. # 避免在while循环中修改参数,将keys参数赋值到临时变量
  52. temp_keys = keys
  53. all_keys = list(chain(*[self.get_keys(k) for k in temp_keys]))
  54. # 按优先级高到低弹出任务
  55. if priority:
  56. all_keys = sorted(all_keys, key=lambda x: x.split('-')[-1], reverse=True)
  57. if len(all_keys) > 0:
  58. try:
  59. _task_key, _task = self.redis.brpop(all_keys, timeout=1)
  60. results.extend([
  61. _task_key.decode(),
  62. json.loads(_task.decode(), object_hook=lambda d: Task(**d))
  63. ])
  64. except TypeError:
  65. pass
  66. return results
# if __name__ == '__main__':
#     mrq = RedisQueue()
#     # Push tasks onto the Redis queue
#     lst = [Task(name=i, loop_times=i) for i in range(0, 40)]
#     print(mrq.push_task('C', lst, level=3))
#     # Pop tasks from the Redis queue
#     while True:
#         item = mrq.pop_task(keys=['A', 'B', 'C', 'D', 'E'], priority=True)
#         if len(item) > 0:
#             redis_key, task = item
#             print(redis_key, task)
#         # Show the remaining task count and the per-priority breakdown
#         count, key_len = mrq.get_len('C')
#         print(key_len)