RabbitMQ快速入门-从基础使用到高级应用

本教程只是留给笔者自己温故复习使用,中间省略了很多笔者自己觉得可以省略的内容,如果有正式的学习需求,可以参考下面的视频和两篇笔记。当然,如果本文章对你有帮助那就更好了。

参考视频:黑马程序员RabbitMQ入门到实战教程

参考笔记:RabbitMQ快速入门 + RabbitMQ高级篇

1. 消息队列(异步调用)的目的和好处:

为了异步调用。什么是异步调用?发送方在发送消息后,不需要等待接收方的立即回应,就可以继续执行其他操作,而接收方在处理完消息后,可能会在未来的某个时间点给出回应。异步调用有很多好处:非阻塞、解耦、故障隔离、削峰填谷等。当然,有一些业务必须使用同步调用,因为这些业务下一步操作依赖于上一步操作的结果。而异步调用本身其实也存在一些问题:不能得到调用结果、不确定下游业务执行是否成功、业务安全依赖于消息代理的可靠性等。

2. 安装并启动 RabbitMQ:

RabbitMQ是基于 Erlang 语言开发的开源消息通信中间件,我们基于docker安装RabbitMQ:

搜索RabbitMQ镜像:sudo docker search rabbitmq

下载RabbitMQ镜像:sudo docker pull rabbitmq

启动 RabbitMQ:

sudo docker run \
-e RABBITMQ_DEFAULT_USER=wuyanzu \
-e RABBITMQ_DEFAULT_PASS=bhoLdSvpd0UAOysh \
-v rabbitmq-plugins:/plugins \
--name rabbitmq \
--hostname rabbitmq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:latest

指令说明:sudo docker run: 基本的Docker命令,用于启动一个新的容器实例

-e RABBITMQ_DEFAULT_USER=wuyanzu: 设置RabbitMQ服务的默认用户名为wuyanzu

-e RABBITMQ_DEFAULT_PASS=kZoeSW$$xS5i^Cum: 设置RabbitMQ服务的默认密码为bhoLdSvpd0UAOysh

-v rabbitmq-plugins:/plugins: 将一个名为rabbitmq-plugins的卷映射到容器的/plugins目录,用于存放RabbitMQ的插件。这里的rabbitmq-plugins是一个卷的名称,而不是宿主机的路径

–name rabbitmq: 指定容器的名称为rabbitmq

–hostname rabbitmq: 设置容器的主机名为rabbitmq

-p 15672:15672: 将宿主机的端口15672映射到容器的端口15672,这是RabbitMQ管理界面的默认端口

-p 5672:5672: 将宿主机的端口5672映射到容器的端口5672,这是RabbitMQ用于AMQP协议通信的默认端口

-d: 在后台运行容器(守护进程)

rabbitmq:latest: 使用最新的RabbitMQ官方镜像来创建容器

注:记得开放端口:

sudo ufw allow 15672
sudo ufw allow 5672
sudo ufw reload

3. RabbitMQ入门

RabbitMQ 有几个核心概念:

  1. Publisher:消息发送者
  2. Consumer:消息的消费者
  3. Queue:消息队列,存储消息
  4. Exchange:交换机,负责路由消息(一般不存储消息)
  5. VirtualHost:虚拟主机,用于数据隔离(项目之间)
RabbitMQ在网页端的管理页面和操作略,页面上的指引很明确了。

4. 在 SpringBoot 项目中集成 RabbitMQ

先了解一下AMQP:Advanced Message Queuing Protocol,是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

再看一下Spring AMQP:基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

4.1 父工程中引入Maven依赖

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

4.2 在配置文件中编写RabbitMQ的相关配置

spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /blog
username: ZJS
password: 123456

4.3 简单案例

生产者注入RabbitTemplate,然后调用rabbitTemplate.convertAndSend()方法。(参数暂时留空)

消费者标记@RabbitListener(queues = “”)到具体的处理方法上

5 Work Queues 模型

