request_buffer.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2018-06-19 17:17
  4. ---------
  5. @summary: request 管理器, 负责缓冲添加到数据库中的request
  6. ---------
  7. @author: Boris
  8. @email: boris_liu@foxmail.com
  9. """
  10. import collections
  11. import threading
  12. import feapder.setting as setting
  13. import feapder.utils.tools as tools
  14. from feapder.db.redisdb import RedisDB
  15. from feapder.dedup import Dedup
  16. from feapder.utils.log import log
  17. MAX_URL_COUNT = 1000 # 缓存中最大request数
  18. class RequestBuffer(threading.Thread):
  19. dedup = None
  20. def __init__(self, redis_key):
  21. if not hasattr(self, "_requests_deque"):
  22. super(RequestBuffer, self).__init__()
  23. self._thread_stop = False
  24. self._is_adding_to_db = False
  25. self._requests_deque = collections.deque()
  26. self._del_requests_deque = collections.deque()
  27. self._db = RedisDB()
  28. self._table_request = setting.TAB_REQUSETS.format(redis_key=redis_key)
  29. self._table_failed_request = setting.TAB_FAILED_REQUSETS.format(
  30. redis_key=redis_key
  31. )
  32. if not self.__class__.dedup and setting.REQUEST_FILTER_ENABLE:
  33. self.__class__.dedup = Dedup(
  34. name=redis_key, to_md5=False, **setting.REQUEST_FILTER_SETTING
  35. ) # 默认过期时间为一个月
  36. def run(self): # step 1 线程入口
  37. self._thread_stop = False
  38. while not self._thread_stop: # 每隔一分钟进行一次 将产生的任务存储
  39. try:
  40. self.__add_request_to_db()
  41. except Exception as e:
  42. log.exception(e)
  43. tools.delay_time(1)
  44. def stop(self):
  45. self._thread_stop = True
  46. self._started.clear()
  47. def put_request(self, request):
  48. self._requests_deque.append(request)
  49. if self.get_requests_count() > MAX_URL_COUNT: # 超过最大缓存,主动调用
  50. self.flush()
  51. def put_del_request(self, request):
  52. self._del_requests_deque.append(request)
  53. def put_failed_request(self, request, table=None):
  54. try:
  55. request_dict = request.to_dict
  56. self._db.zadd(
  57. table or self._table_failed_request, request_dict, request.priority
  58. )
  59. except Exception as e:
  60. log.exception(e)
  61. def flush(self):
  62. try:
  63. self.__add_request_to_db()
  64. except Exception as e:
  65. log.exception(e)
  66. def get_requests_count(self):
  67. return len(self._requests_deque)
  68. def is_adding_to_db(self):
  69. return self._is_adding_to_db
  70. def __add_request_to_db(self):
  71. request_list = []
  72. prioritys = []
  73. callbacks = []
  74. while self._requests_deque:
  75. request = self._requests_deque.popleft() # 从任务队列中从左取任务(先进先出)
  76. self._is_adding_to_db = True
  77. if callable(request):
  78. # 函数
  79. # 注意:应该考虑闭包情况。闭包情况可写成
  80. # def test(xxx = xxx):
  81. # # TODO 业务逻辑 使用 xxx
  82. # 这么写不会导致xxx为循环结束后的最后一个值
  83. callbacks.append(request)
  84. continue
  85. priority = request.priority
  86. # 如果需要去重并且库中已重复 则continue
  87. if (
  88. request.filter_repeat
  89. and setting.REQUEST_FILTER_ENABLE
  90. and not self.__class__.dedup.add(request.fingerprint)
  91. ):
  92. log.debug("request已存在 url = %s" % request.url)
  93. continue
  94. else:
  95. request_list.append(str(request.to_dict))
  96. prioritys.append(priority)
  97. if len(request_list) > MAX_URL_COUNT:
  98. self._db.zadd(self._table_request, request_list, prioritys)
  99. request_list = []
  100. prioritys = []
  101. # 入库
  102. if request_list:
  103. self._db.zadd(self._table_request, request_list, prioritys)
  104. # 执行回调
  105. for callback in callbacks:
  106. try:
  107. callback()
  108. except Exception as e:
  109. log.exception(e)
  110. # 删除已做任务
  111. if self._del_requests_deque:
  112. request_done_list = []
  113. while self._del_requests_deque:
  114. request_done_list.append(self._del_requests_deque.popleft())
  115. # 去掉request_list中的requests, 否则可能会将刚添加的request删除
  116. request_done_list = list(set(request_done_list) - set(request_list))
  117. if request_done_list:
  118. self._db.zrem(self._table_request, request_done_list)
  119. self._is_adding_to_db = False