本教程只是留给笔者自己温故复习使用,中间省略了很多笔者自己觉得可以省略的内容,如果有正式的学习需求,可以参考下面的视频和两篇笔记。当然,如果本文章对你有帮助那就更好了。
参考视频:黑马程序员RabbitMQ入门到实战教程
参考笔记:RabbitMQ快速入门 + RabbitMQ高级篇
1. 消息队列(异步调用)的目的和好处:
为了异步调用。什么是异步调用?发送方在发送消息后,不需要等待接收方的立即回应,就可以继续执行其他操作,而接收方在处理完消息后,可能会在未来的某个时间点给出回应。异步调用有很多好处:非阻塞、解耦、故障隔离、削峰填谷等。当然,有一些业务必须使用同步调用,因为这些业务下一步操作依赖于上一步操作的结果。而异步调用本身其实也存在一些问题:不能得到调用结果、不确定下游业务执行是否成功、业务安全依赖于消息代理的可靠性等。
2. 安装并启动 RabbitMQ:
RabbitMQ是基于 Erlang 语言开发的开源消息通信中间件,我们基于docker安装RabbitMQ:
搜索RabbitMQ镜像:sudo docker search rabbitmq
下载RabbitMQ镜像:sudo docker pull rabbitmq
启动 RabbitMQ:
sudo docker run \
-e RABBITMQ_DEFAULT_USER=wuyanzu \
-e RABBITMQ_DEFAULT_PASS=bhoLdSvpd0UAOysh \
-v rabbitmq-plugins:/plugins \
--name rabbitmq \
--hostname rabbitmq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:latest
指令说明:sudo docker run: 基本的Docker命令,用于启动一个新的容器实例
-e RABBITMQ_DEFAULT_USER=wuyanzu: 设置RabbitMQ服务的默认用户名为wuyanzu
-e RABBITMQ_DEFAULT_PASS=kZoeSW$$xS5i^Cum: 设置RabbitMQ服务的默认密码为bhoLdSvpd0UAOysh
-v rabbitmq-plugins:/plugins: 将一个名为rabbitmq-plugins的卷映射到容器的/plugins目录,用于存放RabbitMQ的插件。这里的rabbitmq-plugins是一个卷的名称,而不是宿主机的路径
–name rabbitmq: 指定容器的名称为rabbitmq
–hostname rabbitmq: 设置容器的主机名为rabbitmq
-p 15672:15672: 将宿主机的端口15672映射到容器的端口15672,这是RabbitMQ管理界面的默认端口
-p 5672:5672: 将宿主机的端口5672映射到容器的端口5672,这是RabbitMQ用于AMQP协议通信的默认端口
-d: 在后台运行容器(守护进程)
rabbitmq:latest: 使用最新的RabbitMQ官方镜像来创建容器
注:记得开放端口:
sudo ufw allow 15672
sudo ufw allow 5672
sudo ufw reload
3. RabbitMQ入门
RabbitMQ 有几个核心概念:
- Publisher:消息发送者
- Consumer:消息的消费者
- Queue:消息队列,存储消息
- Exchange:交换机,负责路由消息(一般不存储消息)
- VirtualHost:虚拟主机,用于数据隔离(项目之间)
4. 在 SpringBoot 项目中集成 RabbitMQ
先了解一下AMQP:Advanced Message Queuing Protocol,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
再看一下Spring AMQP:基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
4.1 父工程中引入Maven依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4.2 在配置文件中编写RabbitMQ的相关配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /blog
username: ZJS
password: 123456
4.3 简单案例
生产者注入RabbitTemplate,然后调用rabbitTemplate.convertAndSend()方法。(参数暂时留空)
消费者标记@RabbitListener(queues = “”)到具体的处理方法上
5 Work Queues 模型
如果有两个或两个以上的消费者监听同一个队列,默认情况下 RabbitMQ 会采用轮询的方法将消息分配给每个队列,即使两个队列的消费能力不一样,默认情况下 RabbitMQ 还是会采用轮询的方法将消息分配给每个队列,也就是平均分配。
@RabbitListener(queues = “work.queue”)
public void listenWorkQueue1(String message) {
System.out.println(“消费者1 收到了 work.queue的消息:【” + message + “】”);
}
@RabbitListener(queues = “work.queue”)
public void listenWorkQueue2(String message) {
System.err.println(“消费者2 收到了 work.queue的消息…… :【” + message + “】”);
}
如果要让消费能力强的消费者处理更多的消息(能者多劳模式),只需在配置文件中添加以下信息:spring:
rabbitmq:
listener:
simple:
prefetch: 1
这个配置信息相当于告诉消费者要一条一条地从队列中取出消息,只有处理完一条消息才能取出下一条,这样一来,就可以充分利用每一台机器的性能,让消费能力强的消费者处理更多的消息,同时还可以避免消息在消费能力较弱的消费者上发生堆积的情况,达到了“能者多劳”的效果。
6 交换机
交换机的类型有以下三种:
- Fanout:广播
- Direct:定向
- Topic:话题
注意:交换机只能路由和转发消息,不能存储消息。
Fanout和Direct交换机都是见名思意,前者会将接收到的消息广播到每一个跟其绑定的 queue,后者会将接收到的消息根据“规则”路由到指定的队列,这里的规则指的是每一个 Queue 与 Exchange 绑定时设置的 bindingKey,发布者发送消息时,会指定消息的 RoutingKey,从而与 bindingKey 匹配,当然同一个队列可以绑定多个 bindingKey,多个队列也可以绑定同一个 bindingKey。
最推荐使用的是Topic 交换机:Topic Exchange 与 Direct Exchange类似,区别在于 Topic Exchange 的 routingKey 可以是多个单词的列表(多个 routingKey 之间以 “.” 分割),还可以使用通配符 # (代指 0 个或多个单词) 和 * (代指 1 个单词),比Direct更灵活。
7 在 SpringBoot 项目中声明队列和交换机的方式
7.1 编程式声明(不推荐)
编写配置类:
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfiguration {
@Bean
public FanoutExchange fanoutExchange3() {
return ExchangeBuilder.fanoutExchange(“blog.fanout3”).build();
}
@Bean
public FanoutExchange fanoutExchange4() {
return new FanoutExchange(“blog.fanout4”);
}
@Bean
public Queue fanoutQueue3() {
return new Queue(“fanout.queue3”);
}
@Bean
public Queue fanoutQueue4() {
return QueueBuilder.durable(“fanout.queue4”).build();
}
@Bean
public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange3) {
return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange3);
}
@Bean
public Binding fanoutBinding4() {
return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange4());
}
}
7.2 注解式声明(推荐)
在消费者端标明注解:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = “direct.queue1”),
exchange = @Exchange(name = “blog.direct”, type = ExchangeTypes.DIRECT),
key = {“red”, “blue”}
))
public void listenDirectQueue1(String message) {
System.out.println(“消费者1 收到了 direct.queue1的消息:【” + message + “】”);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = “direct.queue2”),
exchange = @Exchange(name = “blog.direct”, type = ExchangeTypes.DIRECT),
key = {“red”, “yellow”}
))
public void listenDirectQueue2(String message) {
System.out.println(“消费者2 收到了 direct.queue2的消息:【” + message + “】”);
}
7.3 但实际上可能的最佳实践:
// 先通过管理界面或配置类预创建
@Configuration
public class RabbitConfig {
@Bean
public Queue lazyQueue() {
return QueueBuilder.durable(“lazy.queue”)
.lazy() // 使用lazy队列
.build();
}
@Bean
public DirectExchange delayExchange() {
return ExchangeBuilder.directExchange(“delay.direct”)
.delayed()
.durable(true)
.build();
}
}
// 然后消费时只引用
@RabbitListener(queues = “lazy.queue”)
public void listenLazyQueue(String message) {
// …
}
8 消息转换器
如果不配置消息转换器,就和Redis一样,在序列化和反序列化(主要是序列化)的过程中会出现乱码,可读性降低,占用空间增大。这本质是因为默认使用了JDK提供的序列化方法。
所以我们要自定义消息转换器,使用JSON序列化。
先在父工程引入jackson依赖:
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
注:如果是 Web 项目,无需引入该依赖,因为 spring-boot-starter-web 依赖中已包含该依赖。
然后在 consumer 服务和 publisher 服务中配置 MessageConverter:
@Bean
public MessageConverter jacksonMessageConvertor(){
return new Jackson2JsonMessageConverter();
}
9 生产者与消费者的可靠性
之所以强调可靠性,是因为考虑到消息丢失的情况:
- 生产者向消息代理传递消息的过程中,消息丢失了
- 消息代理( RabbitMQ )把消息弄丢了
- 消费者把消息弄丢了
9.1 生产者的可靠性
(1)生产者重连,由于网络问题,可能会出现客户端连接 RabbitMQ 失败的情况,我们可以通过配置开启连接 RabbitMQ 失败后的重连机制:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /blog
username: ZJS
password: 123456
connection-timeout: 1s # 连接超时时间
template:
retry:
enabled: true # 开启连接超时重试机制
initial-interval: 1000ms # 连接失败后的初始等待时间
multiplier: 1 # 连接失败后的等待时长倍数,下次等待时长 = (initial-interval) * multiplier
max-attempts: 3 # 最大重试次数
注意事项:
- 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率,但 SpringAMOP 提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,线程会被阻塞,影响业务性能;
- 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长(比如 200 ms)和重试次数,也可以考虑使用异步线程来执行发送消息的代码。
(2)生产者确认
RabbitMQ 提供了 Publisher Confirm 和 Publisher Return 两种确认机制。开启确机制认后,如果 MQ 成功收到消息后,会返回确认消息给生产者,返回的结果有以下几种情况:
- 消息投递到了 MQ,但是路由失败,此时会通过 PublisherReturn 机制返回路由异常的原因(Publisher Return),然后返回 ACK(Publisher Confirm),告知生产者消息投递成功;
- 临时消息投递到了 MQ,并且入队成功,返回 ACK,告知生产者消息投递成功;
- 持久消息投递到了MQ,并且入队完成持久化,返回 ACK,告知生产者消息投递成功;
- 其它情况都会返回 NACK,告知生产者消息投递失败。
相关配置信息:
spring:
rabbitmq:
publisher-returns: true
publisher-confirm-type: correlated
publisher-confirm-type 有三种模式:
- none:关闭 confirm 机制
- simple:以同步阻塞等待的方式返回 MQ 的回执消息
- correlated:以异步回调方式的方式返回 MQ 的回执消息(显然correlated更好)
注意:每个 RabbitTemplate 只能配置一个 ReturnCallback,用于实现Publisher Return:
@Configuration
public class RabbitMQConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置回调
rabbitTemplate.setReturnsCallback((returnedMessage) -> {
System.out.println(“收到消息的return callback, ” +
“exchange = ” + returnedMessage.getExchange() + “, ” +
“routingKey = ” + returnedMessage.getRoutingKey() + “, ” +
“replyCode = ” + returnedMessage.getReplyCode() + “, ” +
“replyText = ” + returnedMessage.getReplyText() + “, ” +
“message = ” + returnedMessage.getMessage());
});
}
}
针对每一个生产者发送消息的方法,实现相应的Publisher Confirm:
@Test
void testConfirmCallback() throws InterruptedException {
CorrelationData correlationData = new CorrelationData();
correlationData.getFuture().whenCompleteAsync((confirm, throwable) -> {
if (confirm.isAck()) {
// 消息发送成功
System.out.println(“消息发送成功,收到ack”);
} else {
// 消息发送失败
System.err.println(“消息发送失败,收到nack,原因是” + confirm.getReason());
}
if (throwable != null) {
// 消息回调失败
System.err.println(“消息回调失败”);
}
});
rabbitTemplate.convertAndSend(“blog.direct”, “red”, “Hello, confirm callback”, correlationData);
// 测试方法执行结束后程序就结束了,所以这里需要阻塞线程,否则程序看不到回调结果
Thread.sleep(2000);
}
- 生产者确认需要额外的网络开销和系统资源开销,尽量不要使用;
- 如果一定要使用,无需开启 Publisher-Return 机制,因为路由失败一般是业务出了问题;
- 对于返回 nack 的消息,可以尝试重新投递,如果依然失败,则记录异常消息。
9.2 消费者的可靠性
(1)消费者确认机制
为了确认消费者是否成功处理消息,RabbitMQ 提供了消费者确认机制。处理消息后,消费者应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 消息的处理状态,回执有三种可选值:
- ack:成功处理消息,RabbitMQ 从队列中删除该消息
- nack:消息处理失败,RabbitMQ 需要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息
SpringAMQP 已经实现了消息确认功能,并允许我们通过配置文件选择 ACK 的处理方式,有三种方式:
- none:不处理,即消息投递给消费者后立刻 ack,消息会会立刻从 MQ 中删除,非常不安全,不建议使用;
- manual:手动模式。需要自己在业务代码中调用 api,发送 ack 或 reject ,存在业务入侵,但更灵活;
- auto:自动模式,SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack,当业务出现异常时,会根据异常的类型返回不同结果:
如果是业务异常,会自动返回 nack;
如果是消息处理或校验异常,自动返回 reject。
开启消息确认机制,需要在消费端的 application.yml 文件中编写相关的配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: auto
(2)失败重试机制
当消费者出现异常后,消息会不断重新入队,重新发送给消费者,然后再次发生异常,再次 requeue(重新入队),陷入无限循环,给 RabbitMQ 带来不必要的压力。
我们可以利用 Spring 提供的 retry 机制,在消费者出现异常时利用本地重试,而不是无限制地重新入队。
在 application.yml 配置文件中开启失败重试机制:
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: auto
retry:
enabled: true # 开启消息消费失败重试机制
initial-interval: 1000ms # 消息消费失败后的初始等待时间
multiplier: 1 # 消息消费失败后的等待时长倍数,下次等待时长 = (initial-interval) * multiplier
max-attempts: 3 # 最大重试次数
stateless: true # true表示无状态,false表示有状态,如果业务中包含事务,需要设置为false
但这有个问题是一旦本地重试次数用尽,消息队列里的消息也会丢失,正常情况下,消息丢失都不是我们想看到的,该怎么解决这个问题呢?
(3)失败消息的处理策略
开启重试模式后,如果重试次数耗尽后消息依然处理失败,则需要由 MessageRecoverer 接口来处理, MessageRecoverer 有三个实现类:
- RejectAndDontRequeueRecoverer:重试次数耗尽后,直接 reject,丢弃消息,默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试次数耗尽后,返回 nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
推荐使用RepublishMessageRecoverer,以下为案例:
第一步:定义一个名为 blog.error 的交换机、一个名为 error.queue 的队列,并将队列和交换机进行绑定:
@Configuration
@ConditionalOnProperty(prefix = “spring.rabbitmq.listener.simple.retry”, name = “enabled”, havingValue = “true”)
public class ErrorConfiguration {
@Bean
public DirectExchange errorExchange() {
return new DirectExchange(“error.direct”, true, false);
}
@Bean
public Queue errorQueue() {
return new Queue(“error.queue”, true, false, false);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {
return BindingBuilder.bind(errorQueue).to(errorExchange).with(“error”);
}
}
第二步:将失败处理策略改为 RepublishMessageRecoverer (开起了消费者重试机制才会生效):
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, “error.direct”, “error”);
}
10 消息代理(RabbitMQ)的可靠性
在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟,这样会导致两个问题:
- 一旦 RabbitMQ 宕机,内存中的消息会丢失
- 内存空间是有限的,当消费者处理过慢或者消费者出现故障或时,会导致消息积压,引发 MQ 阻塞( Paged Out 现象)
怎么理解 MQ 阻塞呢?当队列的空间被消息占满了之后,RabbitMQ 会先把老旧的信息存到磁盘,为新消息腾出空间,在这个过程中,整个 MQ 是被阻塞的,也就是说,在 MQ 完成这一系列工作之前,无法处理已有的消息和接收新的消息。
解决上述丢失和阻塞问题的方法有以下两种:
10.1 数据持久化
RabbitMQ 实现数据持久化包括 3 个方面:
- 交换机持久化
- 队列持久化
- 消息持久化
注意事项:
利用 SpringAMQP 创建的交换机、队列、消息,默认都是持久化的;
在 RabbitMQ 控制台创建的交换机、队列默认是持久化的,而消息默认是存在内存中( 3.12 版本之前默认存放在内存,3.12 版本及之后默认先存放在磁盘,消费者处理消息时才会将消息取出来放到内存中)。
该方法主要看一下消息持久化的常用方式:
@Test
void testPagedOut() {
Message message = MessageBuilder.withBody(“Hello, paged out”.getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
for (int i = 0; i < 1; i++) {
rabbitTemplate.convertAndSend(“simple.queue”, message);
}
}
10.2 LazyQueue(3.12 版本后所有队列都是 Lazy Queue 模式)
LazyQueue的特征如下:
- 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认 2048条 )
- 消费者要处理消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储,在 3.12 版本后,所有队列都是 Lazy Queue 模式,无法更改
在 RabbitMQ 控制台中,要创建一个惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可。
在 Java 代码中,要创建一个惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可(这里只编写注解式创建):
@RabbitListener(queuesToDeclare = @Queue(
name = “lazy.queue”,
durable = “true”,
arguments = @Argument(
name = “x-queue-mode”,
value = “lazy”
)
))
public void listenLazeQueue(String message) {
System.out.println(“消费者收到了 laze.queue2的消息: ” + message);
}
注意,上述声明队列的方式会自动绑定到默认交换机(direct exchange),且当声明已存在的队列/交换机时,Spring AMQP 会检查参数是否一致,一致则使用原有队列/交换机,不一致直接报错。
11 业务幂等性
幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),绝对值函数具有幂等性。在程序开发中,幂等是指同一个业务,执行一次或多次对业务状态的影响是一致的。
我们要做的是确保业务的幂等性。
11.1 方案一:为每条消息设置一个唯一的 id
给每个消息都设置一个唯一的 id,利用 id 区分是否是重复消息:
- 为每条消息都生成一个唯一的 id,与消息一起投递给消费者
- 消费者接收到消息后处理自己的业务,业务处理成功后将消息 id 保存到数据库
- 如果消费者下次又收到相同消息,先去数据库查询该消息对应的 id 是否存在,如果存在则为重复消息,放弃处理
可以在指定 MessageConverter 的具体类型时,同时为 MessageConverter 设置自动创建一个 messageId:
@Bean
public MessageConverter jacksonMessageConvertor() {
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
但这种方式对业务有一定的侵入性,且数据库压力会很大,不推荐。
11.2 方案二:结合业务判断
结合业务逻辑,基于业务本身做判断。以支付业务为例:我们要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断状态是否是未支付,只有未支付订单才需要修改,其它状态的订单不做处理。
总结:如何保证支付服务与交易服务之间的订单状态一致性?
- 首先,支付服务会正在用户支付成功以后利用 MQ 发送消息通知交易服务,完成订单状态同步;
- 其次,为了保证 MQ 消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性,同时也开启了MQ的持久化,避免因服务宕机导致消息丢失;
- 最后,我们还在交易服务更新订单状态时做了业务幕等判断,避免因消息重复消费导致订单状态异常。
当然,如果交易服务消息处理失败,支付服务和交易服务出现了数据不一致的情况,还可以在交易服务设置定时任务,定期查询订单支付状态,这样即便 MQ 通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。
12 延迟消息
延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才会收到消息。
12.1 死信交换机
当一个队列中的某条消息满足下列情况之一时,就会成为死信(dead letter):
- (消费者使用 basic.reject) 或 (basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false);
- 过期消息(达到了队列或消息本身设置的过期时间),消息超时后无人消费;
- 要投递的队列消息堆积满了,最早的消息可能成为死信。
如果队列通过 dead-letter-exchange 属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中,这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。
可以利用死信交换机的特点,可以实现发送延迟消息的功能,但实际应用起来很麻烦,并且这也并非DLX被设计出来的本意,所以不推荐。
12.2 延迟消息插件(推荐使用)
消费端代码:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = “delay.queue”),
exchange = @Exchange(name = “delay.direct”, delayed = “true”, type = ExchangeTypes.DIRECT),
key = “delay”
))
public void listenDelayQueue(String message) {
System.out.println(“消费者收到了 delay.queue的消息: ” + message + “,时间:” + simpleDateFormat.format(System.currentTimeMillis()));
}
注意:这里的 delayed = “true” 指的是延迟交换机,原理是消息先保存在交换机中,等待延迟时间到了才路由到队列。
生产端代码:
@Test
void testSendDelayMessage() {
rabbitTemplate.convertAndSend(“delay.direct”, “delay”, “Hello, DelayQueue!”, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(10000); // 毫秒
return message;
}
});
}
发送延迟消息的本质是在消息头属性中添加 x-delay 属性。
12.3 延迟消息的原理和缺点
RabbitMQ 的延迟消息是怎么实现的呢?RabbitMQ 会自动维护一个时钟,这个时钟每隔一秒就跳动一次,如果对时钟的精度要求比较高的,可能还要精确到毫秒,甚至纳秒。
RabbitMQ 会为发送到交换机的每一条延迟消息创建一个时钟,时钟运行的过程中需要 CPU 不断地进行计算。发送到交换机的延迟消息数越多,RabbitMQ 需要维护的时钟就越多,对 CPU 的占用率就越高(Spring 提供的定时任务的原理也是类似)。
定时任务属于 CPU 密集型任务,中间涉及到的计算过程对 CPU 来说压力是很大的,所以说,采用延迟消息会给服务器的 CPU 带来更大的压力。当交换机中有非常多的延迟消息时,对 CPU 的压力就会特别大。
补充:延迟交换机 + LazyQueue的注解式声明写法:
// 延迟交换机 + 惰性队列的组合
@RabbitListener(bindings = @QueueBinding(
value = @Queue(
name = “delay.lazy.queue”,
durable = “true”,
arguments = @Argument(name = “x-queue-mode”, value = “lazy”) // LazyQueue
),
exchange = @Exchange(
name = “delay.direct”,
delayed = “true”, // 延迟交换机
type = ExchangeTypes.DIRECT
),
key = “delay”
))