如果有两个或两个以上的消费者监听同一个队列,默认情况下 RabbitMQ 会采用轮询的方法将消息分配给每个队列,即使两个队列的消费能力不一样,默认情况下 RabbitMQ 还是会采用轮询的方法将消息分配给每个队列,也就是平均分配。

@RabbitListener(queues = “work.queue”)
public void listenWorkQueue1(String message) {
  System.out.println(“消费者1 收到了 work.queue的消息:【” + message + “】”);
}

@RabbitListener(queues = “work.queue”)
public void listenWorkQueue2(String message) {
  System.err.println(“消费者2 收到了 work.queue的消息…… :【” + message + “】”);
}

如果要让消费能力强的消费者处理更多的消息(能者多劳模式),只需在配置文件中添加以下信息:
spring:
rabbitmq:
listener:
simple:
prefetch: 1

这个配置信息相当于告诉消费者要一条一条地从队列中取出消息,只有处理完一条消息才能取出下一条,这样一来,就可以充分利用每一台机器的性能,让消费能力强的消费者处理更多的消息,同时还可以避免消息在消费能力较弱的消费者上发生堆积的情况,达到了“能者多劳”的效果。

6 交换机

交换机的类型有以下三种:

  1. Fanout:广播
  2. Direct:定向
  3. Topic:话题

注意:交换机只能路由和转发消息,不能存储消息。

Fanout和Direct交换机都是见名思意,前者会将接收到的消息广播到每一个跟其绑定的 queue,后者会将接收到的消息根据“规则”路由到指定的队列,这里的规则指的是每一个 Queue 与 Exchange 绑定时设置的 bindingKey,发布者发送消息时,会指定消息的 RoutingKey,从而与 bindingKey 匹配,当然同一个队列可以绑定多个 bindingKey,多个队列也可以绑定同一个 bindingKey。

最推荐使用的是Topic 交换机:Topic Exchange 与 Direct Exchange类似,区别在于 Topic Exchange 的 routingKey 可以是多个单词的列表(多个 routingKey 之间以 “.” 分割),还可以使用通配符 # (代指 0 个或多个单词) 和 * (代指 1 个单词),比Direct更灵活。

7 在 SpringBoot 项目中声明队列和交换机的方式

7.1 编程式声明(不推荐)

编写配置类:

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfiguration {

@Bean
public FanoutExchange fanoutExchange3() {
return ExchangeBuilder.fanoutExchange(“blog.fanout3”).build();
}

@Bean
public FanoutExchange fanoutExchange4() {
return new FanoutExchange(“blog.fanout4”);
}

@Bean
public Queue fanoutQueue3() {
return new Queue(“fanout.queue3”);
}

@Bean
public Queue fanoutQueue4() {
return QueueBuilder.durable(“fanout.queue4”).build();
}

@Bean
public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange3) {
return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange3);
}

@Bean
public Binding fanoutBinding4() {
return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange4());
}

}

7.2 注解式声明(推荐)

在消费者端标明注解:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = “direct.queue1”),
    exchange = @Exchange(name = “blog.direct”, type = ExchangeTypes.DIRECT),
    key = {“red”, “blue”}
))
public void listenDirectQueue1(String message) {
    System.out.println(“消费者1 收到了 direct.queue1的消息:【” + message + “】”);
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = “direct.queue2”),
    exchange = @Exchange(name = “blog.direct”, type = ExchangeTypes.DIRECT),
    key = {“red”, “yellow”}
))
public void listenDirectQueue2(String message) {
    System.out.println(“消费者2 收到了 direct.queue2的消息:【” + message + “】”);
}

7.3 但实际上可能的最佳实践:

// 先通过管理界面或配置类预创建
@Configuration
public class RabbitConfig {

  @Bean
  public Queue lazyQueue() {
    return QueueBuilder.durable(“lazy.queue”)
      .lazy() // 使用lazy队列
      .build();
  }

  @Bean
  public DirectExchange delayExchange() {
    return ExchangeBuilder.directExchange(“delay.direct”)
      .delayed()
     .durable(true)
     .build();
  }
}

