request_buffer.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2018-06-19 17:17
  4. ---------
  5. @summary: request 管理器, 负责缓冲添加到数据库中的request
  6. ---------
  7. @author: Boris
  8. @email: boris_liu@foxmail.com
  9. """
  10. import collections
  11. import threading
  12. import feapder.setting as setting
  13. import feapder.utils.tools as tools
  14. from feapder.db.rabbitMq import RabbitMQ
  15. from feapder.dedup import Dedup
  16. from feapder.utils.log import log
MAX_URL_COUNT = 1000  # maximum number of requests held in the in-memory buffer
  18. class RequestBuffer(threading.Thread):
  19. dedup = None
  20. def __init__(self, redis_key, rabbitmq=None, user=None):
  21. if not hasattr(self, "_requests_deque"):
  22. super(RequestBuffer, self).__init__()
  23. self._thread_stop = False
  24. self._is_adding_to_db = False
  25. self._redis_key = redis_key
  26. self._user = user
  27. self._requests_deque = collections.deque()
  28. self._del_requests_deque = collections.deque()
  29. self._rabbitmq = rabbitmq or RabbitMQ()
  30. # 任务队列
  31. self._tab_requests = setting.TAB_REQUESTS.format(redis_key=redis_key)
  32. self._rabbitmq.declare_bind(queue=self._tab_requests)
  33. # 失败任务队列
  34. self._tab_failed_requests = setting.TAB_FAILED_REQUESTS
  35. self._rabbitmq.declare_bind(queue=self._tab_failed_requests)
  36. if not self.__class__.dedup and setting.REQUEST_FILTER_ENABLE:
  37. self.__class__.dedup = Dedup(
  38. name=redis_key, to_md5=False, **setting.REQUEST_FILTER_SETTING
  39. ) # 默认过期时间为一个月
  40. def run(self):
  41. self._thread_stop = False
  42. while not self._thread_stop:
  43. try:
  44. self.__add_request_to_db()
  45. except Exception as e:
  46. log.exception(e)
  47. tools.delay_time(1)
  48. def stop(self):
  49. self._thread_stop = True
  50. self._started.clear()
  51. def get_failed_requests_count(self):
  52. return self._rabbitmq.get_message_count(self._tab_failed_requests)
  53. def put_request(self, request):
  54. self._requests_deque.append(request)
  55. if self.get_requests_count() > MAX_URL_COUNT: # 超过最大缓存,主动调用
  56. self.flush()
  57. def put_failed_request(self, request, table=None):
  58. try:
  59. request_dict = request.to_dict
  60. if table is not None:
  61. self._rabbitmq.declare_bind(queue=table) # 声明额外的队列
  62. queue = table or self._tab_failed_requests
  63. # 设置访问者的唯一标识
  64. properties = dict(correlation_id=self._user or self._redis_key)
  65. self._rabbitmq.add(request_dict, queue=queue, properties=properties)
  66. except Exception as e:
  67. log.exception(e)
  68. def flush(self):
  69. try:
  70. self.__add_request_to_db()
  71. except Exception as e:
  72. log.exception(e)
  73. def get_requests_count(self):
  74. return len(self._requests_deque)
  75. def is_adding_to_db(self):
  76. return self._is_adding_to_db
  77. def __add_request_to_db(self):
  78. kw = {"properties": dict(correlation_id=self._user) if self._user else None}
  79. request_list = []
  80. prioritys = []
  81. callbacks = []
  82. while self._requests_deque:
  83. request = self._requests_deque.popleft()
  84. self._is_adding_to_db = True
  85. if callable(request):
  86. # 函数
  87. # 注意:应该考虑闭包情况。闭包情况可写成
  88. # def test(xxx = xxx):
  89. # # TODO 业务逻辑 使用 xxx
  90. # 这么写不会导致xxx为循环结束后的最后一个值
  91. callbacks.append(request)
  92. continue
  93. priority = request.priority
  94. # 如果需要去重并且库中已重复,则continue
  95. if (
  96. request.filter_repeat
  97. and setting.REQUEST_FILTER_ENABLE
  98. and not self.__class__.dedup.add(request.fingerprint)
  99. ):
  100. log.debug("request已存在 url = %s" % request.url)
  101. continue
  102. else:
  103. request_list.append(str(request.to_dict))
  104. prioritys.append(priority)
  105. # 入库(超过上限[MAX_URL_COUNT]执行)
  106. if len(request_list) > MAX_URL_COUNT:
  107. self._rabbitmq.add_batch(self._tab_requests, request_list, **kw)
  108. request_list = []
  109. prioritys = []
  110. # 入库(小于上限[MAX_URL_COUNT]执行)
  111. if request_list:
  112. self._rabbitmq.add_batch(self._tab_requests, request_list, **kw)
  113. # 执行回调
  114. for callback in callbacks:
  115. try:
  116. callback()
  117. except Exception as e:
  118. log.exception(e)
  119. self._is_adding_to_db = False