1. 简介

在上一篇中,我们在介绍RabbitMQ的时候就提到过,RabbitMQ 支持 AMQP,XMPP,SMTP,STOMP协议

协议 是一种规范,是用于规定如何将消息从一方传给另一方的。正如TCP协议一样,消息队列协议本身也并不关心消息如何构建,也不关心消息的请求、获取、存储方式等。具体协议的实现方式是多样的,可以由各种语言,各种方式实现。

AMQP 正是RabbitMQ支持的一种消息队列协议。而Spring为我们提供了一套基于AMQP协议的API规范,就是SpringAMQP。SpringAMQP利用了SpringBoot对其实现了自动装配,并且提供了一个用来接收和发送消息的模板。它由两部分组成:spring-amqp 是基本抽象,spring-rabbit 底层由RabbitMQ实现


SpringAMQP主要提供了三个功能:

  • 自动声明队列、交换机及其绑定关系
  • 基于注解的监听器模式,异步接收消息
  • 封装了RabbitTemplate工具,用于发送消息

2. 案例1:通过Bean声明

2.1. 最小配置

为了项目能够正常使用SpringAMQP,你应该至少进行如下的最小配置。

  • 引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • application.yml (生产者和消费者端都要配置)
spring:
  rabbitmq:
    host: 192.168.150.101 # 主机IP
    port: 5672 # 端口
    virtual-host: / # 虚拟主机,默认为/
    username: user # 用户名
    password: 123456 # 密码

2.2. 代码实现

image-20210717165438225

在接下来的案例中,我们编写一个简单的 广播模型


消费者端

在使用java操作RabbitMQ之前,我们需要通过java程序 创建交换机创建队列创建绑定关系

创建交换机和队列一般有两种方式

一种是使用下面代码中的方法,通过Bean声明的方式构建对应的bean,之后amqp会自动创建对应的交换机、队列和绑定关系。

另一种方式是在消费者创建监听的时候,通过注解声明的方式创建,这种方式我们会在下一个案例中讲解。

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfig {
    /**
     * 声明交换机
     * @return Fanout类型交换机
     */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanout.exchange");
    }

    /** 
     * 声明队列 queue1
     */
    @Bean
    public Queue fanoutQueue1(){
        return new Queue("fanout.queue1");
    }

    /**
     * 绑定交换机和queue1
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    }

    /**
     * 声明队列 queue2
     */
    @Bean
    public Queue fanoutQueue2(){
        return new Queue("fanout.queue2");
    }

    /**
     * 绑定交换机和queue2
     */
    @Bean
    public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    }
}

上面只是声明RabbitMQ中的内容,接下来我们需要建立一个监听方法,指定监听的队列,并与RabbitMQ建立连接。当队列有消息发过来的时候,监听方法进行接收消息并进行下一步的处理。

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class SpringRabbitListener {

    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) {
        System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
    }

    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) {
        System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
    }
}

生产者端

消费者端的操作就很简单了,只需要在需要发消息的类中注入RabbitTemplate,然后调用方法即可发送消息。由于我们当前的例子是广播,所以不需要传bindingKey参数。

对于传递的消息参数,可以是纯字符串,也可以是一个对象,如果是对象,那对象必须实现Serializable接口。对象在传输的时候,会被RabbitMQ转换成字节数组传输,在接收端重新反序列化成对象。

public void testFanoutExchange() {
    // 队列名称
    String exchangeName = "fanout.exchange";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

3. 案例2:通过注解声明

3.1. 最小配置

为了项目能够正常使用SpringAMQP,你应该至少进行如下的最小配置。

  • 引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • application.yml (生产者和消费者端都要配置)
spring:
  rabbitmq:
    host: 192.168.150.101 # 主机IP
    port: 5672 # 端口
    virtual-host: / # 虚拟主机,默认为/
    username: user # 用户名
    password: 123456 # 密码

3.2. 代码实现

image-20210717170223317

接下来,我们将使用注解声明交换机和队列的方式,实现一个 路由模型 的案例。


在这个模型中,direct.queue1 和交换机通过 bluered 这两个bindingKey进行绑定.

direct.queue2 和交换机通过 yellowred 这两个bindingKey进行绑定.

如果 生产者direct.exchange 交换机发送一条bindingKey为 red 的消息,则交换机会将消息发送给两个队列,因为两个队列的绑定关系中都包含 red

如果 生产者direct.exchange 交换机 送一条bindingKey为 yellow 的消息,则交换机会将消息发送给 direct.queue2,因为绑定的时候只有direct.queue2绑定了 yellow

消费者端

在该案例中,我们在通过注解,在消费者端的监听方法上,创建交换机、队列和绑定关系。

在第一个监听方法的@RabbitListener注解中,通过@Queue注解声明(创建)一个名字为direct.queue1的队列。

通过@Exchange注解声明一个交换机,交换机的名字为direct.exchange,交换机的类型为direct。将交换机和上面创建的队列进行绑定,参数key为绑定的bindingKey。

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class SpringRabbitListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg) {
        System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))
    public void listenDirectQueue2(String msg) {
        System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
    }
}

