task_queue.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. import json
  2. import re
  3. import time
  4. from datetime import datetime, date
  5. from itertools import chain
  6. from crawler.constants import REDIS
  7. class ExpandJsonEncoder(json.JSONEncoder):
  8. """
  9. 采用json方式序列化传入的任务参数,而原生的json.dumps()方法不支持datetime、date,这里做了扩展
  10. """
  11. def default(self, obj):
  12. if isinstance(obj, datetime):
  13. return obj.strftime('%Y-%m-%d %H:%M:%S')
  14. elif isinstance(obj, date):
  15. return obj.strftime('%Y-%m-%d')
  16. else:
  17. return json.JSONEncoder.default(self, obj)
  18. class RedisQueue:
  19. def __init__(self):
  20. self.redis = REDIS
  21. def get_len(self, key):
  22. keys = self.get_keys(key)
  23. # 每个键的任务数量
  24. key_len = [(k, self.redis.llen(k)) for k in keys]
  25. # 所有键的任务数量
  26. task_len = sum(dict(key_len).values())
  27. return task_len, key_len
  28. def get_keys(self, key):
  29. # Redis的键支持模式匹配
  30. keys = [key.decode() for key in self.redis.keys(key + '-[0-9]*')]
  31. # 按优先级将键降序排序
  32. keys = sorted(keys, key=lambda x: x.split('-')[-1], reverse=True)
  33. return keys
  34. def push_task(self, key, tasks, level=1):
  35. """
  36. 双端队列,左边推进任务
  37. :param key: 任务队列名称
  38. :param tasks: 任务
  39. :param level: 优先级(int类型),数值越大优先级越高,默认1
  40. :return: 任务队列任务数量
  41. """
  42. # 重新定义优先队列的key
  43. new_key = key + '-' + str(level)
  44. # 序列化任务参数
  45. tasks = [json.dumps(t, cls=ExpandJsonEncoder) for t in tasks]
  46. print('RedisQueue info > the number of push tasks:', len(tasks))
  47. if not tasks:
  48. return self.get_len(key)
  49. self.redis.lpush(new_key, *tasks)
  50. return self.get_len(key)
  51. def pop_task(self, keys=None, priority=False):
  52. """
  53. 双端队列 右边弹出任务
  54. :param keys: 键列表,默认为None(将获取所有任务的keys)
  55. :param priority: 优先级
  56. :return:
  57. """
  58. while True:
  59. # 避免在while循环中修改参数,将keys参数赋值到临时变量
  60. temp_keys = keys
  61. # 不指定keys,将获取所有任务
  62. if not keys:
  63. temp_keys = self.redis.keys()
  64. temp_keys = list(set([re.sub('-\d+$', '', k) for k in temp_keys if re.findall('\w+-\d+$', k)]))
  65. # 根据key作为关键字获取所有的键
  66. all_keys = list(chain(*[self.get_keys(k) for k in temp_keys]))
  67. # 屏蔽任务差异性,只按优先级高到低弹出任务
  68. if priority:
  69. all_keys = sorted(all_keys, key=lambda x: x.split('-')[-1], reverse=True)
  70. if all_keys:
  71. _task_key, _task = self.redis.brpop(all_keys)
  72. return _task_key.decode(), json.loads(_task.decode())
  73. time.sleep(2)
  74. # if __name__ == '__main__':
  75. # mrq = RedisQueue()
  76. #
  77. # # 把任务推入redis 队列
  78. # lst = [i for i in range(0, 40)]
  79. # print(mrq.push_task('C', lst, level=3))
  80. #
  81. # # 从redis queue取出任务
  82. # while True:
  83. # redis_key, task = mrq.pop_task(keys=['A', 'B', 'C', 'D', 'E'], priority=True)
  84. # print(redis_key, task)
  85. # time.sleep(1)
  86. #
  87. # # 查看任务数量以及优先级情况
  88. # count, key_len = mrq.get_len('task')
  89. # print(key_len)