rabbitMq.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2023-09-25
  4. ---------
  5. @summary: rabbitMq消息队列(基于amqpstorm封装)
  6. ---------
  7. @author: Dzr
  8. """
  9. import time
  10. import amqpstorm
  11. from amqpstorm.channel import Channel as AmqpStormChannel
  12. from amqpstorm.connection import Connection as AmqpStormConnection
  13. from amqpstorm.exception import AMQPChannelError, AMQPConnectionError
  14. import feapder.setting as setting
  15. import feapder.utils.tools as tools
  16. from feapder.utils.log import log
  17. class RabbitMQMessage:
  18. def __init__(self, delivery_tag, body):
  19. self.delivery_tag = delivery_tag
  20. self.body = body
  21. def __str__(self):
  22. return f"RabbitMQMessage(delivery_tag={self.delivery_tag}, body={self.body})"
  23. class RabbitMQ:
  24. __RABBITMQ_ATTRS__ = {
  25. 'timeout',
  26. 'virtual_host',
  27. 'heartbeat',
  28. 'ssl',
  29. 'ssl_options',
  30. 'client_properties',
  31. }
  32. def __init__(
  33. self,
  34. user=None,
  35. user_pass=None,
  36. ip_port=None,
  37. url=None,
  38. exchange=None,
  39. exchange_type=None,
  40. durable=True,
  41. **kwargs
  42. ):
  43. """
  44. @param str user: 用户名
  45. @param str user_pass: 密码
  46. @param ip_port: ip:port
  47. @param str url:
  48. @param str exchange: 交换机名称
  49. @param str exchange_type: 交换机类型
  50. RabbitMQ支持以下几种exchange_type类型:
  51. 1. **direct(直连交换机)**:它将消息通过路由键直接发送到与之匹配的队列。使用direct交换机时,消息的路由键需要与绑定到队列上的绑定键完全匹配。
  52. 2. **topic(主题交换机)**:它将消息通过路由键的模式匹配发送到一个或多个队列,这是一种灵活的交换机类型。使用主题交换机时,可以使用通配符进行模糊匹配,例如使用*表示一个单词,#表示零个或多个单词。
  53. 3. **fanout(扇型交换机)**:它将消息广播到所有绑定到它的队列。它忽略了路由键的概念,只需简单地将消息发送给所有队列即可。
  54. 4. **headers(头交换机)**:该交换机根据消息的头部属性进行匹配,而不是路由键。它的匹配规则非常灵活,但在实际应用中使用较少。
  55. @param durable: 是否定义队列或者交换机持久化(服务器重启后,队列是否能够恢复到原来的状态)
  56. @param kwargs: 自定义键值参数
  57. """
  58. if ip_port is None:
  59. ip_port = setting.RABBITMQ_IP_PORT
  60. if user is None:
  61. user = setting.RABBITMQ_USER
  62. if user_pass is None:
  63. user_pass = setting.RABBITMQ_USER_PASS
  64. if exchange is None:
  65. exchange = setting.RABBITMQ_EXCHANGE
  66. if exchange_type is None:
  67. exchange_type = setting.RABBITMQ_EXCHANGE_TYPE
  68. self.__mq = None
  69. self.__channel = None
  70. self._url = url
  71. self._ip_port = ip_port
  72. self._user = user
  73. self._user_pass = user_pass
  74. self._durable = durable
  75. self._exchange = exchange
  76. self._exchange_type = exchange_type
  77. self.mq_kwargs = {
  78. 'virtual_host': setting.RABBITMQ_VIRTUAL_HOST,
  79. 'heartbeat': setting.RABBITMQ_HEARTBEAT,
  80. 'timeout': setting.RABBITMQ_SOCKET_TIMEOUT
  81. }
  82. for key, val in kwargs.copy().items():
  83. if key in self.__RABBITMQ_ATTRS__:
  84. self.mq_kwargs[key] = val
  85. # 创建连接
  86. self.get_connect()
  87. # 创建信道
  88. self.get_channel()
  89. # 使用缓存机制来实现仅在需要创建新队列或绑定新关系时进行相关操作
  90. self.__cache = {}
  91. self._stop_server = False
  92. @property
  93. def _mq(self) -> AmqpStormConnection:
  94. try:
  95. if not self.__mq.is_open:
  96. raise ConnectionError("unable to connect to RabbitMQ")
  97. except:
  98. if not self._stop_server:
  99. self._reconnect()
  100. return self.__mq
  101. @_mq.setter
  102. def _mq(self, connection: AmqpStormConnection):
  103. self.__mq = connection
  104. def _reconnect(self):
  105. # 检测连接状态,当RabbitMQ重启或者因网络波动导致断开连接时自动重连
  106. retry_count = 0
  107. while True:
  108. try:
  109. retry_count += 1
  110. log.error(f"RabbitMQ 连接断开, 重新连接 {retry_count}")
  111. if self.get_connect():
  112. log.info(f"RabbitMQ 连接成功")
  113. return True
  114. except (ConnectionError,) as e:
  115. log.error(f"连接失败 e: {e}")
  116. time.sleep(1)
  117. def get_connect(self, lazy=False):
  118. try:
  119. if not self._url:
  120. if not self._ip_port:
  121. raise Exception("未设置 RabbitMQ 连接信息")
  122. ip, port = self._ip_port.split(":")
  123. node = {
  124. "hostname": ip,
  125. "port": int(port),
  126. **self.mq_kwargs
  127. }
  128. if self._user and self._user_pass:
  129. node['username'] = self._user
  130. node['password'] = self._user_pass
  131. # 创建连接
  132. self._mq = amqpstorm.Connection(**node, lazy=lazy)
  133. else:
  134. # 创建连接
  135. self._mq = amqpstorm.UriConnection(self._url, lazy=lazy)
  136. except Exception as e:
  137. raise
  138. return self.__mq.is_open
  139. def get_channel(self):
  140. try:
  141. # 建立信道
  142. self._channel = self._mq.channel()
  143. # 声明交换机
  144. self._channel.exchange.declare(
  145. exchange=self._exchange,
  146. exchange_type=self._exchange_type,
  147. durable=self._durable
  148. )
  149. except Exception as e:
  150. raise
  151. return self.__channel.is_open
  152. def _reconnect_channel(self):
  153. retry_count = 0
  154. while True:
  155. try:
  156. retry_count += 1
  157. log.error(f"Channel 连接断开, 重新连接 {retry_count}")
  158. if self.get_channel():
  159. log.info(f"Channel 连接成功")
  160. # 队列重新绑定交换机
  161. for binding_key in self.__cache.copy():
  162. if isinstance(binding_key, tuple):
  163. queue, exchange, routing_key = binding_key
  164. # 清除缓存
  165. del self.__cache[queue]
  166. del self.__cache[binding_key]
  167. # 重新声明
  168. self.declare(queue, exchange, routing_key)
  169. return True
  170. except (ConnectionError,) as e:
  171. log.error(f"连接失败 e: {e}")
  172. time.sleep(1)
  173. @property
  174. def _channel(self) -> AmqpStormChannel:
  175. try:
  176. if not self.__channel.is_open:
  177. raise ConnectionError("unable to connect to Channel")
  178. except:
  179. if not self._stop_server:
  180. self._reconnect_channel()
  181. return self.__channel
  182. @_channel.setter
  183. def _channel(self, channel: AmqpStormChannel):
  184. self.__channel = channel
  185. def add(self, queue, data, exchange=None):
  186. """
  187. 推送数据到rabbitmq消息队列
  188. @param str queue: 队列名称
  189. @param data: 推送数据
  190. @param str exchange: 交换机名称
  191. """
  192. data_lst = data if isinstance(data, list) else [data]
  193. for item in data_lst:
  194. log.debug(f"【{queue}】Mq Push >>> {item}")
  195. data_bytes = tools.dumps_obj(item) # 对象序列化
  196. # RabbitMQ 的 delivery_mode 属性用于设置消息的持久性。它有两种取值:
  197. # delivery_mode=1:表示消息被标记为持久化,但是仍然可能在服务器重启之前丢失。
  198. # delivery_mode=2:表示消息被标记为持久化,并且会存储在磁盘上,确保消息不会丢失
  199. properties = dict(delivery_mode=2)
  200. self._channel.basic.publish(
  201. exchange=exchange or self._exchange,
  202. routing_key=queue,
  203. body=data_bytes,
  204. properties=properties
  205. )
  206. def ack(self, queue, delivery_tag=0, multiple=False):
  207. """
  208. 手动回复队列消息确认
  209. @param str queue: 队列名称
  210. @param int delivery_tag: 消息标签
  211. @param bool multiple: 开启多个回复消息确认
  212. """
  213. self._channel.queue.declare(queue=queue, passive=self._durable)
  214. self._channel.basic.ack(delivery_tag=delivery_tag, multiple=multiple)
  215. def _bind_exchange(self, queue, exchange, routing_key):
  216. """
  217. 将队列绑定到指定的交换机
  218. @param queue: 队列名称
  219. @param exchange: 交换机名称
  220. @param routing_key: 路由键
  221. """
  222. binding_key = (queue, exchange, routing_key)
  223. if binding_key in self.__cache:
  224. return
  225. else:
  226. # 队列绑定一个交换机
  227. self._channel.queue.bind(queue, exchange, routing_key)
  228. self.__cache[binding_key] = True
  229. def _declare_queue(self, queue):
  230. """
  231. 声明持久化rabbitmq队列
  232. @param queue: 队列名称
  233. """
  234. if queue in self.__cache:
  235. return self.__cache[queue]
  236. else:
  237. # 声明队列
  238. result = self._channel.queue.declare(queue, durable=True)
  239. self.__cache[queue] = result
  240. return result
  241. def declare(self, queue, exchange=None, routing_key=None):
  242. """
  243. 声明rabbitmq队列并与一个交换机进行绑定
  244. @param queue: 队列名称
  245. @param exchange: 交换机名称
  246. @param routing_key: 路由键
  247. """
  248. self._declare_queue(queue)
  249. routing_key = routing_key or queue
  250. self._bind_exchange(queue, exchange or self._exchange, routing_key)
  251. def start_consuming(self, limit=None, to_tuple=False, auto_decode=True):
  252. """
  253. @param int limit: 消费数据上限
  254. @param to_tuple:
  255. @param auto_decode:
  256. """
  257. if not self._channel._consumer_callbacks:
  258. raise AMQPChannelError('no consumer callback defined')
  259. count = 0
  260. params = dict(break_on_empty=True, auto_decode=auto_decode)
  261. for message in self._channel.build_inbound_messages(**params):
  262. consumer_tag = message._method.get('consumer_tag')
  263. if to_tuple:
  264. # noinspection PyCallingNonCallable
  265. self._channel._consumer_callbacks[consumer_tag](*message.to_tuple())
  266. continue
  267. # noinspection PyCallingNonCallable
  268. self._channel._consumer_callbacks[consumer_tag](message)
  269. count += 1
  270. if limit is not None and count == limit:
  271. break
  272. def stop_consuming(self, consumer_tag=None):
  273. """
  274. @param str consumer_tag: 消费者标签
  275. """
  276. self._channel.basic.cancel(consumer_tag)
  277. def get(self, queue, limit, no_ack=False):
  278. """
  279. 获取rabbitmq消息队列中的信道数据
  280. @param str queue: 队列名称
  281. @param int limit: 获取消息数量
  282. @param no_ack: 自动回复消息确认
  283. """
  284. message_lst = []
  285. # 设置预取上限数量
  286. self._channel.basic.qos(prefetch_count=limit)
  287. def callback(message):
  288. body = message.body
  289. # 处理接收到的消息
  290. impl = tools.loads_obj(body)
  291. log.debug(f"【{queue}】Mq Received>>> {impl}")
  292. delivery_tag = message.delivery_tag
  293. if no_ack:
  294. self._channel.basic.ack(delivery_tag)
  295. delivery_tag = 0
  296. message_lst.append(str(RabbitMQMessage(delivery_tag, impl)))
  297. try:
  298. # 注册消费者并获取消费者标签
  299. consumer_tag = self._channel.basic.consume(callback, queue=queue)
  300. # 开始消费
  301. self.start_consuming(limit)
  302. # 停止消费并关闭消费者
  303. self.stop_consuming(consumer_tag)
  304. except (AMQPChannelError, AMQPConnectionError) as why:
  305. log.error(f"{why}")
  306. return message_lst
  307. @property
  308. def is_open(self):
  309. return self._mq.is_open
  310. def get_message_count(self, queue):
  311. message = self._channel.queue.declare(queue, passive=True)
  312. return message.get('message_count')
  313. def get_mq_obj(self):
  314. return self._mq
  315. def close(self, n=-1):
  316. log.debug(f'关闭 RabbitMQ {n}')
  317. if self._channel.is_open:
  318. self._channel.close()
  319. if self._mq.is_open:
  320. self.__mq.close()
  321. self._stop_server = True
  322. def __getattr__(self, name):
  323. return getattr(self._mq, name)
  324. def __repr__(self):
  325. if self._url:
  326. return "<RabbitMQ url:{}>".format(self._url)
  327. return "<RabbitMQ ip_port: {} username:{} password:{}>".format(
  328. self._ip_port, self._user, self._user_pass
  329. )
  330. def __enter__(self):
  331. return self
  332. def __exit__(self, exception_type, exception_value, _):
  333. if exception_type:
  334. log.warning(
  335. 'Closing RabbitMQ to an unhandled exception: %s',
  336. exception_value
  337. )
  338. if not self._mq.is_open:
  339. return
  340. self.close()