Forráskód Böngészése

修复mq阻塞问题

dongzhaorui 1 éve
szülő
commit
b4d7d230d7
1 módosított fájl, 13 hozzáadás és 13 törlés
  1. 13 13
      FworkSpider/feapder/core/spiders/spider.py

+ 13 - 13
FworkSpider/feapder/core/spiders/spider.py

@@ -11,11 +11,12 @@ Created on 2020/4/22 12:05 AM
 import warnings
 from collections import Iterable
 
+import amqpstorm
+
 import feapder.setting as setting
 import feapder.utils.tools as tools
 from feapder.core.base_parser import BaseParser
 from feapder.core.scheduler import Scheduler
-from feapder.db.rabbitMq import RabbitMQ
 from feapder.network.item import Item, FailedTaskItem
 from feapder.network.request import Request
 from feapder.utils.log import log
@@ -316,21 +317,20 @@ class BaseBusinessDetailSpider(Spider):
             self._rabbitmq.add(produce_task, produce_queue, properties=properties)
 
             # 步骤2 等待任务生产完成的处理回应消息
-            receipt_queue = "pyspider.report.receipt"
-            while 1:
-                with self._rabbitmq.get_mq_obj().channel() as channel:
-                    # 直接获取全部消息,若设置预取数量可能出现获取自身correlation_id的情况出现,导致死循环
-                    channel.basic.consume(queue=receipt_queue)
+            receipt_queue = f"receipt_{correlation_id}"
+            with self._rabbitmq.get_mq_obj().channel() as channel:
+                try:
+                    channel.basic.consume(queue=receipt_queue, no_ack=True)
+                    tools.delay_time(0.8)  # 监听与收复消息的时间间隔
                     inbound = channel.build_inbound_messages(break_on_empty=True)
                     message_dict = {msg.correlation_id: msg for msg in inbound}
+                    log.debug(f"采集任务推送 {message_dict}")
                     message = message_dict.get(correlation_id)
-                    if not message:
-                        log.debug(f"{correlation_id} 采集任务推送中...")
-                        tools.delay_time(1)
-                        continue
-
-                    message.ack()
-                    break
+                    if message:
+                        body = tools.loads_obj(message.body)
+                        log.debug(f"推送任务到采集队列《{body['queue_name']}》完成")
+                except amqpstorm.exception.AMQPChannelError:
+                    pass
 
         # 步骤3 开始拉取任务
         task_lst = []