1. 简介
在上一篇中,我们在介绍RabbitMQ的时候就提到过,RabbitMQ
支持 AMQP,XMPP,SMTP,STOMP协议。
协议 是一种规范,是用于规定如何将消息从一方传给另一方的。正如TCP协议一样,消息队列协议本身也并不关心消息如何构建,也不关心消息的请求、获取、存储方式等。具体协议的实现方式是多样的,可以由各种语言,各种方式实现。
AMQP 正是RabbitMQ支持的一种消息队列协议。而Spring为我们提供了一套基于AMQP协议的API规范,就是SpringAMQP
。SpringAMQP利用了SpringBoot对其实现了自动装配,并且提供了一个用来接收和发送消息的模板。它由两部分组成:spring-amqp 是基本抽象,spring-rabbit 底层由RabbitMQ实现
SpringAMQP主要提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息
- 封装了RabbitTemplate工具,用于发送消息
2. 案例1:通过Bean声明
2.1. 最小配置
为了项目能够正常使用SpringAMQP,你应该至少进行如下的最小配置。
- 引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- application.yml (生产者和消费者端都要配置)
spring:
rabbitmq:
host: 192.168.150.101 # 主机IP
port: 5672 # 端口
virtual-host: / # 虚拟主机,默认为/
username: user # 用户名
password: 123456 # 密码
2.2. 代码实现
在接下来的案例中,我们编写一个简单的 广播模型。
消费者端
在使用java操作RabbitMQ之前,我们需要通过java程序 创建交换机
、创建队列
、 创建绑定关系
。
创建交换机和队列一般有两种方式
一种是使用下面代码中的方法,通过Bean声明
的方式构建对应的bean,之后amqp会自动创建对应的交换机、队列和绑定关系。
另一种方式是在消费者创建监听的时候,通过注解声明
的方式创建,这种方式我们会在下一个案例中讲解。
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanout.exchange");
}
/**
* 声明队列 queue1
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 绑定交换机和queue1
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 声明队列 queue2
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 绑定交换机和queue2
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
上面只是声明RabbitMQ中的内容,接下来我们需要建立一个监听方法,指定监听的队列,并与RabbitMQ建立连接。当队列有消息发过来的时候,监听方法进行接收消息并进行下一步的处理。
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
}
生产者端
消费者端的操作就很简单了,只需要在需要发消息的类中注入RabbitTemplate,然后调用方法即可发送消息。由于我们当前的例子是广播,所以不需要传bindingKey参数。
对于传递的消息参数,可以是纯字符串,也可以是一个对象,如果是对象,那对象必须实现Serializable接口
。对象在传输的时候,会被RabbitMQ转换成字节数组
传输,在接收端重新反序列化成对象。
public void testFanoutExchange() {
// 队列名称
String exchangeName = "fanout.exchange";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
3. 案例2:通过注解声明
3.1. 最小配置
为了项目能够正常使用SpringAMQP,你应该至少进行如下的最小配置。
- 引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- application.yml (生产者和消费者端都要配置)
spring:
rabbitmq:
host: 192.168.150.101 # 主机IP
port: 5672 # 端口
virtual-host: / # 虚拟主机,默认为/
username: user # 用户名
password: 123456 # 密码
3.2. 代码实现
接下来,我们将使用注解声明
交换机和队列的方式,实现一个 路由模型 的案例。
在这个模型中,direct.queue1 和交换机通过 blue 和 red 这两个bindingKey进行绑定.
direct.queue2 和交换机通过 yellow 和 red 这两个bindingKey进行绑定.
如果 生产者 给 direct.exchange 交换机发送一条bindingKey为 red 的消息,则交换机会将消息发送给两个队列,因为两个队列的绑定关系中都包含 red
如果 生产者 给 direct.exchange 交换机 送一条bindingKey为 yellow 的消息,则交换机会将消息发送给 direct.queue2,因为绑定的时候只有direct.queue2绑定了 yellow
消费者端
在该案例中,我们在通过注解,在消费者端的监听方法上,创建交换机、队列和绑定关系。
在第一个监听方法的@RabbitListener
注解中,通过@Queue
注解声明(创建)一个名字为direct.queue1的队列。
通过@Exchange
注解声明一个交换机,交换机的名字为direct.exchange,交换机的类型为direct。将交换机和上面创建的队列进行绑定,参数key
为绑定的bindingKey。
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg) {
System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg) {
System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}
}
生产者端
消费者端的操作与上面的广播相似,只需要在需要发消息的类中注入RabbitTemplate,然后调用方法即可发送消息。
在路由模型中,需要传一个bindingKey参数,交换机根据bindingKey进行匹配,决定发给哪个队列。
public void testFanoutExchange() {
// 队列名称
String exchangeName = "fanout.exchange";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
4. 能者多劳
在上一篇笔记中,我们讲到过一种叫做 工作消息队列 的消息模型。
在这种模型中,有多个消费者共同绑定了一个队列。队列收到消息以后,会将消息按 轮询 的方式 分配 给每个消费者。
比如一个队列连接了两个消费者(A和B),这时,队列接收到了100条消息,会将第1,3,5,7,.....99条消息分配给消费者A,将第2,4,6,8.....100条消息分配给消费者B。
此时,如果消费者A和B 存在比较大的性能差异 ,很可能A早早消费完了自己的50条消息,而B还有很多消息没有消费完,但是A不能替B消费分配给B的消息,这就造成了资源的浪费。
这是因为,队列的推送策略是轮询,即以一人一条的方式轮换推送。队列默认可以一口气向一个消费者推送250条消息,在消费者确认都消费完以后,会给队列返回一条消息,队列再继续给它推送新的消息。
所以,我们需要使用 能者多劳 模式,改变一次推送的条数。只要设置成每次推送一条,并且需要回复处理完成才会发送下一条,就解决了因为多个消费者性能不均,导致消息分配不合理,进而导致性能浪费的问题。
改变默认的推送条数,只需要在配置文件中添加如下配置:
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
在本案例中,其他的代码实现与之前并无区别。只不过,由于不经过交换机,生产者直接发往队列,所以在消息发送时,只有两个参数,即队列名和消息内容
- 生产者端
/**
* workQueue
* 向队列中不停发送消息,模拟消息堆积。
*/
@Test
public void testWorkQueue() throws InterruptedException {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, message_";
for (int i = 0; i < 50; i++) {
// 发送消息
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
5. 消息转换器
5.1. 引言
在第一个案例中,我们提到过,生产者发送的消息可以是一个对象。这个对象必须实现序列化接口,在传输的过程中,这个对象会被jdk序列化为一个二进制数组。而众所周知,JDK序列化存在下列问题:
- 数据体积过大
- 有安全漏洞
- 可读性差
下面的图片是生产者发送一个对象后,我们在RabbitMQ控制台捕捉到的消息。可以看到,我们很从消息的载荷(Payload)中捕获到有效信息。
那么,有没有一种方式,既能完成消息的序列化和反序列化,体积小巧,又能保持高可读性呢?很明显JSON就是一个很好的选择
JSON转换器
配置JSON转换器其实非常简单,只需要我们配置一个Bean即可。在配置之前,我们需要先引入jackson依赖
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
之后,我们只需要在任意一个配置类(比如启动类)中加入以下代码
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
需要注意的是,反序列化的时候,要求发送的类和接收的类必须完全一样
,包括字段、类名、包路径
Comments NOTHING