rabbitMQ+应用架构设计
本文最后更新于 2025-03-24,文章超过7天没更新,应该是已完结了~
1. MQ 的相关概念
1.1 什么是 MQ
MQ(message queue),从字面意思上看,本质是个队列,FIFO 先入先出,只不过队列中存放的内容是message 而已,还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ 是一种非常常
见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了 MQ 之后,消息发送上游只需要依赖 MQ,不
用依赖其他服务。
1.2 为什么要用 MQ
1.2.1 流量消峰
举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。
1.2.2 应用解耦
以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。
1.2.3 异步处理
有些服务间调用是异步的,例如 A 调用 B,B 需要花费很长时间执行,但是 A 需要知道 B 什么时候可以执行完,以前一般有两种方式,A 过一段时间去调用 B 的查询 api 查询。或者 A 提供一个 callback api, B 执行完之后调用 api 通知 A 服务。这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,A 调用 B 服务后,只需要监听 B 处理完成的消息,当 B 处理完成后,会发送一条消息给 MQ,MQ 会将此消息转发给 A 服务。这样 A 服务既不用循环调用 B 的查询 api,也不用提供 callback api。同样 B 服务也不用做这些操作。A 服务还能及时的得到异步处理成功的消息。
1.2.4 广播
如果没有消息队列,每当一个新的业务方接入,我们都要联调一次新接口。有了消息队列,我们只需要关心消息是否送达了队列,至于谁希望订阅,是下游的事情,无疑极大地减少了开发和联调的工作量。
1.2.5 最终一致性
最终一致性指的是两个系统的状态保持一致,要么都成功,要么都失败。当然有个时间限制,理论上越快越好,但实际上在各种异常的情况下,可能会有一定延迟达到最终一致状态,但最后两个系统的状态是一样的。
1.3 MQ 的分类
1.3.1 ActiveMQ
优点:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据。
缺点:官方社区现在对 ActiveMQ 5.x 维护越来越少,高吞吐量场景较少使用。
1.3.2 Kafka
大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开 Kafka,这款为大数据而生的消息中间件,以其百万级 TPS 的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。目前已经被 LinkedIn,Uber, Twitter, Netflix 等大公司所采纳。
优点:性能卓越,单机写入 TPS 约在百万条/秒,最大的优点,就是吞吐量高。时效性 ms 级可用性非常高,kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用 Pull 方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;有优秀的第三方Kafka Web 管理界面 Kafka-Manager;在日志领域比较成熟,被多家公司和多个开源项目使用;功能支持:功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用。
缺点:Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序,但是一台代理宕机后,就会产生消息乱序,社区更新较慢。
1.3.3 RocketMQ
RocketMQ 出自阿里巴巴的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog 分发等场景。
优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到 0 丢失,MQ 功能较为完善,还是分布式的,扩展性好,支持 10 亿级别的消息堆积**,不会因为堆积导致性能下降,源码是 java 我们可以自己阅读源码,定制自己公司的 MQ。
缺点:支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;社区活跃度一般,没有在 MQ核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码。
1.3.4 RabbitMQ
2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。
优点:由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易用、跨平台、支持多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高。
缺点:商业版需要收费,学习成本较高。
2. RabbitMQ
2.1 RabbitMQ 的概念
RabbitMQ 是一个消息中间件:它接受并转发消息。你可以把它当做一个快递站点,当你要发送一个包时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是 一个快递站,一个快递员帮你传递快件。RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收, 存储和转发消息数据。
(AMQP 0-9-1 模型)
2.2 四大核心概念
生产者
产生数据发送消息的程序是生产者。
交换机
交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定 。
队列
队列是RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存 储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可 以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式 。
消费者
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费 者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
2.3 各个名词介绍
Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker。
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等。
Connection:publisher/consumer 和 broker 之间的 TCP 连接。
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。
Exchange:message 到达 broker 的第一站,根据分发规则(binding),匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast) 。
Queue:消息最终被送到这里等待 consumer 取走。
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保
存到 exchange 中的查询表中,用于 message 的分发依据。
2.4 交换机和交换机类型
在 RabbitMQ 中,消息不会直接发送到队列(Queue),而是发送到交换机(Exchange)。交换机根据路由规则决定将消息投递到哪些队列中。
路由规则
路由键(Routing Key):生产者发送消息时附带的属性,用于描述消息的路由规则(可以为空)。
绑定(Binding):队列与交换机之间的映射关系,定义了哪些消息可以投递到该队列。绑定基于路由键进行匹配。
消息本身和 Binding(绑定)确实都有 Routing Key,但它们的作用和意义是不同的。可以理解为:
消息的 Routing Key:生产者发送消息时附带的属性,描述消息的路由路径。
Binding 的 Routing Key:队列绑定到交换机时定义的规则,决定消息是否匹配并被路由到队列。
当消息发送到 RabbitMQ 服务器(Broker)时:
消息附带的路由键会与交换机的绑定规则进行匹配。
如果匹配成功,消息将被投递到对应的队列;否则可能被丢弃(视交换机类型而定)。
交换机的工作方式
交换机是 RabbitMQ 中的核心组件之一,负责接收生产者的消息并按照绑定规则将其路由到队列。交换机的行为由以下两部分决定:
交换机类型:决定消息的路由算法。
绑定规则(Bindings):定义队列与交换机之间的关系。
在声明交换机时,可以设置多种属性,以满足不同的业务需求。其中关键的几个属性包括:
Name(名称):交换机的唯一标识。
Durability(持久性):决定交换机在消息代理(Broker)重启后是否依然存在。
Auto-delete(自动删除):当所有绑定到该交换机的队列都解绑后,是否自动删除该交换机。
Arguments(参数):为交换机设置额外的属性,这些参数通常依赖于具体的代理实现。
交换机的状态
交换机可以有两种状态:
持久(Durable):持久化的交换机在消息代理重启后依旧存在,无需重新声明。
暂存(Transient):暂存的交换机不会在代理重启后保留,需要在代理重新启动时重新声明。
持久化的必要性
并非所有场景都需要持久化的交换机:
需要持久化的场景:系统需要在代理重启后保留交换机,以确保消息流的连续性。
不需要持久化的场景:临时任务或短生命周期的功能中,暂存交换机可能更适用,以避免资源占用。
2.4.1 Direct Exchange
将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行比较,如果相等,则发送到该Binding对应的Queue中。
2.4.2 Topic Exchange
将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行对比,如果匹配上了,则发送到该Binding对应的Queue中。
Topic 类型交换机的 Routing Key 规则
发送到 Topic 类型交换机的消息,其 routing_key
必须遵循以下规则:
结构要求:Routing Key 必须是一个单词列表,单词之间用点号(
.
)分隔,例如:
stock.usd.nyse
nyse.vmw
quick.orange.rabbit
长度限制:整个 Routing Key 最多不能超过 255 个字节。
通配符规则
在 Topic 交换机中,绑定时可以使用以下两个通配符定义规则:
*
(星号):
代表匹配 一个单词。
示例:
Routing Key:
quick.orange.rabbit
Binding Key:
quick.*.rabbit
(匹配成功)Binding Key:
quick.*.*
(匹配成功)Binding Key:
*.orange.rabbit
(匹配成功)
#
(井号):
代表匹配 零个或多个单词。
示例:
Routing Key:
quick.orange.rabbit
Binding Key:
quick.#
(匹配成功)Binding Key:
#
(匹配成功)Binding Key:
#.rabbit
(匹配成功)
注意
当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了
2.4.3 Fanout Exchange
直接将消息转发到所有binding的对应queue中,这种exchange在路由转发的时候,忽略Routing key。
2.4.4 Headers Exchange
Headers Exchange 根据消息中的 headers 属性,与绑定(Binding)中定义的参数进行匹配,决定是否将消息路由到绑定的队列中。
工作原理
消息携带的 headers 是一个键值对集合,例如:
{
"key1": "value1",
"key2": "value2"
}
Binding 也定义了需要匹配的键值对规则,称为 arguments,例如:
{
"key1": "value1",
"x-match": "all"
}
x-match
是一个特殊参数,用于定义匹配策略:
all
:消息的所有 headers 都必须匹配 Binding 的参数。any
:消息的任意一个 header 匹配 Binding 的参数即可。
匹配过程:
交换机检查消息的 headers 是否满足绑定的参数规则(包括
x-match
的策略)。如果匹配成功,消息被投递到对应的队列。
优点
Headers Exchange 基于键值对匹配,支持更灵活的条件配置。
不依赖 Routing Key,而是直接基于消息的元数据(headers)。
应用场景
需要高度定制化的路由规则时,Headers Exchange 是理想选择。
常用于携带丰富元数据的消息分发场景,例如分布式系统中复杂的消息处理逻辑。
2.4.5 六大模式
1. 点对点模式(Point-to-Point)
概念:消息从一个生产者发送到一个消费者。
特点:
消息只能被一个消费者接收和处理。
如果没有消费者在线,消息会存储在队列中,直到有消费者接收。
应用场景:任务分发(如订单处理、任务队列)。
实现方式:使用队列(Queue)。
2. 发布/订阅模式(Publish/Subscribe)
概念:消息从一个生产者发送到多个消费者(订阅者)。
特点:
所有订阅者都能收到相同的消息。
消息通过主题(Topic)进行分发。
应用场景:事件通知(如日志广播、股票行情推送)。
实现方式:使用主题(Topic)或广播机制。
这里是多个多列,区别工作模式是一个队列
3. 请求/响应模式(Request/Response)
概念:生产者发送一个请求消息,消费者处理后返回一个响应消息。
特点:
双向通信。
通常需要消息的唯一标识(如
correlation_id
)来匹配请求和响应。
应用场景:远程过程调用(RPC)、服务通信。
实现方式:结合队列或消息路由机制实现。
4. 工作队列模式(Work Queue)
概念:生产者发送消息到队列,多个消费者从队列中获取任务,任务分发采用负载均衡策略。
特点:
多个消费者同时工作,任务被分配给空闲的消费者。
适合高并发任务处理。
应用场景:任务调度、后台作业处理。
实现方式:队列配合消费者。
5. 路由模式(Routing)
概念:生产者根据路由键将消息发送到特定的队列。
特点:
消息携带 Routing Key,交换机根据绑定的规则分发消息。
绑定规则可以实现消息的精准投递。
应用场景:根据业务类型分发消息(如订单类型区分:支付订单、退款订单)。
实现方式:Direct 类型交换机。
6. 主题模式(Topic)
概念:生产者将消息发送到交换机,通过路由键的模式匹配,将消息分发到多个队列。
特点:
使用通配符(如
*
和#
)实现模糊匹配。适合复杂的消息分类和分发。
应用场景:多级分类的消息分发(如按地区、服务分类的通知系统)。
实现方式:Topic 类型交换机。
2.6 消息分发
消息队列系统中,RabbitMQ 的消息分发策略主要有三种:轮询分发、不公平分发和限流。每种分发方式适用于不同的业务场景。
2.6.1 轮询分发(Round Robin Dispatching)
默认策略: RabbitMQ 默认采用轮询分发的方式。当队列有多个消费者时,消息会按照顺序轮流分发给每个消费者,确保每个消费者接收到的消息数量是均衡的。
特点:
简单高效,适用于所有消费者处理能力相当的场景。
如果消费者处理能力不均,可能会导致部分消费者空闲,而另一些消费者过载。
2.6.2 不公平分发(Unfair Dispatching)
问题背景: 如果消费者的处理速度不同,例如:
消费者 1 处理速度很快。
消费者 2 处理速度较慢。 默认的轮询分发会导致处理速度快的消费者处于较多的空闲状态,而处理慢的消费者则可能一直积压任务。
解决方案: 使用 消息应答机制(ACK) 和 预取值设置 来避免任务的不均衡分配。
设置方式:
代码实现:
channel.basicQos(1);
含义:消费者声明自己在处理完当前消息并发送 ACK 之前,不会再接收新的消息。RabbitMQ 会优先将任务分配给空闲的消费者。--->一次只能处理一条消息
效果:
消费者可以根据自己的处理能力动态接收任务。
避免了处理能力不均带来的分配问题。
注意事项: 如果所有消费者都繁忙且队列中的任务持续增加,可能会导致队列被填满。这种情况下可以通过以下方式优化:
增加新的消费者(Worker)。
重新设计任务的分发策略或存储方式。
2.6.3 限流(Flow Control)
定义: 限流通过设置 预取计数(Prefetch Count) 限制通道上允许的未确认消息的最大数量,确保消费者不会因为消息堆积而超载。
设置方法:
使用
basic.qos
方法设置预取值。示例代码:
channel.basicQos(prefetchCount);
参数说明:
prefetchCount
:定义通道中未确认消息的最大数量。
预取值推荐:
保守模式:设置
prefetchCount = 1
,消费者处理完一条消息后才能接收下一条消息。性能模式:推荐将预取值设置在
100~300
之间,既能提高吞吐量,又能降低消费者超载风险。
适用场景:
高并发任务处理。
消费者需要稳定的负载保护。
总结
1. basicQos
不管系统中攒了多少未确认的消息
basicQos(n)
的核心作用是控制 单个消费者 同时可以接收的未确认消息的数量。即使队列里还有大量未处理的消息,
basicQos
只关注消费者本身的负载:
“这个消费者同时最多可以接收 n 条未确认的消息。”只要消费者的未确认消息数量还没有达到
n
,RabbitMQ 就会继续给它分配消息。对于整个系统或队列中的消息积压数量,
basicQos
并不关心。
举个例子:
假设一个队列里有 100 条消息,而我们有 2 个消费者,设置 basicQos(2)
:
每个消费者最多只会接收 2 条消息,直到它们确认这些消息后,才会分配新消息。
即使队列中还有 96 条未分发的消息,
basicQos
也不会强制分发更多消息给消费者。
2. 限流会管系统中的消息积压和流量
限流 的目的是从生产者和队列层面限制消息的流入速度或者处理速度。
它不仅关注消费者的负载,还关注整个系统是否有能力处理当前的消息流量,防止消息队列中积压过多的消息,导致性能下降或系统崩溃。
举个例子:
假设一个队列可以承受每秒最多处理 100 条消息,如果生产者的发送速率是每秒 500 条消息:
如果没有限流,消息队列会迅速积压,直到系统资源耗尽。
使用限流,可以让生产者以每秒最多 100 条消息的速度发送,避免队列被填满。
2.7 消息确认
在 RabbitMQ 中,消息确认机制主要用于确保消息可靠地投递。无论是从生产者到交换机,还是从交换机到队列,都可以通过不同方式进行消息确认,避免消息丢失。
2.7.1 发布确认
原理
当生产者将信道(Channel)设置为 confirm 模式 时,所有通过该信道发布的消息都会分配一个唯一的 ID(从 1 开始)。
当消息成功投递到所有匹配的队列后,Broker 会向生产者发送一个确认消息(包含消息的唯一 ID)。
如果消息设置了持久化属性,确认消息会在写入磁盘后返回。
通过
delivery-tag
字段标识确认的消息序列号,并通过basic.ack
的multiple
参数表示是否批量确认。
开启发布确认
发布确认默认关闭,需要显式调用以下方法开启:
// 创建 Channel 实例
channel = connection.createChannel();
// 开启发布确认
channel.confirmSelect();
单个确认发布
特点:生产者发布消息后等待确认再发布下一个消息。
实现方式:调用
waitForConfirmsOrDie()
方法。
如果消息未在指定时间内确认,则抛出异常。
缺点是性能较低,因为消息的发布是阻塞的,适合小规模低频场景。
示例代码:
// 单条消息确认
channel.basicPublish("", queueName, null, message.getBytes());
// 等待确认,默认阻塞直到消息被确认
channel.waitForConfirmsOrDie();
// 或设置超时时间
channel.waitForConfirmsOrDie(1000L);
批量确认发布
特点:生产者一次性发布多条消息,然后批量确认。
优点:相比单条确认,批量确认极大提高了吞吐量。
缺点:无法精确定位出哪条消息导致失败,需要在内存中维护批次记录。
示例代码:
int batchSize = 100; // 批量大小
int outstandingMessageCount = 0;
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息 " + i;
channel.basicPublish("", queueName, null, message.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
// 批量确认
channel.waitForConfirms();
outstandingMessageCount = 0;
}
}
// 处理剩余未确认的消息
if (outstandingMessageCount > 0) {
channel.waitForConfirms();
}
异步确认发布
特点:通过回调函数异步处理消息确认,适合高并发场景。
实现方式:将消息及其序列号存入线程安全的队列(如
ConcurrentSkipListMap
),通过回调处理确认或未确认的消息。
示例代码:
// 开启发布确认
channel.confirmSelect();
// 使用线程安全的跳表存储未确认的消息
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
// 添加异步确认监听器
channel.addConfirmListener((deliveryTag, multiple) -> {
if (multiple) {
outstandingConfirms.headMap(deliveryTag, true).clear();
} else {
outstandingConfirms.remove(deliveryTag);
}
}, (deliveryTag, multiple) -> {
String failedMessage = outstandingConfirms.get(deliveryTag);
System.err.println("未确认消息:" + failedMessage + ",序列号:" + deliveryTag);
});
// 发布消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息 " + i;
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
channel.basicPublish("", queueName, null, message.getBytes());
}
回退消息
在生产者确认模式下,如果消息无法被路由到队列,生产者默认不会收到任何通知,导致消息可能被丢弃。
通过设置
mandatory
参数,可以确保消息在无法路由时返回给生产者。
实现方式:
// 设置 mandatory 参数为 true
channel.basicPublish(exchange, routingKey, true, null, message.getBytes());
// 添加回退监听器
channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
System.err.println("消息回退:" + new String(body) + ",原因:" + replyText);
});
2.7.2 消息应答
自动应答
工作机制
消息发送后,RabbitMQ 默认认为消息已被消费成功,并立即从队列中删除消息。
消费者无需显式发送确认信号 (ACK),RabbitMQ 不等待消费者的处理完成。
优点
高吞吐量:省略了显式的消息确认过程,提升了消息的处理效率。
简单实现:消费者代码无需额外的确认逻辑。
缺点
消息丢失风险:
如果消费者在接收到消息后未处理完成,因崩溃或连接丢失,消息将无法被重新分发,从而丢失。过载风险:
RabbitMQ 不对消费者的处理能力进行限制,可能导致消费者因堆积过多未处理的消息而资源耗尽。
适用场景
消费者处理速度快、可靠性要求低的场景,例如日志数据的临时分析。
手动应答
工作机制
消费者需要显式地通过 ACK 或 NACK 来告知 RabbitMQ 消息的处理状态。
RabbitMQ 会根据消费者的反馈采取不同的操作。
消息应答的类型
ACK (肯定确认)
使用
channel.basicAck(deliveryTag, multiple)
。告诉 RabbitMQ:消息已被成功处理,可以将其从队列中移除。
channel.basicAck(deliveryTag, false); // 单个确认
channel.basicAck(deliveryTag, true); // 批量确认
NACK (否定确认)
使用
channel.basicNack(deliveryTag, multiple, requeue)
。告诉 RabbitMQ:消息处理失败,可选择是否重新入队。
参数解释:
multiple
:是否批量操作。requeue
:true
表示重新入队,false
表示丢弃。
channel.basicNack(deliveryTag, false, true); // 单条消息重新入队
Reject (拒绝消息)
使用
channel.basicReject(deliveryTag, requeue)
。与
NACK
类似,但不支持批量操作。
channel.basicReject(deliveryTag, true); // 单条消息重新入队
2.8 持久化概述
持久化的目标是保障即使 RabbitMQ 服务重启或崩溃,队列和消息依然可以恢复,避免消息丢失。
需要队列和消息都设置为持久化,才能实现较高的可靠性。
2.8.1 队列持久化
默认行为
默认创建的队列是非持久化的。
当 RabbitMQ 重启后,非持久化队列会被删除。
实现队列持久化
创建队列时将
durable
参数设置为true
。示例代码:
String queueName = "durableQueue";
boolean queueDurable = true; // 持久化队列
boolean exclusive = false; // 非排他队列 如果队列被声明为 exclusive = true,那么该队列将只会在创建它的连接(或通道)中可用,且当该连接关闭时,队列会自动删除
boolean autoDelete = false; // 非自动删除
channel.queueDeclare(queueName, queueDurable, exclusive, autoDelete, null);
注意事项
如果已经存在一个同名的非持久化队列,无法直接将其改为持久化队列。需要先删除原队列,再重新创建。
channel.queueDelete(queueName); // 删除原队列
channel.queueDeclare(queueName, true, false, false, null); // 创建 持久化队列
队列持久化仅保证队列的元数据(如队列名称和属性)能够在 RabbitMQ 重启后恢复。
2.8.2 消息持久化
默认行为
默认消息是非持久化的,仅存储在内存中,RabbitMQ 重启后会丢失。
实现消息持久化
在生产者发布消息时,设置消息的
deliveryMode
属性为 2(持久化)。示例代码:
BasicProperties.Builder propsBuilder = new BasicProperties.Builder();
propsBuilder.deliveryMode(2); // 2 表示消息持久化
AMQP.BasicProperties properties = propsBuilder.build();
String exchangeName = "exchange";
String routingKey = "key";
byte[] messageBody = "Persistent Message".getBytes();
channel.basicPublish(exchangeName, routingKey, properties, messageBody);
注意事项
deliveryMode 参数:
1:非持久化消息。
2:持久化消息。
持久化的消息会存储到磁盘中,但这并不意味着绝对不会丢失。因为在以下情况下,消息仍可能丢失:
消息尚未完全写入磁盘时,RabbitMQ 服务崩溃。
RabbitMQ 的文件存储机制存在缓存间隔点。
消息持久化会对性能造成一定影响,因为写入磁盘比存储在内存中更耗时。
发布确认(强持久化策略)
如果需要更高的消息可靠性,可以结合 发布确认模式(Publisher Confirms)。
RabbitMQ 在消息成功写入磁盘后,会向生产者返回确认,确保消息已被持久化。
实现发布确认
开启发布确认模式:
channel.confirmSelect(); // 启用发布确认
发布消息并等待确认:
String exchange = "exchange";
String routingKey = "key";
byte[] message = "Persistent Message with Confirm".getBytes();
channel.basicPublish(exchange, routingKey, null, message);
if (!channel.waitForConfirms()) {
System.out.println("Message delivery failed!");
} else {
System.out.println("Message confirmed!");
}
优点
确保消息真正存储到磁盘后再继续处理后续逻辑。
避免消息因 RabbitMQ 崩溃而丢失。
缺点
需要额外的网络通信和等待确认,降低了消息吞吐量。
2.9 队列
补充一个知识:
channel.queueDeclare(normalQueue, false, false, false, params);
中的 第五个参数 是 arguments
,它是一个 Map<String, Object> 类型的参数,允许你设置队列的额外属性和参数。
在 RabbitMQ 中,队列可以通过这些参数来配置一些特殊的行为。例如:
设置死信交换机(
x-dead-letter-exchange
)设置死信路由键(
x-dead-letter-routing-key
)设置队列的最大长度(
x-max-length
)设置消息的最大TTL(
x-message-ttl
)设置队列的优先级(
x-max-priority
)
这些参数通常用于定制队列的行为,以便在消息处理和存储方面满足特定需求。
例如:
设置死信交换机和死信路由键:
Map<String, Object> params = new HashMap<>();
params.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 死信交换机
params.put("x-dead-letter-routing-key", "dead_routingkey"); // 死信路由键
channel.queueDeclare(normalQueue, false, false, false, params);设置队列的最大长度:
Map<String, Object> params = new HashMap<>();
params.put("x-max-length", 1000); // 设置最大长度为 1000
channel.queueDeclare(normalQueue, false, false, false, params);设置消息过期时间(TTL):
Map<String, Object> params = new HashMap<>();
params.put("x-message-ttl", 60000); // 设置消息过期时间为 60,000 毫秒(1分钟)
channel.queueDeclare(normalQueue, false, false, false, params);
2.9.1 死信队列
死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
死信来源
消息 TTL 过期
队列达到最大长度(队列满了,无法再添加数据到 mq 中)
消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false
设置死信队列
// 声明正常交换机,类型为 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明死信交换机,类型为 direct
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明死信队列
String deadQueue = "dead-queue"; // 死信队列名称
channel.queueDeclare(deadQueue, false, false, false, null); // 声明死信队列,非持久化,不自动删除
// 将死信队列与死信交换机绑定,设置死信路由键为 "dead_routingkey"
channel.queueBind(deadQueue, DEAD_EXCHANGE, "dead_routingkey");
// 设置正常队列的参数,将死信交换机和路由键绑定到正常队列
Map<String, Object> params = new HashMap<>();
// 设置死信交换机
params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 设置死信路由键
params.put("x-dead-letter-routing-key", "dead_routingkey");
// 声明正常队列,绑定死信交换机和路由键
String normalQueue = "normal-queue"; // 正常队列名称
channel.queueDeclare(normalQueue, false, false, false, params); // 声明正常队列
channel.queueBind(normalQueue, NORMAL_EXCHANGE, "normal_routingkey"); // 绑定正常队列
2.9.2 延迟队列
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。
实现方式
基于消息TTL和队列TTL转换入死信队列,消费死信队列中的消息
基于延迟插件,该类型消息支持延迟投递机制 消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中
用 Java 的 DelayQueue
利用 Redis 的 zset
// 声明一个普通队列
String normalQueue = "normal-queue";
// 设置该队列的死信交换机和路由键,将消息在超时后转到死信队列
Map<String, Object> params = new HashMap<>();
params.put("x-dead-letter-exchange", "dead-exchange"); // 设置死信交换机
params.put("x-dead-letter-routing-key", "dead-routing-key"); // 设置死信路由键
// 设置队列消息的过期时间(TTL),单位是毫秒
params.put("x-message-ttl", 10000); // 设置消息的TTL为10秒(10000毫秒)
// 声明队列,并设置死信队列信息和TTL
channel.queueDeclare(normalQueue, true, false, false, params);
// 将正常队列绑定到交换机
channel.queueBind(normalQueue, "normal-exchange", "normal-routing-key");
2.9.3 优先队列
要让队列实现优先级需要做的事情有如下事情:队列需要设置为优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费,因为这样才有机会对消息进行排序。
// 设置队列的最大优先级为 10
Map<String, Object> args = new HashMap<>();
args.put("x-max-priority", 10); // 队列的最大优先级为 10
// 声明优先队列
String priorityQueue = "priority-queue";
channel.queueDeclare(priorityQueue, true, false, false, args);
// 发送消息时设置优先级
BasicProperties.Builder propsBuilder = new BasicProperties.Builder();
propsBuilder.priority(5); // 设置消息的优先级为 5
// 发送消息到交换机
channel.basicPublish("priority-exchange", "priority-routing-key", propsBuilder.build(), "优先级消息内容".getBytes("UTF-8"));
2.9.4 惰性队列
RRabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。
// 声明惰性队列
Map<String, Object> args = new HashMap<>();
args.put("x-queue-mode", "lazy"); // 设置队列模式为惰性队列
// 创建惰性队列,存储在磁盘而不是内存中
String lazyQueue = "lazy-queue";
channel.queueDeclare(lazyQueue, true, false, false, args);
// 绑定惰性队列到交换机
channel.queueBind(lazyQueue, "lazy-exchange", "lazy-routing-key");
内存开销对比
在发送 1 百万条消息,每条消息大概占 1KB 的情况下,普通队列占用内存是 1.2GB,而惰性队列仅仅占用 1.5MB
2.10 TTL
TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为”死信”。如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用。
2.10.1 队列TTL
Map args = MapUtils.toMap(new Object[][]{{"x-message-ttl", 2000}});
channel.queueDeclare(queue_name, queue_durable, false, false, args);
2.10.2 消息TTL
BasicProperties.Builder basicProperties = new BasicProperties.Builder();
basicProperties.expiration("1000");
2.11 幂等性
在分布式系统中,尤其是消息队列(MQ)应用中,幂等性是确保消息处理在重复消费时不会产生不一致或副作用的关键机制。幂等性保证了即使消息被多次消费,也不会导致不一致的状态。
2.11.1 消息重复消费
在消息队列的消费过程中,消费者收到消息并进行处理,但如果在消费者处理消息时发生网络中断或其他问题,导致消费者未能确认消息(ACK),那么这条消息可能会被重新投递给消费者。重新投递的消息如果已经被消费者成功处理,就会导致消息的重复消费。
问题:
消息重复消费会导致数据的不一致、业务逻辑的错误或副作用(例如:重复支付、重复发送邮件等)。
2.11.2 解决思路
为了确保消息的幂等性,即使发生重复消费,依然能够保持系统的正确性,可以采用以下两种常见的方式:
全局唯一 ID: 每条消息都有一个唯一的标识符(如全局 ID、时间戳、UUID 等)。消费者在处理消息前,可以先检查这个唯一标识符是否已处理过,避免重复消费。
如果消息的 ID 已经存在,说明该消息已经被处理过,可以跳过当前消息。
如果消息的 ID 不存在,说明是第一次处理该消息,可以进行正常的处理操作。
指纹码机制: 指纹码是一种基于业务规则生成的唯一标识符,通常是通过将时间戳与业务相关数据拼接生成的。消费者可以用指纹码作为消息的唯一标识,并通过查询数据库或缓存来判断是否已经处理过该消息。
优点:实现简单,适用于大部分业务场景。
缺点:在高并发场景下,单一数据库可能成为瓶颈,需要考虑分库分表等技术来提升性能。
2.11.3 幂等性保障
有几种方式可以帮助实现幂等性保障:
唯一 ID + 指纹码机制: 利用系统生成的唯一标识符(如 UUID)或时间戳加上业务相关信息(如用户 ID、订单号等)生成指纹码。消费者在处理消息前,通过查询数据库或缓存来判断该消息是否已被处理过。
优点:实现简单,逻辑清晰,适用于大多数应用场景。
缺点:在高并发的情况下,如果所有的判断都依赖数据库,可能会对性能产生瓶颈。可以通过分库分表等方式来优化性能。
示例:
// 消费者处理消息时,先通过唯一标识符检查消息是否已经处理过
String uniqueMessageId = message.getUniqueId();
if (messageProcessed(uniqueMessageId)) {
// 消息已处理,跳过
return;
}
// 处理消息
processMessage(message);
// 标记消息已处理
markMessageProcessed(uniqueMessageId);
Redis 原子性: Redis 的
SETNX
(SET if Not Exists)命令本身具有幂等性,可以确保某个键值在不存在的情况下被设置。这使得 Redis 成为保障幂等性的一种高效方式。
消费者可以通过
SETNX
命令设置一个唯一的标识符(如消息的 ID)。如果该标识符已经存在,说明消息已经被处理过,可以跳过当前消息。优点:Redis 是内存数据库,读写速度非常快,能处理高并发的请求。
缺点:依赖 Redis,需要保证 Redis 的高可用性和性能。
示例:
// Redis setnx 实现幂等性
String uniqueMessageId = message.getUniqueId();
// 如果该消息 ID 不存在,则设置并处理消息
if (redis.setnx(uniqueMessageId, "processed") == 1) {
// 消息首次处理,执行处理逻辑
processMessage(message);
} else {
// 消息已处理,跳过
return;
}
总结:
幂等性是保证消息队列系统中不会因为消息重复消费而导致数据不一致的关键机制。常见的解决方法包括:
使用 唯一标识符 或 指纹码 来判断消息是否已处理。
利用 Redis 的原子性操作(如
SETNX
)来确保消息的处理具有幂等性。
通过这些措施,可以有效避免消息的重复消费,保证系统的正确性和一致性。
3. 架构设计
rabbitMQ客户端
@Slf4j
@Service
public class RabbitClient {
@Resource
private RabbitTemplate rabbitTemplate;
@Autowired(required = false)
private FailMsgDao failMsgDao;
//这里 RabbitClient 注入自己(自我注入),是为了方便在方法内部调用时触发 Spring 的代理机制(比如重试功能)--》虽然没用到
@Resource
private RabbitClient rabbitClient;
//提供一个简化的接口
public void sendMsg(String exchange, String routingKey, Object msg) {
rabbitClient.sendMsg(exchange, routingKey, msg, null, null, false);
}
/**
* @记在方法上,表示如果方法抛出指定异常,会自动重试
* value = MqException.class:当抛出 MqException 异常时触发重试。
* maxAttempts = 3:最多retry 3 次(加上第一次,总共尝试 4 次)。
* backoff = @Backoff(value = 3000, multiplier = 1.5):重试间隔时间,第一次等 3 秒(3000 毫秒),第二次等 4.5 秒(3 × 1.5)...
* recover = "saveFailMag":如果重试都失败了,调用 saveFailMag 方法处理
*/
@Retryable(value = MqException.class, maxAttempts = 3, backoff = @Backoff(value = 3000, multiplier = 1.5), recover = "saveFailMag")
public void sendMsg(String exchange, String routingKey, Object msg, Integer delay, Long msgId, boolean isFailMsg) {
// 1.发送消息前准备
// 1.1获取消息内容,如果非字符串将其序列化
String jsonMsg = JsonUtils.toJsonStr(msg);
// 1.2.全局唯一消息id,如果调用者设置了消息id,使用调用者消息id,如果为配置,默认雪花算法生成消息id
msgId = NumberUtils.null2Default(msgId, IdUtil.getSnowflakeNextId());
// 1.3.设置默认延迟时间,默认立即发送
delay = NumberUtils.null2Default(delay, -1);
log.debug("消息发送!exchange = {}, routingKey = {}, msg = {}, msgId = {}", exchange, routingKey, jsonMsg, msgId);
// 用 CorrelationData(RabbitMQ 的工具)绑定消息 ID 和回调,监听消息发送到exchange成功或失败
// 构建回调需要实现ListenableFutureCallback接口
RabbitMqListenableFutureCallback futureCallback = RabbitMqListenableFutureCallback.builder()
.exchange(exchange)
.routingKey(routingKey)
.msg(jsonMsg)
.msgId(msgId)
.delay(delay)
.isFailMsg(isFailMsg)
.failMsgDao(failMsgDao)
.build();
CorrelationData correlationData = new CorrelationData(msgId.toString());
correlationData.getFuture().addCallback(futureCallback);
// 1.6.构造消息对象
Message message = MessageBuilder.withBody(StrUtil.bytes(jsonMsg, CharsetUtil.CHARSET_UTF_8))
//持久化
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
//消息id
.setMessageId(msgId.toString())
.build();
try {
// 2.发送消息
//MessagePostProcessor 是一个接口,作用是在消息发送之前对 Message 进行加工或修改。
this.rabbitTemplate.convertAndSend(exchange, routingKey, message, new DelayMessagePostProcessor(delay), correlationData);
} catch (Exception e) {
log.error("send error:" + e);
// 3.构建异常回调,并抛出异常
MqException mqException = new MqException();
mqException.setMsg(ExceptionUtil.getMessage(e));
mqException.setMqId(msgId);
throw mqException;
}
}
/**
* @param mqException mq异常消息
* @param exchange 交换机
* @param routingKey 路由key
* @param msg mq消息
* @param delay 延迟消息
* @param msgId 消息id
*/
//@标记在方法上,表示这是 @Retryable 重试失败后的“备用方案”
@Recover
public void saveFailMag(MqException mqException, String exchange, String routingKey, Object msg, Integer delay, String msgId) {
//发送消息失败,需要将消息持久化到数据库,通过任务调度的方式处理失败的消息
failMsgDao.save(mqException.getMqId(), exchange, routingKey, JsonUtils.toJsonStr(msg), delay, DateUtils.getCurrentTime() + 10, ExceptionUtil.getMessage(mqException));
}
}
CorrelationData 的核心作用总结
消息追踪: 通过 id 字段,将发送的消息与确认结果关联起来。
确认处理: 提供ACK/NACK状态,告诉你消息是否成功送达。
返回支持: 当启用返回时,记录无法投递的消息信息。
异步支持: 与 RabbitTemplate 的异步发送和回调机制(如 ListenableFutureCallback)无缝集成。
rabbitMQ配置类
@Configuration
@ConditionalOnProperty(prefix = "rabbit-mq", name = "enable", havingValue = "true")
@Import({RabbitClient.class, FailMsgDaoImpl.class}) //注册这两个bean并引入。。。其实也不用注册直接@Resource引入就好因为其都注册好了
@EnableConfigurationProperties({RabbitmqProperties.class})
@Slf4j
//实现这个接口后,Spring 会调用 setApplicationContext 方法,把应用上下文(ApplicationContext)传进来。
//通俗解释:就像拿到了一把“万能钥匙”,可以用它从 Spring 容器里拿任何东西(比如 RabbitTemplate)。
public class RabbitMqConfiguration implements ApplicationContextAware {
/**
* 默认的消费者并发数量是 10
*/
public static final int DEFAULT_CONCURRENT = 10;
@Autowired(required = false)
private FailMsgDao failMsgDao;
//专门处理消息消费失败的情况
@Bean
public ErrorMessageRecoverer errorMessageRecoverer(RabbitmqProperties rabbitmqProperties,RabbitClient rabbitClient) {
return new ErrorMessageRecoverer(rabbitClient, rabbitmqProperties);
}
//用来配合定时任务,把之前存下来的失败消息重新发送
@Bean
public RabbitMqResender rabbitMqResender(RabbitTemplate rabbitTemplate, RabbitmqProperties rabbitmqProperties) {
return new RabbitMqResender(rabbitTemplate, rabbitmqProperties);
}
/**
* 配置 RabbitMQ 消息消费者的行为(比如并发数量、异常队列)
*
* @param configurer SimpleRabbitListenerContainerFactoryConfigurer 是一个配置工具类,
* 专门用来帮助配置 SimpleRabbitListenerContainerFactory。它封装了一些默认配置逻辑,简化配置过程
* @param connectionFactory 负责创建和管理与 RabbitMQ 服务器的连接
* @param rabbitmqProperties 配置文件中的设置。
* @return factory
*/
@Bean("defaultContainerFactory")
@Primary
public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory,
RabbitmqProperties rabbitmqProperties) {
//设置管理消息消费者的容器
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConcurrentConsumers(DEFAULT_CONCURRENT); //并发消费者数量
factory.setMaxConcurrentConsumers(DEFAULT_CONCURRENT);// 最大并发消费者数量
configurer.configure(factory, connectionFactory);
/**
* RabbitAdmin 是 Spring AMQP 提供的一个工具类,用于在代码中声明和管理
* RabbitMQ 的组件,比如交换机(Exchange)、队列(Queue)和绑定(Binding)
* 。它简化了与 RabbitMQ 服务器的交互,让你可以用 Java 代码动态创建和管理这些资源,而不需要手动在 RabbitMQ 管理界面操作。
*/
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 声明消费异常交换机
Exchange errorExchange = ExchangeBuilder.topicExchange(rabbitmqProperties.getError().getExchange())
.durable(true).build();
rabbitAdmin.declareExchange(errorExchange);
rabbitAdmin.setAutoStartup(true);
// 声明消费异常队列
Queue errorQueue = new Queue(rabbitmqProperties.getError().getQueue());
rabbitAdmin.declareQueue(errorQueue);
// 声明消费异常消息绑定关系
Binding binding = BindingBuilder.bind(errorQueue)
.to(errorExchange)
.with(rabbitmqProperties.getError().getRoutingKey()).noargs();
rabbitAdmin.declareBinding(binding);
return factory;
}
/**
* 获取 RabbitTemplate:
* 从 Spring 容器中拿到 RabbitTemplate,它是消息发送的核心工具。
* 设置回调:
* 当消息发送失败(比如路由不到队列)时,RabbitMQ 会“退回”消息,触发 returnedMessage 方法。
* 回调中:
* 提取消息体(body)、消息 ID(messageId)、内容(content)。
* 记录日志,写下失败原因(应答码、原因、交换机、路由键等)。
* 如果 failMsgDao 存在,把失败消息存到数据库,包括消息 ID、交换机、路由键、内容等。
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
//定义returnCallback回调方法 从交换机---->队列 3️⃣
rabbitTemplate.setReturnsCallback(
new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
byte[] body = returnedMessage.getMessage().getBody();
//消息id
String messageId = returnedMessage.getMessage().getMessageProperties().getMessageId();
String content = new String(body, Charset.defaultCharset());
log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息id{},消息内容{}",
returnedMessage.getReplyCode(),
returnedMessage.getReplyText(),
returnedMessage.getExchange(),
returnedMessage.getRoutingKey(),
messageId,
content);
if (failMsgDao != null) {
failMsgDao.save(NumberUtils.parseLong(messageId), returnedMessage.getExchange(), returnedMessage.getRoutingKey(), content, null, DateUtils.getCurrentTime(), "returnCallback");
}
}
}
);
}
/**
* 1️⃣.setApplicationContext 是在 Spring 容器初始化这个 Bean 时被调用,比 @PostConstruct 更早一些
* 过这种方式,开发者可以确保 RabbitTemplate 的回调在容器完全初始化之前就被设置好,避免其他组件使用 RabbitTemplate 时回调还未生效
* 2️⃣RabbitMqConfiguration 类本身依赖 RabbitTemplate,RabbitTemplate 的配置又依赖 RabbitMqConfiguration,造成循环依赖
* 3️⃣用 ApplicationContext 可以动态获取容器中的 Bean,比如根据条件选择不同的 RabbitTemplate(如果有多个)。
* 4️⃣些开发者或团队习惯用 ApplicationContextAware 来手动管理关键组件的配置,尤其是像 RabbitTemplate 这样核心的对象,可能是为了明确控制其生命周期
*
@Configuration
@Slf4j
public class RabbitMqConfiguration {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
byte[] body = returnedMessage.getMessage().getBody();
String messageId = returnedMessage.getMessage().getMessageProperties().getMessageId();
String content = new String(body, Charset.defaultCharset());
log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息id{},消息内容{}",
returnedMessage.getReplyCode(),
returnedMessage.getReplyText(),
returnedMessage.getExchange(),
returnedMessage.getRoutingKey(),
messageId,
content);
if (failMsgDao != null) {
failMsgDao.save(NumberUtils.parseLong(messageId), returnedMessage.getExchange(),
returnedMessage.getRoutingKey(), content, null, DateUtils.getCurrentTime(), "returnCallback");
}
}
});
}
}**/
}
插件--》服务配置或客户端
DelayMessagePostProcessor(延迟消息处理器):具体用在RabbitClient
/**
* 延迟消息处理器
* 实现了一个消息后处理器(MessagePostProcessor),用于在消息发送到RabbitMQ之前,对消息进行处理
* @author itcast
*/
public class DelayMessagePostProcessor implements MessagePostProcessor {
// 延迟队列默认延迟5s
private static final int DEFAULT_DELAY = 5;
private Integer delay;//使用 Integer 是为了允许传入 null,这样可以在后续逻辑中判断是否需要使用默认值 DEFAULT_DELAY
public DelayMessagePostProcessor(Integer delay) {
this.delay = delay;
}
//Spring AMQP会在消息发送到RabbitMQ之前调用这个接口的 postProcessMessage 方法,让开发者有机会修改消息的内容或属性
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//MessageProperties: 封装了消息的元数据,比如头信息、优先级、延迟时间等。
message.getMessageProperties().setDelay(NumberUtils.null2Default(delay, DEFAULT_DELAY));
return message;
}
}
RabbitMqListenableFutureCallback(处理消息发送的结果(成功或失败,生产者---->交换机的结果)):具体用在RabbitClient
/**
* @author itcast
* ListenableFutureCallback 来自Spring框架(org.springframework.util.concurrent.ListenableFutureCallback)
* 是一个异步回调接口,用于处理异步操作的结果
* 泛型 <CorrelationData.Confirm>: 指定回调的结果类型是 CorrelationData.Confirm,即RabbitMQ的确认结果(ACK或NACK)。
*/
@Builder
public class RabbitMqListenableFutureCallback implements ListenableFutureCallback<CorrelationData.Confirm> {
//记录失败消息service
private FailMsgDao failMsgDao;
private String exchange;
private String routingKey;
private String msg;
private Long msgId;
private Integer delay;
//是否是失败消息
private boolean isFailMsg=false;
//当消息发送失败时调用
@Override
public void onFailure(Throwable ex) {
if(failMsgDao == null) {
return;
}
failMsgDao.save(msgId, exchange, routingKey, msg, delay, DateUtils.getCurrentTime() + 10, ExceptionUtil.getMessage(ex));
}
//这里要注意 当消息发送完成(无论成功还是失败)时调用
@Override
public void onSuccess(CorrelationData.Confirm result) {
if(failMsgDao == null){
return;
}
if(!result.isAck()){
// 执行失败保存失败信息,如果已经存在保存信息,如果不在信息信息
failMsgDao.save(msgId, exchange, routingKey, msg, delay,DateUtils.getCurrentTime() + 10, "MQ回复nack");
}else if(msgId != null){
// 如果发送的是失败消息,当收到ack需要从fail_msg删除该消息
failMsgDao.removeById(msgId);
}
}
}
RabbitMqResender(从错误队列中获取一条消息重新发送)
//如果重试次数耗尽,消费者无法成功处理消息,MessageRecoverer 会被调用来执行兜底逻辑。
public class ErrorMessageRecoverer implements MessageRecoverer {
private RabbitClient rabbitClient;
private RabbitmqProperties rabbitmqProperties;
public ErrorMessageRecoverer(RabbitClient rabbitClient, RabbitmqProperties rabbitmqProperties) {
this.rabbitClient = rabbitClient;
this.rabbitmqProperties = rabbitmqProperties;
}
@Override
public void recover(Message message, Throwable cause) {
// 指定routingKey的消息才能进入队列 只有当消息的路由键在白名单中时,才会进一步处理。
// 如果不在白名单中,消息将被忽略(相当于丢弃)。
if(rabbitmqProperties.getError().getWhiteList().contains(message.getMessageProperties().getReceivedRoutingKey())) {
ErrorRabbitMqMessage errorRabbitMqMessage = new ErrorRabbitMqMessage();
errorRabbitMqMessage.setOriginRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
errorRabbitMqMessage.setOriginExchange(message.getMessageProperties().getReceivedExchange());
errorRabbitMqMessage.setMessage(new String(message.getBody()));
rabbitClient.sendMsg(rabbitmqProperties.getError().getExchange(), rabbitmqProperties.getError().getRoutingKey(), errorRabbitMqMessage);
}
}
}
RabbitMqResender(从错误队列取消息重新发送)
@Slf4j
public class RabbitMqResender {
private RabbitTemplate rabbitTemplate;
private RabbitmqProperties rabbitmqProperties;
private Channel channel;
public RabbitMqResender(RabbitTemplate rabbitTemplate, RabbitmqProperties rabbitmqProperties) {
this.rabbitTemplate = rabbitTemplate;
this.rabbitmqProperties = rabbitmqProperties;
/**
* basicGet 是 RabbitMQ 的“拉模式”(Pull Mode),适合从队列中主动获取一条消息。
* RabbitTemplate 更适合“推模式”(Push Mode)或发送消息,不提供直接的拉取接口。
* 你的需求是从错误队列中获取一条消息并处理,basicGet 是更直接的选择。
* 手动确认:
* 你需要控制消息确认(basicAck),确保只有重发成功后才移除消息。
* RabbitTemplate 的手动确认还是得借助channel
* 综上才用封装程度更低的channel
*/
channel = rabbitTemplate.getConnectionFactory()
.createConnection()
.createChannel(false);
}
/**
* 从队列中获取一条数据并处理,如果没有消息,返回false,有消息返回true
*/
public boolean getOneMessageAndProcess() {
try {
//使用 RabbitMQ 原生 API 从队列中获取一条消息。
//rabbitmqProperties.getError().getQueue(): 从配置中获取错误队列名称(例如 error.queue)。
//false: 表示不自动确认消息(autoAck = false),需要手动调用 basicAck
GetResponse response = channel.basicGet(rabbitmqProperties.getError().getQueue(), false);
if(response == null) {
return false;
}
//将获取的消息转ErrorRabbitMqMessage
ErrorRabbitMqMessage errorRabbitMqMessage = JsonUtils.toBean(new String(response.getBody()), ErrorRabbitMqMessage.class);
Message message = MessageBuilder.withBody(errorRabbitMqMessage.getMessage().getBytes(Charset.defaultCharset())).build();
//重新发送原始交换机和路由键
rabbitTemplate.send(errorRabbitMqMessage.getOriginExchange(), errorRabbitMqMessage.getOriginRoutingKey(), message);
//如果消息成功重新发送(没出现异常),执行到这,手动确认告诉失败队列消息我取出来了,你可以删除对应消息了
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
return true;
}catch (IOException e) {
log.error("消息重发失败,e:",e);
return false;
}
}
//Spring 生命周期注解,在 Bean 销毁时调用
@PreDestroy
public void destory() {
log.info("rabbitmq销毁...");
IoUtils.close(channel); //channel实现了AutoCloseable接口
}
使用xxlJob定时调用(customer那里有(消费者)))
@Component
@Slf4j
public class XxlJobHandler {
@Resource
private RabbitMqResender rabbitMqResender;
/**
* rabbitmq异常消息拉取并重新发回队列
*/
@XxlJob("rabbitmqErrorMsgPullAndResend")
public void rabbitmqErrorMsgPullAndResend(){
log.debug("rabbitmq异常消息重新");
for (int count = 0; count < 100; count++) {
try {
if(!rabbitMqResender.getOneMessageAndProcess()) {
break;
}
}catch (Exception e){
log.error("rabbitmq异常消息拉取失败,e:",e);
}
}
}
}
具体yaml配置
spring:
rabbitmq:
host: 192.168.101.68
username: xzb
password: xzb
port: 5672
virtual-host: /xzb
publisher-confirm-type: correlated #发送消息的异步回调,记录消息是否发送成功
publisher-returns: true #开启publish-return功能,消息到达交换机,但是没有到达对列表
template:
#消息路由失败时的策略, true: 调用ReturnCallback, false:丢弃消息
mandatory: true
listener:
simple:
acknowledge-mode: auto #,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒
multiplier: 10 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 90000 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
1. 生产者端配置
rabbitmq:
publisher-confirm-type: correlated # 发送消息的异步回调,记录消息是否发送成功
publisher-returns: true # 开启publish-return功能,消息到达交换机,但是没有到达队列
template:
mandatory: true # 消息路由失败时的策略, true: 调用ReturnCallback, false:丢弃消息
说明:
publish-confirm-type(从生产者-->交换机) 决定生产者如何接收消息发送的确认
配置 RabbitMQ 的发布者确认(Publisher Confirms)模式
none: 默认值,禁用发布者确认。
simple: 启用简单的同步确认,RabbitTemplate 的 waitForConfirms() 方法可用,但效率较低。
correlated: 启用异步确认,通过 CorrelationData 对象关联消息和确认结果(推荐)。
RabbitMqListenableFutureCallback futureCallback = RabbitMqListenableFutureCallback.builder()
.exchange(exchange)
.routingKey(routingKey)
.msg(jsonMsg)
.msgId(msgId)
.delay(delay)
.isFailMsg(isFailMsg)
.failMsgDao(failMsgDao)
.build();
CorrelationData correlationData = new CorrelationData(msgId.toString());
correlationData.getFuture().addCallback(futureCallback);
publish-returns(从交换机-->队列) 当消息到达交换机但无法路由到队列时返回给生产者
false: 默认值,不返回消息。
true: 启用返回机制,需配合 mandatory 和 ReturnsCallback。
rabbitTemplate.setReturnsCallback(
new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
byte[] body = returnedMessage.getMessage().getBody();
//消息id
String messageId = returnedMessage.getMessage().getMessageProperties().getMessageId();
String content = new String(body, Charset.defaultCharset());
log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息id{},消息内容{}",
returnedMessage.getReplyCode(),
returnedMessage.getReplyText(),
returnedMessage.getExchange(),
returnedMessage.getRoutingKey(),
messageId,
content);
if (failMsgDao != null) {
failMsgDao.save(NumberUtils.parseLong(messageId), returnedMessage.getExchange(), returnedMessage.getRoutingKey(), content, null, DateUtils.getCurrentTime(), "returnCallback");
}
}
}
);
2. 消费者端配置
rabbitmq:
listener:
simple:
acknowledge-mode: auto # 出现异常时返回nack,消息回滚到mq;没有异常,返回ack
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒
multiplier: 10 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
listener.simple.acknowledge-mode 消费者端的消息确认模式
none: 不发送确认,消息被消费后立即移除(不推荐)。
manual: 手动确认,需显式调用 channel.basicAck 或 basicNack。
auto: 默认值,根据处理结果自动确认(成功 ACK,异常 NACK)。
listener.simple.retry.enabled 开启重试
listener.simple.retry.stateless
true: 无状态重试,每次独立。
false: 有状态重试,保留上下文(适用于事务)。
@Override
public void recover(Message message, Throwable cause) {
// 指定routingKey的消息才能进入队列
if(rabbitmqProperties.getError().getWhiteList().contains(message.getMessageProperties().getReceivedRoutingKey())) {
ErrorRabbitMqMessage errorRabbitMqMessage = new ErrorRabbitMqMessage();
errorRabbitMqMessage.setOriginRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
errorRabbitMqMessage.setOriginExchange(message.getMessageProperties().getReceivedExchange());
errorRabbitMqMessage.setMessage(new String(message.getBody()));
rabbitClient.sendMsg(rabbitmqProperties.getError().getExchange(), rabbitmqProperties.getError().getRoutingKey(), errorRabbitMqMessage);
}
}
- 感谢你赐予我前进的力量