1. 简介
利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。
延迟队列的使用场景包括:
- 延迟发送短信
- 用户下单,如果用户在15 分钟内未支付,则自动取消
- 预约工作会议,20分钟后自动通知所有参会人员
因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。
这个插件就是DelayExchange插件。参考RabbitMQ的插件列表页面:https://www.rabbitmq.com/community-plugins.html
使用方式可以参考官网地址:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
2. 安装DelayExchange插件
官方的安装指南地址为:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
上述文档是基于linux原生安装RabbitMQ,然后安装插件。
接下来我们会介绍使用docker的方式安装RabbitMQ。
2.1.下载插件
RabbitMQ有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html
其中包含各种各样的插件,包括我们要使用的DelayExchange插件:
插件的官方下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
2.2. 安装RabbitMQ
我们这里需要配置RabbitMQ容器,将镜像中的插件目录映射出来,以方便我们后续安装插件
创建RabbitMQ的配置如下
docker run \
-e RABBITMQ_DEFAULT_USER=user \
-e RABBITMQ_DEFAULT_PASS=123456 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
2.2.上传插件
因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。
我们使用下面命令查看插件目录映射的数据卷:
docker volume inspect mq-plugins
可以得到下面结果:
接下来,将插件上传到这个目录即可:
2.3.安装插件
最后就是安装了,需要进入MQ容器内部来执行安装。我的容器名为mq
,所以执行下面命令:
docker exec -it mq bash
执行时,请将其中的 -it
后面的mq
替换为你自己的容器名.
进入容器内部后,执行下面命令开启插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
结果如下:
输入exit
退出容器
3. 使用延迟交换机
3.1. DelayExchange原理
DelayExchange需要将一个交换机声明为delayed类型。当我们发送消息到delayExchange时,流程如下:
-
接收消息
-
判断消息是否具备x-delay属性
-
如果有x-delay属性,说明是延迟消息,持久化到硬盘,读取x-delay值,作为延迟时间
-
返回routing not found结果给消息发送者
-
x-delay时间到期后,重新投递消息到指定队列
3.2. 使用DelayExchange
插件的使用也非常简单:声明一个交换机,交换机的类型可以是任意类型,只需要设定delayed属性为true即可,然后声明队列与其绑定即可。
3.2.1. 声明DelayExchange交换机
基于注解声明方式:
主要是在@Exchange中添加属性 ****
@RabbitListener(bindings = @QueueBinding(value = @Queue(
name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayedQueue(String msg) {
log.info("接收到 delay.queue的延迟消息: {}", msg);
}
也可以基于Bean声明的方式:
@Bean
public DirectExchange delayedExchange() {
return ExchangeBuilder
.directExchange("delay.direct")
.delayed() // 设置delay为true
.durable(true)
.build();
}
@Bean
public Queue delayedQueue() {
return new Queue("delay.queue");
}
@Bean
public Binding delayedBinding() {
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with("delay");
}
3.2.2. 发送消息
发送消息时,一定要携带x-delay属性,指定延迟的时间:
Message message = MessageBuilder
.withBody("这是一条延迟了10000毫秒的消息".getBytes())
.setHeader("x-delay", 10000).build();
rabbitTemplate.convertAndSend("delay.direct", "delay", message);
Comments NOTHING