// 然后消费时只引用
@RabbitListener(queues = “lazy.queue”)
public void listenLazyQueue(String message) {
  // …
}

8 消息转换器

如果不配置消息转换器,就和Redis一样,在序列化和反序列化(主要是序列化)的过程中会出现乱码,可读性降低,占用空间增大。这本质是因为默认使用了JDK提供的序列化方法。

所以我们要自定义消息转换器,使用JSON序列化。

先在父工程引入jackson依赖:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
</dependency>

注:如果是 Web 项目,无需引入该依赖,因为 spring-boot-starter-web 依赖中已包含该依赖。

然后在 consumer 服务和 publisher 服务中配置 MessageConverter:

@Bean
public MessageConverter jacksonMessageConvertor(){
    return new Jackson2JsonMessageConverter();
}

9 生产者与消费者的可靠性

之所以强调可靠性,是因为考虑到消息丢失的情况:

  1. 生产者向消息代理传递消息的过程中,消息丢失了
  2. 消息代理( RabbitMQ )把消息弄丢了
  3. 消费者把消息弄丢了

9.1 生产者的可靠性

(1)生产者重连,由于网络问题,可能会出现客户端连接 RabbitMQ 失败的情况,我们可以通过配置开启连接 RabbitMQ 失败后的重连机制:

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /blog
    username: ZJS
    password: 123456
    connection-timeout: 1s  # 连接超时时间
    template:
      retry:
        enabled: true  # 开启连接超时重试机制
        initial-interval: 1000ms  # 连接失败后的初始等待时间
        multiplier: 1  # 连接失败后的等待时长倍数,下次等待时长 = (initial-interval) * multiplier
        max-attempts: 3  # 最大重试次数

注意事项:

  1. 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率,但 SpringAMOP 提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,线程会被阻塞,影响业务性能;
  2. 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长(比如 200 ms)和重试次数,也可以考虑使用异步线程来执行发送消息的代码。

(2)生产者确认

RabbitMQ 提供了 Publisher Confirm 和 Publisher Return 两种确认机制。开启确机制认后,如果 MQ 成功收到消息后,会返回确认消息给生产者,返回的结果有以下几种情况:

  1. 消息投递到了 MQ,但是路由失败,此时会通过 PublisherReturn 机制返回路由异常的原因(Publisher Return),然后返回 ACK(Publisher Confirm),告知生产者消息投递成功;
  2. 临时消息投递到了 MQ,并且入队成功,返回 ACK,告知生产者消息投递成功;
  3. 持久消息投递到了MQ,并且入队完成持久化,返回 ACK,告知生产者消息投递成功;
  4. 其它情况都会返回 NACK,告知生产者消息投递失败。

相关配置信息:

spring:
  rabbitmq:
    publisher-returns: true
    publisher-confirm-type: correlated

publisher-confirm-type 有三种模式:

  1. none:关闭 confirm 机制
  2. simple:以同步阻塞等待的方式返回 MQ 的回执消息
  3. correlated:以异步回调方式的方式返回 MQ 的回执消息(显然correlated更好)

注意:每个 RabbitTemplate 只能配置一个 ReturnCallback,用于实现Publisher Return:

@Configuration
public class RabbitMQConfig implements ApplicationContextAware {

@Override
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

    // 配置回调
    rabbitTemplate.setReturnsCallback((returnedMessage) -> {
      System.out.println(“收到消息的return callback, ” +
          “exchange = ” + returnedMessage.getExchange() + “, ” +
          “routingKey = ” + returnedMessage.getRoutingKey() + “, ” +
          “replyCode = ” + returnedMessage.getReplyCode() + “, ” +
          “replyText = ” + returnedMessage.getReplyText() + “, ” +
          “message = ” + returnedMessage.getMessage());
});
}

}

针对每一个生产者发送消息的方法,实现相应的Publisher Confirm:

