3
0
Эх сурвалжийг харах

删除rabbitmq 提交消息时强制检查队列状态逻辑

dongzhaorui 1 жил өмнө
parent
commit
85054c3439

+ 1 - 1
FworkSpider/feapder/core/collector.py

@@ -103,7 +103,7 @@ class Collector(threading.Thread):
 
             if request_dict:
                 self._todo_requests.put(request_dict)
-                self._rabbitmq.ack(self._tab_requests, delivery_tag)
+                self._rabbitmq.ack(delivery_tag)
 
     def get_request(self):
         try:

+ 2 - 2
FworkSpider/feapder/core/handle_failed_items.py

@@ -17,7 +17,7 @@ from feapder.db.rabbitMq import RabbitMQ, RabbitMQMessage
 from feapder.network.item import Item, UpdateItem
 from feapder.utils.log import log
 
-# 执行 eval 需要的变量属性
+# 执行 eval 需要的全局作用域
 tools.load_globals(RabbitMQMessage, ObjectId)
 
 
@@ -78,7 +78,7 @@ class HandleFailedItems:
 
                     # 入库成功后删除
                     def delete_item():
-                        self._rabbitmq.ack(self._tab_failed_items, delivery_tag)
+                        self._rabbitmq.ack(delivery_tag)
 
                     self._item_buffer.put_item(delete_item)
                     self._item_buffer.flush()

+ 7 - 2
FworkSpider/feapder/core/handle_failed_requests.py

@@ -14,7 +14,7 @@ from feapder.db.rabbitMq import RabbitMQ, RabbitMQMessage
 from feapder.network.request import Request
 from feapder.utils.log import log
 
-# 执行 eval 需要的变量属性
+# 执行 eval 需要的全局作用域
 tools.load_globals(RabbitMQMessage)
 
 
@@ -52,7 +52,12 @@ class HandleFailedRequests(object):
                     request["retry_times"] = 0
                     request_obj = Request.from_dict(request)
                     self._request_buffer.put_request(request_obj)
-                    self._rabbitmq.ack(self._tab_failed_requests, delivery_tag)
+
+                    # 入库成功后删除
+                    def delete_request():
+                        self._rabbitmq.ack(delivery_tag)
+
+                    self._request_buffer.put_request(delete_request)
                     total_count += 1
 
             except Exception as e: