rabbitMq.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2023-09-25
  4. ---------
  5. @summary: rabbitMq消息队列(基于amqpstorm封装)
  6. ---------
  7. @author: Dzr
  8. """
  9. import time
  10. import amqpstorm
  11. from amqpstorm.channel import Channel as AmqpStormChannel
  12. from amqpstorm.connection import Connection as AmqpStormConnection
  13. from amqpstorm.exception import AMQPChannelError, AMQPConnectionError
  14. import feapder.setting as setting
  15. import feapder.utils.tools as tools
  16. from feapder.utils.log import log
  17. class RabbitMQMessage:
  18. def __init__(self, delivery_tag, body):
  19. self.delivery_tag = delivery_tag
  20. self.body = body
  21. def __str__(self):
  22. return f"RabbitMQMessage(delivery_tag={self.delivery_tag}, body={self.body})"
  23. class RabbitMQ:
  24. __RABBITMQ_ATTRS__ = {
  25. "timeout",
  26. "virtual_host",
  27. "heartbeat",
  28. "ssl",
  29. "ssl_options",
  30. "client_properties",
  31. }
  32. __cache = {} # 使用缓存机制来实现仅在需要创建新队列或绑定新关系时进行相关操作
  33. def __init__(
  34. self,
  35. user=None,
  36. user_pass=None,
  37. ip_port=None,
  38. url=None,
  39. exchange=None,
  40. exchange_type=None,
  41. durable=True,
  42. **kwargs
  43. ):
  44. """
  45. @param str user: 用户名
  46. @param str user_pass: 密码
  47. @param ip_port: ip:port
  48. @param str url:
  49. @param str exchange: 交换机名称
  50. @param str exchange_type: 交换机类型
  51. RabbitMQ支持以下几种exchange_type类型:
  52. 1. **direct(直连交换机)**:它将消息通过路由键直接发送到与之匹配的队列。使用direct交换机时,消息的路由键需要与绑定到队列上的绑定键完全匹配。
  53. 2. **topic(主题交换机)**:它将消息通过路由键的模式匹配发送到一个或多个队列,这是一种灵活的交换机类型。使用主题交换机时,可以使用通配符进行模糊匹配,例如使用*表示一个单词,#表示零个或多个单词。
  54. 3. **fanout(扇型交换机)**:它将消息广播到所有绑定到它的队列。它忽略了路由键的概念,只需简单地将消息发送给所有队列即可。
  55. 4. **headers(头交换机)**:该交换机根据消息的头部属性进行匹配,而不是路由键。它的匹配规则非常灵活,但在实际应用中使用较少。
  56. @param durable: 是否定义队列或者交换机持久化(服务器重启后,队列是否能够恢复到原来的状态)
  57. @param kwargs: 自定义键值参数
  58. """
  59. if ip_port is None:
  60. ip_port = setting.RABBITMQ_IP_PORT
  61. if user is None:
  62. user = setting.RABBITMQ_USER
  63. if user_pass is None:
  64. user_pass = setting.RABBITMQ_USER_PASS
  65. if exchange is None:
  66. exchange = setting.RABBITMQ_EXCHANGE
  67. if exchange_type is None:
  68. exchange_type = setting.RABBITMQ_EXCHANGE_TYPE
  69. self.__mq = None
  70. self.__channel = None
  71. self._url = url
  72. self._ip_port = ip_port
  73. self._user = user
  74. self._user_pass = user_pass
  75. self._durable = durable
  76. self._exchange = exchange
  77. self._exchange_type = exchange_type
  78. self._stop_server = False
  79. self.mq_kwargs = {
  80. "virtual_host": setting.RABBITMQ_VIRTUAL_HOST,
  81. "heartbeat": setting.RABBITMQ_HEARTBEAT,
  82. "timeout": setting.RABBITMQ_SOCKET_TIMEOUT
  83. }
  84. for key, val in kwargs.copy().items():
  85. if key in self.__RABBITMQ_ATTRS__:
  86. self.mq_kwargs[key] = val
  87. # 创建连接
  88. self.get_connect()
  89. # 创建信道
  90. self.get_channel()
  91. @property
  92. def _mq(self) -> AmqpStormConnection:
  93. try:
  94. if not self.__mq.is_open:
  95. raise ConnectionError("unable to connect to RabbitMQ")
  96. except:
  97. if not self._stop_server:
  98. self._reconnect()
  99. return self.__mq
  100. @_mq.setter
  101. def _mq(self, connection: AmqpStormConnection):
  102. self.__mq = connection
  103. def _reconnect(self):
  104. # 检测连接状态,当RabbitMQ重启或者因网络波动导致断开连接时自动重连
  105. retry_count = 0
  106. while True:
  107. try:
  108. retry_count += 1
  109. log.error(f"RabbitMQ 连接断开, 重新连接 {retry_count}")
  110. if self.get_connect():
  111. log.info(f"RabbitMQ 连接成功")
  112. return True
  113. except (ConnectionError,) as e:
  114. log.error(f"连接失败 e: {e}")
  115. time.sleep(1)
  116. def get_connect(self, lazy=False):
  117. try:
  118. if not self._url:
  119. if not self._ip_port:
  120. raise Exception("未设置 RabbitMQ 连接信息")
  121. ip, port = self._ip_port.split(":")
  122. node = {
  123. "hostname": ip,
  124. "port": int(port),
  125. **self.mq_kwargs
  126. }
  127. if self._user and self._user_pass:
  128. node["username"] = self._user
  129. node["password"] = self._user_pass
  130. # 创建连接
  131. self._mq = amqpstorm.Connection(**node, lazy=lazy)
  132. else:
  133. # 创建连接
  134. self._mq = amqpstorm.UriConnection(self._url, lazy=lazy)
  135. except Exception as e:
  136. raise
  137. return self.__mq.is_open
  138. def get_channel(self):
  139. try:
  140. # 建立信道
  141. self._channel = self._mq.channel()
  142. # 队列重新绑定交换机
  143. for binding_key in self.__cache.copy():
  144. if isinstance(binding_key, tuple):
  145. queue, exchange, routing_key = binding_key
  146. # 清除缓存
  147. self.__cache.pop(queue, None)
  148. self.__cache.pop(binding_key, None)
  149. # 重新声明绑定
  150. self.declare_bind(queue, exchange, routing_key)
  151. except Exception as e:
  152. raise
  153. return self.__channel.is_open
  154. def _re0channel(self):
  155. retry_count = 0
  156. while True:
  157. try:
  158. retry_count += 1
  159. log.error(f"Channel 连接断开, 重新连接 {retry_count}")
  160. if self.get_channel():
  161. log.info(f"Channel 连接成功")
  162. return True
  163. except (ConnectionError,) as e:
  164. log.error(f"连接失败 e: {e}")
  165. time.sleep(1)
  166. @property
  167. def _channel(self) -> AmqpStormChannel:
  168. try:
  169. if not self.__channel.is_open:
  170. raise ConnectionError("unable to connect to Channel")
  171. except:
  172. if not self._stop_server:
  173. self._re0channel()
  174. return self.__channel
  175. @_channel.setter
  176. def _channel(self, channel: AmqpStormChannel):
  177. self.__channel = channel
  178. def add_batch(self, queue, datas, exchange="", routing_key="", **kwargs):
  179. """
  180. 批量发布消息
  181. @param str queue: 队列名称
  182. @param datas: 消息内容
  183. @param exchange: 交换机名称
  184. @param routing_key: 路由键
  185. """
  186. data_lst = datas if isinstance(datas, list) else [datas]
  187. for data in data_lst:
  188. self.add(data, queue, exchange, routing_key, **kwargs)
  189. def add(self, data, queue="", exchange="", routing_key="", properties=None):
  190. """
  191. 发布消息
  192. @param str queue: 队列名称
  193. @param data: 消息内容
  194. @param exchange: 交换机名称
  195. @param routing_key: 路由键
  196. @param properties: 消息属性
  197. """
  198. if not routing_key and not queue:
  199. raise AttributeError("请设置 routing_key or queue")
  200. # 不指定交换机发送消息,routing_key 表示消息队列名称
  201. # 指定交换机发送消息,routing_key 表示路由键
  202. if not exchange:
  203. routing_key = queue
  204. message_id = tools.get_uuid().replace("-", "")
  205. if isinstance(data, dict):
  206. message_id = data.get("pyuuid") or message_id
  207. # RabbitMQ 的 delivery_mode 属性用于设置消息的持久性。它有两种取值:
  208. # delivery_mode=1:表示消息被标记为持久化,但是仍然可能在服务器重启之前丢失。
  209. # delivery_mode=2:表示消息被标记为持久化,并且会存储在磁盘上,确保消息不会丢失
  210. properties = properties or {}
  211. properties = dict(delivery_mode=2, **properties)
  212. if "message_id" not in properties:
  213. properties["message_id"] = message_id
  214. self._channel.basic.publish(
  215. body=tools.dumps_obj(data), # 对象序列化
  216. routing_key=routing_key,
  217. exchange=exchange,
  218. properties=properties # specification.Basic.Properties
  219. )
  220. def add_dlx(self, exchange, routing_key, data, properties=None):
  221. """
  222. 发布延时消息
  223. @param exchange: 交换机名称
  224. @param routing_key: 路由键
  225. @param data: 消息内容
  226. @param properties: 消息属性
  227. """
  228. queue = routing_key
  229. self.add(data, queue, exchange, routing_key, properties)
  230. def ack(self, delivery_tag=0, multiple=False):
  231. """
  232. 手动回复队列消息确认
  233. @param delivery_tag: 消息标签
  234. @param bool multiple: 开启多个回复消息确认
  235. """
  236. self._channel.basic.ack(delivery_tag=delivery_tag, multiple=multiple)
  237. def declare_exchange(self, exchange, exchange_type=None, auto_delete=False, arguments=None):
  238. """声明交换机"""
  239. shares = dict(
  240. exchange_type=exchange_type or self._exchange_type,
  241. auto_delete=auto_delete,
  242. arguments=arguments
  243. )
  244. try:
  245. # 检查交换机是否存在
  246. params = dict(passive=True, **shares)
  247. return self._channel.exchange.declare(exchange, **params)
  248. except AMQPChannelError as why:
  249. if why.error_code == 404:
  250. self.get_channel()
  251. # 创建一个直连交换机
  252. params = dict(durable=True, **shares)
  253. return self._channel.exchange.declare(exchange, **params)
  254. else:
  255. raise why
  256. def declare_queue(self, queue, auto_delete=False, arguments=None):
  257. """
  258. 声明队列
  259. @param queue:
  260. @param auto_delete:
  261. @param arguments:
  262. """
  263. shares = dict(auto_delete=auto_delete, arguments=arguments)
  264. try:
  265. params = dict(passive=True, **shares)
  266. return self._channel.queue.declare(queue, **params)
  267. except AMQPChannelError as why:
  268. if why.error_code == 404:
  269. self.get_channel()
  270. # 声明持久化队列
  271. params = dict(durable=True, **shares)
  272. return self._channel.queue.declare(queue, **params)
  273. else:
  274. raise why
  275. def declare_bind(self, queue="", exchange="", routing_key=""):
  276. """
  277. 声明队列和交换机,同时将队列绑定交换机
  278. @param queue: 队列名称
  279. @param exchange: 交换机名称
  280. @param routing_key: 路由键
  281. """
  282. exchange = exchange or self._exchange
  283. binding_key = (queue, exchange, routing_key)
  284. if queue in self.__cache and binding_key in self.__cache:
  285. return self.__cache[queue]
  286. self.declare_exchange(exchange)
  287. result = self.declare_queue(queue)
  288. self.__cache[queue] = result
  289. # 队列绑定一个交换机
  290. self._channel.queue.bind(queue, exchange, routing_key)
  291. self.__cache[binding_key] = True
  292. return result
  293. def start_consuming(
  294. self,
  295. limit=None,
  296. to_tuple=False,
  297. auto_decode=True,
  298. correlation_id="",
  299. use_filter=False
  300. ):
  301. """
  302. @param int limit: 消费数据上限
  303. @param to_tuple: 消息结果返回元组形式
  304. @param auto_decode: 自动解码
  305. @param correlation_id: 应用程序关联标识符
  306. @param use_filter: 使用缓存过滤重复数据
  307. """
  308. if not self._channel._consumer_callbacks:
  309. raise AMQPChannelError("no consumer callback defined")
  310. cache = [] # 消息队列缓存,去除重复消息
  311. recv_count = 0 # 接收消息计数
  312. params = dict(break_on_empty=True, auto_decode=auto_decode)
  313. for message in self._channel.build_inbound_messages(**params):
  314. consumer_tag = message._method.get("consumer_tag")
  315. if use_filter and message.message_id in cache:
  316. continue
  317. cache.append(message.message_id) # 缓存消息id
  318. # 指定应用程序标识,只会在特定的程序中被消费
  319. if correlation_id:
  320. if correlation_id == message.correlation_id:
  321. self._channel._consumer_callbacks[consumer_tag](message)
  322. recv_count += 1
  323. if limit is not None and recv_count == limit:
  324. break
  325. else:
  326. continue
  327. recv_count += 1
  328. if to_tuple:
  329. self._channel._consumer_callbacks[consumer_tag](*message.to_tuple())
  330. if limit is not None and recv_count == limit:
  331. break
  332. else:
  333. continue
  334. self._channel._consumer_callbacks[consumer_tag](message)
  335. if limit is not None and recv_count == limit:
  336. break
  337. def stop_consuming(self, consumer_tag=None):
  338. """
  339. @param str consumer_tag: 消费者标签
  340. """
  341. self._channel.basic.cancel(consumer_tag)
  342. def get(self, queue, limit, auto_ack=False, to_str=True, **kwargs):
  343. """
  344. 获取rabbitmq消息队列中的信道数据
  345. @param str queue: 队列名称
  346. @param int limit: 获取消息数量
  347. @param auto_ack: 自动回复消息确认
  348. @param to_str: 消息是否转成字符串
  349. @param to_str: 消息是否转成字符串
  350. """
  351. message_lst = []
  352. if "use_filter" not in kwargs:
  353. kwargs["use_filter"] = True
  354. def callback(message):
  355. body = tools.loads_obj(message.body) # 反序列化消息对象
  356. delivery_tag = message.delivery_tag
  357. if auto_ack:
  358. self.ack(delivery_tag)
  359. delivery_tag = 0
  360. if to_str:
  361. message_lst.append(str(RabbitMQMessage(delivery_tag, body)))
  362. else:
  363. message_lst.append(RabbitMQMessage(delivery_tag, body))
  364. try:
  365. # 设置预取上限数量
  366. self._channel.basic.qos(prefetch_count=limit)
  367. # 注册消费者并获取消费者标签
  368. consumer_tag = self._channel.basic.consume(callback, queue=queue)
  369. # 开始消费
  370. self.start_consuming(limit, **kwargs)
  371. # 停止消费并关闭消费者
  372. self.stop_consuming(consumer_tag)
  373. except (AMQPChannelError, AMQPConnectionError) as why:
  374. log.exception(why)
  375. return message_lst
  376. @property
  377. def is_open(self):
  378. return self._mq.is_open
  379. def __get_message_count(self, correlation_id, **kwargs):
  380. channel = self._mq.channel() # 启用检查数量的临时信道
  381. recv_count = 0 # 接收消息计数
  382. try:
  383. channel.basic.consume(queue=kwargs["queue"]) # 指定查询的队列
  384. for message in channel.build_inbound_messages(break_on_empty=True):
  385. # 指定应用程序标识,只会在特定的程序中被消费
  386. if correlation_id == message.correlation_id:
  387. recv_count += 1
  388. except amqpstorm.exception.AMQPChannelError as why:
  389. log.exception(why)
  390. finally:
  391. channel.close() # 关闭查询的临时信道
  392. result = kwargs.copy()
  393. result["message_count"] = recv_count # 重置消息数量
  394. return result
  395. def get_message_count(self, queue, user=None):
  396. try:
  397. message = self._channel.queue.declare(queue, passive=True)
  398. if user is not None:
  399. message = self.__get_message_count(user, **message)
  400. # message_count 消息统计是消息发布确认之后的数量,未确认消息无法统计
  401. return message.get("message_count")
  402. except amqpstorm.exception.AMQPChannelError:
  403. return 0
  404. def get_mq_obj(self):
  405. return self._mq
  406. def close(self, n=-1):
  407. log.debug(f"关闭 RabbitMQ {n}")
  408. if self._channel.is_open:
  409. self._channel.close()
  410. if self._mq.is_open:
  411. self.__mq.close()
  412. self._stop_server = True
  413. def __getattr__(self, name):
  414. return getattr(self._mq, name)
  415. def __repr__(self):
  416. if self._url:
  417. return "<RabbitMQ url:{}>".format(self._url)
  418. return "<RabbitMQ ip_port: {} username:{} password:{}>".format(
  419. self._ip_port, self._user, self._user_pass
  420. )
  421. def __enter__(self):
  422. return self
  423. def __exit__(self, exception_type, exception_value, _):
  424. if exception_type:
  425. log.warning(
  426. "Closing RabbitMQ to an unhandled exception: %s",
  427. exception_value
  428. )
  429. if not self._mq.is_open:
  430. return
  431. self.close()