@Test
void testConfirmCallback() throws InterruptedException {
  CorrelationData correlationData = new CorrelationData();
  correlationData.getFuture().whenCompleteAsync((confirm, throwable) -> {
    if (confirm.isAck()) {
      // 消息发送成功
      System.out.println(“消息发送成功,收到ack”);
    } else {
      // 消息发送失败
      System.err.println(“消息发送失败,收到nack,原因是” + confirm.getReason());
    }

    if (throwable != null) {
      // 消息回调失败
      System.err.println(“消息回调失败”);
    }
  });

    rabbitTemplate.convertAndSend(“blog.direct”, “red”, “Hello, confirm callback”, correlationData);

    // 测试方法执行结束后程序就结束了,所以这里需要阻塞线程,否则程序看不到回调结果
    Thread.sleep(2000);
}

  1. 生产者确认需要额外的网络开销和系统资源开销,尽量不要使用;
  2. 如果一定要使用,无需开启 Publisher-Return 机制,因为路由失败一般是业务出了问题;
  3. 对于返回 nack 的消息,可以尝试重新投递,如果依然失败,则记录异常消息。

9.2 消费者的可靠性

(1)消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ 提供了消费者确认机制。处理消息后,消费者应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 消息的处理状态,回执有三种可选值:

  1. ack:成功处理消息,RabbitMQ 从队列中删除该消息
  2. nack:消息处理失败,RabbitMQ 需要再次投递消息
  3. reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息

SpringAMQP 已经实现了消息确认功能,并允许我们通过配置文件选择 ACK 的处理方式,有三种方式:

  1. none:不处理,即消息投递给消费者后立刻 ack,消息会会立刻从 MQ 中删除,非常不安全,不建议使用;
  2. manual:手动模式。需要自己在业务代码中调用 api,发送 ack 或 reject ,存在业务入侵,但更灵活;
  3. auto:自动模式,SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack,当业务出现异常时,会根据异常的类型返回不同结果:
    如果是业务异常,会自动返回 nack;
    如果是消息处理或校验异常,自动返回 reject。

开启消息确认机制,需要在消费端的 application.yml 文件中编写相关的配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto

(2)失败重试机制

当消费者出现异常后,消息会不断重新入队,重新发送给消费者,然后再次发生异常,再次 requeue(重新入队),陷入无限循环,给 RabbitMQ 带来不必要的压力。

我们可以利用 Spring 提供的 retry 机制,在消费者出现异常时利用本地重试,而不是无限制地重新入队。

在 application.yml 配置文件中开启失败重试机制:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto
        retry:
          enabled: true    # 开启消息消费失败重试机制
          initial-interval: 1000ms    # 消息消费失败后的初始等待时间
          multiplier: 1    # 消息消费失败后的等待时长倍数,下次等待时长 = (initial-interval) * multiplier
          max-attempts: 3    # 最大重试次数
          stateless: true    # true表示无状态,false表示有状态,如果业务中包含事务,需要设置为false

但这有个问题是一旦本地重试次数用尽,消息队列里的消息也会丢失,正常情况下,消息丢失都不是我们想看到的,该怎么解决这个问题呢?

(3)失败消息的处理策略

开启重试模式后,如果重试次数耗尽后消息依然处理失败,则需要由 MessageRecoverer 接口来处理, MessageRecoverer 有三个实现类:

  1. RejectAndDontRequeueRecoverer:重试次数耗尽后,直接 reject,丢弃消息,默认就是这种方式
  2. ImmediateRequeueMessageRecoverer:重试次数耗尽后,返回 nack,消息重新入队
  3. RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

推荐使用RepublishMessageRecoverer,以下为案例:

第一步:定义一个名为 blog.error 的交换机、一个名为 error.queue 的队列,并将队列和交换机进行绑定:

@Configuration
@ConditionalOnProperty(prefix = “spring.rabbitmq.listener.simple.retry”, name = “enabled”, havingValue = “true”)
public class ErrorConfiguration {

  @Bean
  public DirectExchange errorExchange() {
    return new DirectExchange(“error.direct”, true, false);
  }

