rabbitMq.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on 2023-09-25
  4. ---------
  5. @summary: rabbitMq消息队列(基于amqpstorm封装)
  6. ---------
  7. @author: Dzr
  8. """
  9. import time
  10. import amqpstorm
  11. from amqpstorm.channel import Channel as AmqpStormChannel
  12. from amqpstorm.connection import Connection as AmqpStormConnection
  13. from amqpstorm.exception import AMQPChannelError, AMQPConnectionError
  14. import feapder.setting as setting
  15. import feapder.utils.tools as tools
  16. from feapder.utils.log import log
  17. class RabbitMQMessage:
  18. def __init__(self, delivery_tag, body):
  19. self.delivery_tag = delivery_tag
  20. self.body = body
  21. def __str__(self):
  22. return f"RabbitMQMessage(delivery_tag={self.delivery_tag}, body={self.body})"
  23. class RabbitMQ:
  24. __RABBITMQ_ATTRS__ = {
  25. 'timeout',
  26. 'virtual_host',
  27. 'heartbeat',
  28. 'ssl',
  29. 'ssl_options',
  30. 'client_properties',
  31. }
  32. __cache = {} # 使用缓存机制来实现仅在需要创建新队列或绑定新关系时进行相关操作
  33. def __init__(
  34. self,
  35. user=None,
  36. user_pass=None,
  37. ip_port=None,
  38. url=None,
  39. exchange=None,
  40. exchange_type=None,
  41. durable=True,
  42. **kwargs
  43. ):
  44. """
  45. @param str user: 用户名
  46. @param str user_pass: 密码
  47. @param ip_port: ip:port
  48. @param str url:
  49. @param str exchange: 交换机名称
  50. @param str exchange_type: 交换机类型
  51. RabbitMQ支持以下几种exchange_type类型:
  52. 1. **direct(直连交换机)**:它将消息通过路由键直接发送到与之匹配的队列。使用direct交换机时,消息的路由键需要与绑定到队列上的绑定键完全匹配。
  53. 2. **topic(主题交换机)**:它将消息通过路由键的模式匹配发送到一个或多个队列,这是一种灵活的交换机类型。使用主题交换机时,可以使用通配符进行模糊匹配,例如使用*表示一个单词,#表示零个或多个单词。
  54. 3. **fanout(扇型交换机)**:它将消息广播到所有绑定到它的队列。它忽略了路由键的概念,只需简单地将消息发送给所有队列即可。
  55. 4. **headers(头交换机)**:该交换机根据消息的头部属性进行匹配,而不是路由键。它的匹配规则非常灵活,但在实际应用中使用较少。
  56. @param durable: 是否定义队列或者交换机持久化(服务器重启后,队列是否能够恢复到原来的状态)
  57. @param kwargs: 自定义键值参数
  58. """
  59. if ip_port is None:
  60. ip_port = setting.RABBITMQ_IP_PORT
  61. if user is None:
  62. user = setting.RABBITMQ_USER
  63. if user_pass is None:
  64. user_pass = setting.RABBITMQ_USER_PASS
  65. if exchange is None:
  66. exchange = setting.RABBITMQ_EXCHANGE
  67. if exchange_type is None:
  68. exchange_type = setting.RABBITMQ_EXCHANGE_TYPE
  69. self.__mq = None
  70. self.__channel = None
  71. self._url = url
  72. self._ip_port = ip_port
  73. self._user = user
  74. self._user_pass = user_pass
  75. self._durable = durable
  76. self._exchange = exchange
  77. self._exchange_type = exchange_type
  78. self._stop_server = False
  79. self.mq_kwargs = {
  80. 'virtual_host': setting.RABBITMQ_VIRTUAL_HOST,
  81. 'heartbeat': setting.RABBITMQ_HEARTBEAT,
  82. 'timeout': setting.RABBITMQ_SOCKET_TIMEOUT
  83. }
  84. for key, val in kwargs.copy().items():
  85. if key in self.__RABBITMQ_ATTRS__:
  86. self.mq_kwargs[key] = val
  87. # 创建连接
  88. self.get_connect()
  89. # 创建信道
  90. self.get_channel()
  91. @property
  92. def _mq(self) -> AmqpStormConnection:
  93. try:
  94. if not self.__mq.is_open:
  95. raise ConnectionError("unable to connect to RabbitMQ")
  96. except:
  97. if not self._stop_server:
  98. self._reconnect()
  99. return self.__mq
  100. @_mq.setter
  101. def _mq(self, connection: AmqpStormConnection):
  102. self.__mq = connection
  103. def _reconnect(self):
  104. # 检测连接状态,当RabbitMQ重启或者因网络波动导致断开连接时自动重连
  105. retry_count = 0
  106. while True:
  107. try:
  108. retry_count += 1
  109. log.error(f"RabbitMQ 连接断开, 重新连接 {retry_count}")
  110. if self.get_connect():
  111. log.info(f"RabbitMQ 连接成功")
  112. return True
  113. except (ConnectionError,) as e:
  114. log.error(f"连接失败 e: {e}")
  115. time.sleep(1)
  116. def get_connect(self, lazy=False):
  117. try:
  118. if not self._url:
  119. if not self._ip_port:
  120. raise Exception("未设置 RabbitMQ 连接信息")
  121. ip, port = self._ip_port.split(":")
  122. node = {
  123. "hostname": ip,
  124. "port": int(port),
  125. **self.mq_kwargs
  126. }
  127. if self._user and self._user_pass:
  128. node['username'] = self._user
  129. node['password'] = self._user_pass
  130. # 创建连接
  131. self._mq = amqpstorm.Connection(**node, lazy=lazy)
  132. else:
  133. # 创建连接
  134. self._mq = amqpstorm.UriConnection(self._url, lazy=lazy)
  135. except Exception as e:
  136. raise
  137. return self.__mq.is_open
  138. def get_channel(self):
  139. try:
  140. # 建立信道
  141. self._channel = self._mq.channel()
  142. # 队列重新绑定交换机
  143. for binding_key in self.__cache.copy():
  144. if isinstance(binding_key, tuple):
  145. queue, exchange, routing_key = binding_key
  146. # 清除缓存
  147. self.__cache.pop(queue, None)
  148. self.__cache.pop(binding_key, None)
  149. # 重新声明绑定
  150. self.declare_bind(queue, exchange, routing_key)
  151. except Exception as e:
  152. raise
  153. return self.__channel.is_open
  154. def _re0channel(self):
  155. retry_count = 0
  156. while True:
  157. try:
  158. retry_count += 1
  159. log.error(f"Channel 连接断开, 重新连接 {retry_count}")
  160. if self.get_channel():
  161. log.info(f"Channel 连接成功")
  162. return True
  163. except (ConnectionError,) as e:
  164. log.error(f"连接失败 e: {e}")
  165. time.sleep(1)
  166. @property
  167. def _channel(self) -> AmqpStormChannel:
  168. try:
  169. if not self.__channel.is_open:
  170. raise ConnectionError("unable to connect to Channel")
  171. except:
  172. if not self._stop_server:
  173. self._re0channel()
  174. return self.__channel
  175. @_channel.setter
  176. def _channel(self, channel: AmqpStormChannel):
  177. self.__channel = channel
  178. def add_batch(self, queue, datas, exchange='', routing_key=''):
  179. """
  180. 批量发布消息
  181. @param str queue: 队列名称
  182. @param datas: 消息内容
  183. @param str exchange: 交换机名称
  184. @param str routing_key: 路由键
  185. """
  186. data_lst = datas if isinstance(datas, list) else [datas]
  187. for data in data_lst:
  188. self.add(data, queue, exchange, routing_key)
  189. def add(self, data, queue='', exchange='', routing_key=''):
  190. """
  191. 发布消息
  192. @param str queue: 队列名称
  193. @param data: 消息内容
  194. @param exchange: 交换机名称
  195. @param routing_key: 路由键
  196. """
  197. if not routing_key and not queue:
  198. raise AttributeError('请设置 routing_key or queue')
  199. # 不指定交换机发送消息,routing_key 表示消息队列名称
  200. # 指定交换机发送消息,routing_key 表示路由键
  201. if not exchange:
  202. routing_key = queue
  203. message_id = tools.get_uuid().replace('-', '')
  204. if isinstance(data, dict):
  205. message_id = data.get("pyuuid") or message_id
  206. # RabbitMQ 的 delivery_mode 属性用于设置消息的持久性。它有两种取值:
  207. # delivery_mode=1:表示消息被标记为持久化,但是仍然可能在服务器重启之前丢失。
  208. # delivery_mode=2:表示消息被标记为持久化,并且会存储在磁盘上,确保消息不会丢失
  209. properties = dict(delivery_mode=2, message_id=message_id)
  210. body = tools.dumps_obj(data) # 对象序列化
  211. self._channel.basic.publish(
  212. body=body,
  213. routing_key=routing_key,
  214. exchange=exchange,
  215. properties=properties
  216. )
  217. def add_dlx(self, exchange, routing_key, data):
  218. """
  219. 发布延时消息
  220. @param data: 消息内容
  221. @param exchange: 交换机名称
  222. @param routing_key: 路由键
  223. """
  224. queue = routing_key
  225. self.add(data, queue, exchange, routing_key)
  226. def ack(self, delivery_tag=0, multiple=False):
  227. """
  228. 手动回复队列消息确认
  229. @param delivery_tag: 消息标签
  230. @param bool multiple: 开启多个回复消息确认
  231. """
  232. self._channel.basic.ack(delivery_tag=delivery_tag, multiple=multiple)
  233. def declare_exchange(self, exchange, exchange_type=None, auto_delete=False,
  234. arguments=None):
  235. """声明交换机"""
  236. shares = dict(
  237. exchange_type=exchange_type or self._exchange_type,
  238. auto_delete=auto_delete,
  239. arguments=arguments
  240. )
  241. try:
  242. # 检查交换机是否存在
  243. params = dict(passive=True, **shares)
  244. return self._channel.exchange.declare(exchange, **params)
  245. except AMQPChannelError as why:
  246. if why.error_code == 404:
  247. self.get_channel()
  248. # 创建一个直连交换机
  249. params = dict(durable=True, **shares)
  250. return self._channel.exchange.declare(exchange, **params)
  251. else:
  252. raise why
  253. def declare_queue(self, queue, auto_delete=False, arguments=None):
  254. """
  255. 声明队列
  256. @param queue:
  257. @param auto_delete:
  258. @param arguments:
  259. """
  260. shares = dict(auto_delete=auto_delete, arguments=arguments)
  261. try:
  262. params = dict(passive=True, **shares)
  263. return self._channel.queue.declare(queue, **params)
  264. except AMQPChannelError as why:
  265. if why.error_code == 404:
  266. self.get_channel()
  267. # 声明持久化队列
  268. params = dict(durable=True, **shares)
  269. return self._channel.queue.declare(queue, **params)
  270. else:
  271. raise why
  272. def declare_bind(self, queue='', exchange='', routing_key=''):
  273. """
  274. 声明队列和交换机,同时将队列绑定交换机
  275. @param queue: 队列名称
  276. @param exchange: 交换机名称
  277. @param routing_key: 路由键
  278. """
  279. exchange = exchange or self._exchange
  280. binding_key = (queue, exchange, routing_key)
  281. if queue in self.__cache and binding_key in self.__cache:
  282. return self.__cache[queue]
  283. self.declare_exchange(exchange)
  284. result = self.declare_queue(queue)
  285. self.__cache[queue] = result
  286. # 队列绑定一个交换机
  287. self._channel.queue.bind(queue, exchange, routing_key)
  288. self.__cache[binding_key] = True
  289. return result
  290. def start_consuming(self, limit=None, to_tuple=False, auto_decode=True):
  291. """
  292. @param int limit: 消费数据上限
  293. @param to_tuple:
  294. @param auto_decode:
  295. """
  296. if not self._channel._consumer_callbacks:
  297. raise AMQPChannelError('no consumer callback defined')
  298. count = 0
  299. params = dict(break_on_empty=True, auto_decode=auto_decode)
  300. for message in self._channel.build_inbound_messages(**params):
  301. consumer_tag = message._method.get('consumer_tag')
  302. if to_tuple:
  303. # noinspection PyCallingNonCallable
  304. self._channel._consumer_callbacks[consumer_tag](*message.to_tuple())
  305. continue
  306. # noinspection PyCallingNonCallable
  307. self._channel._consumer_callbacks[consumer_tag](message)
  308. count += 1
  309. if limit is not None and count == limit:
  310. break
  311. def stop_consuming(self, consumer_tag=None):
  312. """
  313. @param str consumer_tag: 消费者标签
  314. """
  315. self._channel.basic.cancel(consumer_tag)
  316. def get(self, queue, limit, no_ack=False, to_str=True):
  317. """
  318. 获取rabbitmq消息队列中的信道数据
  319. @param str queue: 队列名称
  320. @param int limit: 获取消息数量
  321. @param no_ack: 自动回复消息确认
  322. @param to_str: 消息是否转成字符串
  323. """
  324. message_lst = []
  325. def callback(message):
  326. body = tools.loads_obj(message.body) # 反序列化消息对象
  327. delivery_tag = message.delivery_tag
  328. if no_ack:
  329. self._channel.basic.ack(delivery_tag)
  330. delivery_tag = 0
  331. if to_str:
  332. message_lst.append(str(RabbitMQMessage(delivery_tag, body)))
  333. else:
  334. message_lst.append(RabbitMQMessage(delivery_tag, body))
  335. try:
  336. # 设置预取上限数量
  337. self._channel.basic.qos(prefetch_count=limit)
  338. # 注册消费者并获取消费者标签
  339. consumer_tag = self._channel.basic.consume(callback, queue=queue)
  340. # 开始消费
  341. self.start_consuming(limit)
  342. # 停止消费并关闭消费者
  343. self.stop_consuming(consumer_tag)
  344. except (AMQPChannelError, AMQPConnectionError) as why:
  345. log.error(f"{why}")
  346. return message_lst
  347. def find(self, queue, limit, feature, no_ack=False, to_str=True):
  348. """
  349. 通过检索关键词从rabbitmq消息队列中获取指定数据
  350. @param str queue: 队列名称
  351. @param int limit: 获取消息数量
  352. @param str feature: 数据特征
  353. @param no_ack: 自动回复消息确认
  354. @param to_str: 消息是否转成字符串
  355. """
  356. message_lst = []
  357. # 设置预取上限数量
  358. self._channel.basic.qos(prefetch_count=limit)
  359. def callback(message):
  360. body = message.body
  361. # 处理接收到的消息
  362. impl = tools.loads_obj(body)
  363. delivery_tag = message.delivery_tag
  364. if feature in impl:
  365. if no_ack:
  366. self._channel.basic.ack(delivery_tag)
  367. delivery_tag = 0
  368. if to_str:
  369. message_lst.append(str(RabbitMQMessage(delivery_tag, impl)))
  370. else:
  371. message_lst.append(RabbitMQMessage(delivery_tag, impl))
  372. try:
  373. # 注册消费者并获取消费者标签
  374. consumer_tag = self._channel.basic.consume(callback, queue=queue)
  375. # 开始消费
  376. self.start_consuming(limit)
  377. # 停止消费并关闭消费者
  378. self.stop_consuming(consumer_tag)
  379. except (AMQPChannelError, AMQPConnectionError) as why:
  380. log.error(f"{why}")
  381. return message_lst
  382. @property
  383. def is_open(self):
  384. return self._mq.is_open
  385. def get_message_count(self, queue):
  386. message = self._channel.queue.declare(queue, passive=True)
  387. # message_count 消息统计是消息发布确认之后的数量,未确认消息无法统计
  388. return message.get('message_count')
  389. def get_mq_obj(self):
  390. return self._mq
  391. def close(self, n=-1):
  392. log.debug(f'关闭 RabbitMQ {n}')
  393. if self._channel.is_open:
  394. self._channel.close()
  395. if self._mq.is_open:
  396. self.__mq.close()
  397. self._stop_server = True
  398. def __getattr__(self, name):
  399. return getattr(self._mq, name)
  400. def __repr__(self):
  401. if self._url:
  402. return "<RabbitMQ url:{}>".format(self._url)
  403. return "<RabbitMQ ip_port: {} username:{} password:{}>".format(
  404. self._ip_port, self._user, self._user_pass
  405. )
  406. def __enter__(self):
  407. return self
  408. def __exit__(self, exception_type, exception_value, _):
  409. if exception_type:
  410. log.warning(
  411. 'Closing RabbitMQ to an unhandled exception: %s',
  412. exception_value
  413. )
  414. if not self._mq.is_open:
  415. return
  416. self.close()