1. 简介

利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。

延迟队列的使用场景包括:

  • 延迟发送短信
  • 用户下单,如果用户在15 分钟内未支付,则自动取消
  • 预约工作会议,20分钟后自动通知所有参会人员

因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。

这个插件就是DelayExchange插件。参考RabbitMQ的插件列表页面:https://www.rabbitmq.com/community-plugins.html

image-20210718192529342

使用方式可以参考官网地址:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

2. 安装DelayExchange插件

官方的安装指南地址为:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

上述文档是基于linux原生安装RabbitMQ,然后安装插件。

接下来我们会介绍使用docker的方式安装RabbitMQ。

2.1.下载插件

RabbitMQ有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html

其中包含各种各样的插件,包括我们要使用的DelayExchange插件:

image-20210713104511055

插件的官方下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

2.2. 安装RabbitMQ

我们这里需要配置RabbitMQ容器,将镜像中的插件目录映射出来,以方便我们后续安装插件

创建RabbitMQ的配置如下

docker run \
 -e RABBITMQ_DEFAULT_USER=user \
 -e RABBITMQ_DEFAULT_PASS=123456 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management

2.2.上传插件

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。

我们使用下面命令查看插件目录映射的数据卷:

docker volume inspect mq-plugins

可以得到下面结果:

image-20210713105135701

接下来,将插件上传到这个目录即可:

image-20210713105339785

2.3.安装插件

最后就是安装了,需要进入MQ容器内部来执行安装。我的容器名为mq,所以执行下面命令:

docker exec -it mq bash

执行时,请将其中的 -it 后面的mq替换为你自己的容器名.

进入容器内部后,执行下面命令开启插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

结果如下:

image-20210713105829435

输入exit退出容器

3. 使用延迟交换机

3.1. DelayExchange原理

DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:

  1. 接收消息

  2. 判断消息是否具备x-delay属性

  3. 如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间

  4. 返回routing not found结果给消息发送者

  5. x-delay时间到期后,重新投递消息到指定队列

3.2. 使用DelayExchange

插件的使用也非常简单:声明一个交换机,交换机的类型可以是任意类型,只需要设定delayed属性为true即可,然后声明队列与其绑定即可。

3.2.1. 声明DelayExchange交换机

基于注解声明方式:

​ 主要是在@Exchange中添加属性 ****

@RabbitListener(bindings = @QueueBinding(value = @Queue(
        name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayedQueue(String msg) {
    log.info("接收到 delay.queue的延迟消息: {}", msg);
}

也可以基于Bean声明的方式:

@Bean
public DirectExchange delayedExchange() {
    return ExchangeBuilder
            .directExchange("delay.direct")
            .delayed() // 设置delay为true
            .durable(true)
            .build();
}

@Bean
public Queue delayedQueue() {
    return new Queue("delay.queue");
}

@Bean
public Binding delayedBinding() {
    return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delay");
}

3.2.2. 发送消息

发送消息时,一定要携带x-delay属性,指定延迟的时间:

Message message = MessageBuilder
                .withBody("这是一条延迟了10000毫秒的消息".getBytes())
                .setHeader("x-delay", 10000).build();
rabbitTemplate.convertAndSend("delay.direct", "delay", message);
如人饮水,冷暖自知。
最后更新于 2023-08-02