  @Bean
  public Queue errorQueue() {
    return new Queue(“error.queue”, true, false, false);
  }

  @Bean
  public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {
    return BindingBuilder.bind(errorQueue).to(errorExchange).with(“error”);
  }

}

第二步:将失败处理策略改为 RepublishMessageRecoverer (开起了消费者重试机制才会生效):

@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
  return new RepublishMessageRecoverer(rabbitTemplate, “error.direct”, “error”);
}

10 消息代理(RabbitMQ)的可靠性

在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟,这样会导致两个问题:

  1. 一旦 RabbitMQ 宕机,内存中的消息会丢失
  2. 内存空间是有限的,当消费者处理过慢或者消费者出现故障或时,会导致消息积压,引发 MQ 阻塞( Paged Out 现象)

怎么理解 MQ 阻塞呢?当队列的空间被消息占满了之后,RabbitMQ 会先把老旧的信息存到磁盘,为新消息腾出空间,在这个过程中,整个 MQ 是被阻塞的,也就是说,在 MQ 完成这一系列工作之前,无法处理已有的消息和接收新的消息。

解决上述丢失和阻塞问题的方法有以下两种:

10.1 数据持久化

RabbitMQ 实现数据持久化包括 3 个方面:

  1. 交换机持久化
  2. 队列持久化
  3. 消息持久化

注意事项:

利用 SpringAMQP 创建的交换机、队列、消息,默认都是持久化的;
在 RabbitMQ 控制台创建的交换机、队列默认是持久化的,而消息默认是存在内存中( 3.12 版本之前默认存放在内存,3.12 版本及之后默认先存放在磁盘,消费者处理消息时才会将消息取出来放到内存中)。

该方法主要看一下消息持久化的常用方式:

@Test
void testPagedOut() {
  Message message = MessageBuilder.withBody(“Hello, paged out”.getBytes(StandardCharsets.UTF_8))
      .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
      .build();

  for (int i = 0; i < 1; i++) {
    rabbitTemplate.convertAndSend(“simple.queue”, message);
  }
}

10.2 LazyQueue(3.12 版本后所有队列都是 Lazy Queue 模式)

LazyQueue的特征如下:

  1. 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认 2048条 )
  2. 消费者要处理消息时才会从磁盘中读取并加载到内存
  3. 支持数百万条的消息存储,在 3.12 版本后,所有队列都是 Lazy Queue 模式,无法更改

在 RabbitMQ 控制台中,要创建一个惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可。
在 Java 代码中,要创建一个惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可(这里只编写注解式创建):

@RabbitListener(queuesToDeclare = @Queue(
    name = “lazy.queue”,
    durable = “true”,
    arguments = @Argument(
      name = “x-queue-mode”,
      value = “lazy”
    )
))
public void listenLazeQueue(String message) {
System.out.println(“消费者收到了 laze.queue2的消息: ” + message);
}

注意,上述声明队列的方式会自动绑定到默认交换机(direct exchange),且当声明已存在的队列/交换机时,Spring AMQP 会检查参数是否一致,一致则使用原有队列/交换机,不一致直接报错。

11 业务幂等性

幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),绝对值函数具有幂等性。在程序开发中,幂等是指同一个业务,执行一次或多次对业务状态的影响是一致的。

我们要做的是确保业务的幂等性。

11.1 方案一:为每条消息设置一个唯一的 id

给每个消息都设置一个唯一的 id,利用 id 区分是否是重复消息:

  1. 为每条消息都生成一个唯一的 id,与消息一起投递给消费者
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息 id 保存到数据库
  3. 如果消费者下次又收到相同消息,先去数据库查询该消息对应的 id 是否存在,如果存在则为重复消息,放弃处理

可以在指定 MessageConverter 的具体类型时,同时为 MessageConverter 设置自动创建一个 messageId:

@Bean
public MessageConverter jacksonMessageConvertor() {
  Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
  jackson2JsonMessageConverter.setCreateMessageIds(true);
  return jackson2JsonMessageConverter;
}

