|
@@ -29,6 +29,7 @@ class RequestBuffer(threading.Thread):
|
|
|
|
|
|
self._thread_stop = False
|
|
|
self._is_adding_to_db = False
|
|
|
+ self._redis_key = redis_key
|
|
|
|
|
|
self._requests_deque = collections.deque()
|
|
|
self._del_requests_deque = collections.deque()
|
|
@@ -38,7 +39,7 @@ class RequestBuffer(threading.Thread):
|
|
|
self._tab_requests = setting.TAB_REQUESTS.format(redis_key=redis_key)
|
|
|
self._rabbitmq.declare(queue=self._tab_requests)
|
|
|
# 失败任务队列
|
|
|
- self._tab_failed_requests = setting.TAB_FAILED_REQUESTS.format(redis_key=redis_key)
|
|
|
+ self._tab_failed_requests = setting.TAB_FAILED_REQUESTS
|
|
|
self._rabbitmq.declare(queue=self._tab_failed_requests)
|
|
|
|
|
|
if not self.__class__.dedup and setting.REQUEST_FILTER_ENABLE:
|
|
@@ -76,6 +77,9 @@ class RequestBuffer(threading.Thread):
|
|
|
# 声明额外的队列
|
|
|
self._rabbitmq.declare(queue=table)
|
|
|
|
|
|
+ # 添加任务标识
|
|
|
+ request_dict[self._redis_key] = ''
|
|
|
+
|
|
|
self._rabbitmq.add(
|
|
|
table or self._tab_failed_requests,
|
|
|
request_dict
|