生产者端

消费者端的操作与上面的广播相似,只需要在需要发消息的类中注入RabbitTemplate,然后调用方法即可发送消息。

在路由模型中,需要传一个bindingKey参数,交换机根据bindingKey进行匹配,决定发给哪个队列。

public void testFanoutExchange() {
    // 队列名称
    String exchangeName = "fanout.exchange";
    // 消息
    String message = "hello, everyone!";
    rabbitTemplate.convertAndSend(exchangeName, "", message);
}

4. 能者多劳

在上一篇笔记中,我们讲到过一种叫做 工作消息队列 的消息模型。

在这种模型中,有多个消费者共同绑定了一个队列。队列收到消息以后,会将消息按 轮询 的方式 分配 给每个消费者。

image-20210717164238910

比如一个队列连接了两个消费者(A和B),这时,队列接收到了100条消息,会将第1,3,5,7,.....99条消息分配给消费者A,将第2,4,6,8.....100条消息分配给消费者B。

此时,如果消费者A和B 存在比较大的性能差异 ,很可能A早早消费完了自己的50条消息,而B还有很多消息没有消费完,但是A不能替B消费分配给B的消息,这就造成了资源的浪费。

这是因为,队列的推送策略是轮询,即以一人一条的方式轮换推送。队列默认可以一口气向一个消费者推送250条消息,在消费者确认都消费完以后,会给队列返回一条消息,队列再继续给它推送新的消息。

所以,我们需要使用 能者多劳 模式,改变一次推送的条数。只要设置成每次推送一条,并且需要回复处理完成才会发送下一条,就解决了因为多个消费者性能不均,导致消息分配不合理,进而导致性能浪费的问题。

改变默认的推送条数,只需要在配置文件中添加如下配置:

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

在本案例中,其他的代码实现与之前并无区别。只不过,由于不经过交换机,生产者直接发往队列,所以在消息发送时,只有两个参数,即队列名和消息内容

  • 生产者端
/**
 * workQueue
 * 向队列中不停发送消息,模拟消息堆积。
 */
@Test
public void testWorkQueue() throws InterruptedException {
    // 队列名称
    String queueName = "simple.queue";
    // 消息
    String message = "hello, message_";
    for (int i = 0; i < 50; i++) {
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    }
}

5. 消息转换器

5.1. 引言

在第一个案例中,我们提到过,生产者发送的消息可以是一个对象。这个对象必须实现序列化接口,在传输的过程中,这个对象会被jdk序列化为一个二进制数组。而众所周知,JDK序列化存在下列问题:

  • 数据体积过大
  • 有安全漏洞
  • 可读性差

下面的图片是生产者发送一个对象后,我们在RabbitMQ控制台捕捉到的消息。可以看到,我们很从消息的载荷(Payload)中捕获到有效信息。

image-20210422232835363

那么,有没有一种方式,既能完成消息的序列化和反序列化,体积小巧,又能保持高可读性呢?很明显JSON就是一个很好的选择

JSON转换器

配置JSON转换器其实非常简单,只需要我们配置一个Bean即可。在配置之前,我们需要先引入jackson依赖

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

之后,我们只需要在任意一个配置类(比如启动类)中加入以下代码

@Bean
public MessageConverter jsonMessageConverter(){
    return new Jackson2JsonMessageConverter();
}

需要注意的是,反序列化的时候,要求发送的类和接收的类必须完全一样,包括字段、类名、包路径

如人饮水,冷暖自知。
最后更新于 2023-08-02