但这种方式对业务有一定的侵入性,且数据库压力会很大,不推荐。

11.2 方案二:结合业务判断

结合业务逻辑,基于业务本身做判断。以支付业务为例:我们要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断状态是否是未支付,只有未支付订单才需要修改,其它状态的订单不做处理。

总结:如何保证支付服务与交易服务之间的订单状态一致性?

  1. 首先,支付服务会正在用户支付成功以后利用 MQ 发送消息通知交易服务,完成订单状态同步;
  2. 其次,为了保证 MQ 消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性,同时也开启了MQ的持久化,避免因服务宕机导致消息丢失;
  3. 最后,我们还在交易服务更新订单状态时做了业务幕等判断,避免因消息重复消费导致订单状态异常。

当然,如果交易服务消息处理失败,支付服务和交易服务出现了数据不一致的情况,还可以在交易服务设置定时任务,定期查询订单支付状态,这样即便 MQ 通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

12 延迟消息

延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才会收到消息。

12.1 死信交换机

当一个队列中的某条消息满足下列情况之一时,就会成为死信(dead letter):

  1. (消费者使用 basic.reject) 或 (basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false);
  2. 过期消息(达到了队列或消息本身设置的过期时间),消息超时后无人消费;
  3. 要投递的队列消息堆积满了,最早的消息可能成为死信。

如果队列通过 dead-letter-exchange 属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中,这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。

可以利用死信交换机的特点,可以实现发送延迟消息的功能,但实际应用起来很麻烦,并且这也并非DLX被设计出来的本意,所以不推荐。

12.2 延迟消息插件(推荐使用)

下载和安装教程

消费端代码:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = “delay.queue”),
    exchange = @Exchange(name = “delay.direct”, delayed = “true”, type = ExchangeTypes.DIRECT),
    key = “delay”
))
public void listenDelayQueue(String message) {
  System.out.println(“消费者收到了 delay.queue的消息: ” + message + “,时间:” + simpleDateFormat.format(System.currentTimeMillis()));
}

注意:这里的 delayed = “true” 指的是延迟交换机,原理是消息先保存在交换机中,等待延迟时间到了才路由到队列。

生产端代码:

@Test
void testSendDelayMessage() {
  rabbitTemplate.convertAndSend(“delay.direct”, “delay”, “Hello, DelayQueue!”, new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
      message.getMessageProperties().setDelay(10000); // 毫秒
      return message;
    }
});

}

发送延迟消息的本质是在消息头属性中添加 x-delay 属性。

12.3 延迟消息的原理和缺点

RabbitMQ 的延迟消息是怎么实现的呢?RabbitMQ 会自动维护一个时钟,这个时钟每隔一秒就跳动一次,如果对时钟的精度要求比较高的,可能还要精确到毫秒,甚至纳秒。

RabbitMQ 会为发送到交换机的每一条延迟消息创建一个时钟,时钟运行的过程中需要 CPU 不断地进行计算。发送到交换机的延迟消息数越多,RabbitMQ 需要维护的时钟就越多,对 CPU 的占用率就越高(Spring 提供的定时任务的原理也是类似)。

定时任务属于 CPU 密集型任务,中间涉及到的计算过程对 CPU 来说压力是很大的,所以说,采用延迟消息会给服务器的 CPU 带来更大的压力。当交换机中有非常多的延迟消息时,对 CPU 的压力就会特别大。

补充:延迟交换机 + LazyQueue的注解式声明写法:

// 延迟交换机 + 惰性队列的组合
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(
      name = “delay.lazy.queue”,
      durable = “true”,
      arguments = @Argument(name = “x-queue-mode”, value = “lazy”)   // LazyQueue
),
    exchange = @Exchange(
      name = “delay.direct”,
      delayed = “true”,    // 延迟交换机
      type = ExchangeTypes.DIRECT
    ),
    key = “delay”
))

特别声明:当且仅当技术类文章可转载,转载时请标明出处和作者哟~
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