1.消息队列 MQ

1.1. 引言

在了解MQ之前,让我们先了解一下同步通讯和异步通讯。

1.1.1.同步通讯

我们以购买商品为例,用户支付后需要调用订单服务完成订单状态修改,调用仓库服务扣减库存出库,调用物流服务准备发货。

  • 同步方法存在下面的问题:
  1. 耦合度高,需要修改原来的代码
  2. 性能下降,调用者需要等待服务提供者响应。如果调用链过长,则响应时间等于每次调用的时间之和。
  3. 资源浪费,调用链中的每个服务在等待响应的过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源
  4. 可能导致级联失败。如果服务提供者出现问题,所有调用者都会出现问题。所有调用方都会跟着出问题,如同多米骨牌一样,迅速导致整个微服务群鼓掌。

  • 同步调用的优点:

    • 时效性较强,可以立即得到结果
  • 同步调用的问题:

    • 耦合度高

    • 性能和吞吐能力下降

    • 有额外的资源消耗

    • 有级联失败问题

1.1.2.异步通讯

异步调用则可以避免上述问题:

还是以购买商品为例,用户支付后需要调用订单服务完成订单状态修改,调用仓库服务扣减库存出库,调用物流服务准备发货。

在事件模式中,支付服务是事件发布者(publisher),在支付完成后只需要发布一个支付成功的事件(event),事件中带上订单id。

订单服务和物流服务是事件订阅者(Consumer),订阅支付成功的事件,监听到事件后完成自己业务即可。

为了解除事件发布者与订阅者之间的耦合,两者并不是直接通信,而是有一个中间人(Broker)。发布者发布事件到Broker,不关心谁来订阅事件。订阅者从Broker订阅事件,不关心谁发来的消息。

image-20210422095356088

Broker 是一个像数据总线一样的东西,所有的服务要接收数据和发送数据都发到这个总线上,这个总线就像协议一样,让服务间的通讯变得标准和可控。

好处:

  • 吞吐量提升:无需等待订阅者处理完成,响应更快速

  • 故障隔离:服务没有直接调用,不存在级联失败问题

  • 调用间没有阻塞,不会造成无效的资源占用

  • 耦合度极低,每个服务都可以灵活插拔,可替换

  • 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

缺点:

  • 架构复杂了,业务没有明显的流程线,不好管理
  • 需要依赖于Broker的可靠、安全、性能

好在现在开源软件或云平台上 Broker 的软件是非常成熟的,比较常见的一种就是消息队列技术。

1.2.消息队列简介

消息队列(MessageQueue)是一种进程间通信或同一进程的不同线程间的通信方式,利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。消息队列中消息本身一般由消息类型和消息数据组成


目前比较常见的MQ实现有以下几种

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

几种常见MQ的对比:

ActiveMQ RabbitMQ RocketMQ Kafka
公司/社区 Apache Rabbit 阿里 Apache
开发语言 Java Erlang Java Scala\&Java
协议支持 OpenWire,STOMP,REST,XMPP,AMQP AMQP,XMPP,SMTP,STOMP 自定义私有协议 自定义私有协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 毫秒级 微秒级 毫秒级 毫秒以内
消息可靠性 一般 一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

2. RabbitMQ

2.1. RabbitMQ简介

rabbitmq\_logo

RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台(OTP)框架上的。能够和所有主要的编程语言进行通讯。


image-20210717162752376

和所有的其他MQ实现一样,从角色的角度来看,RabbitMQ由三部分组成,分别是:

  • 生产者(publisher)
  • 消费者(consumer)
  • rabbitMQ服务(Broker)。

而在Rabbit服务进程是由多个独立且互相隔离的 虚拟主机(virtualHost) 组成。而在虚拟主机内部,则由 交换机(Exchange)消息队列(Queue) 组成。

​ Exchange是消息队列交换机,用于接收消息并将其路由到一个或多个队列。

​ Queue是消息队列,用于存储消息。

2.2. RabbitMQ消息模型

官方消息模型参考文档地址:

https://www.rabbitmq.com/getstarted.html

在上面的官方文档中,官方提供了七种消息模型。我们先对前五种进行讨论,这五种分别是:

  1. 基本消息队列(BasicQueue)

python-one.png (392×59)

  1. 工作消息队列(WorkQueue)

python-two.png (332×111)

  1. 广播(Fanout Exchange)

python-three.png (434×111)

  1. 路由(Direct Exchange)

python-four.png (423×171)

  1. 主题(Topic Exchange)

python-five.png (424×171)

2.2.1. 直接发给队列

可以看到,其中前两种是不经过交换机,生产者直接将消息发送至队列。区别是,基本消息队列模型中,一个消费者对应一个队列。而在工作消息队列中,一个队列可能有多个消费者。

需要注意的是,在RabbitMQ中,消息是 阅后即焚 的,即,一条消息一旦确定被一个消费者消费过了,那这条消息就会从队列中删除。在工作消息队列中,消息并不是同时转发给两个消费者,而是队列按照一定的规则将信息分配给多个消费者,一条消息分配给A以后,就不会给B了。

2.2.1. 发布订阅

在后面的三个消息模型中,生产者都是将消息发送给交换机,然后交换机再根据规则将消息发送给对应的队列。

那么,交换机是如何知道消息应该发给哪个队列呢?

实际上,当交换机和队列被创建以后,我们还需要为交换机创建转发的规则,告诉交换机如何转发消息。而根据交换机的不同,我们又将其分为上面提到的第3\~5种模型。

广播模型 Fanout

image-20210717165438225

在广播模型中,广播交换机与队列建立广播的绑定关系。当生产者将消息推送到广播交换机以后,广播交换机会将消息推送给所有和它建立关系的队列中。

路由模型 Direct

image-20210717170041447

在该模型中,生产者在推送消息时,除了传输数据,指定交换机,还要额外指定一个bindingKey。

在该模型中,交换机可以绑定多个队列,每个队列通过一个bindingKey和交换机绑定。bindingKey是一个字符串,队列在和交换机绑定的时候可以指定多个bindingKey,多个队列也可以指定相同的bindingKey。

当生产者的消息过来时,交换机根据bindingKey来决定将消息给哪些队列。

在该模型中,bindingKey的匹配模式是全匹配,即必须完全一样才认为是匹配成功。

主题模型 Topic

image-20210717170705380

主题模型整体上和路由模型一样,生产者也需要传递bindingKey给交换机,交换机根据bindignKey决定发给哪个队列。

不同的是,在该模式中,交换机和队列建立绑定关系的时候,指定的bindignKey是包含通配符的,如 china.*,#.weather。最后消息过来的时候,交换机也是根据通配符的规则进行消息转发。

通配符规则:

#:匹配一个或多个词

*:匹配不一个词

举例:

item.#:能够匹配item.spu.insert 或者 item.spu

item.*:只能匹配item.spu

2.3. 原生入门案例

这只是一个用于大家理解RabbitMQ的案例,在java中使用原生代码对RabbitMQ进行操作

就像是学习和了解数据库的时候,都是从JDBC学起,虽然我们在开发中并不会写,但是了解其实现原

在接下来的案例中,我们将使用原生的方式实现如下的队列模型:

image-20210717163434647

官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:

  • publisher:消息发布者,将消息发送到队列queue
  • queue:消息队列,负责接受并缓存消息
  • consumer:订阅队列,处理队列中的消息

2.3.1. 软件安装

linux端安装

我们更推荐在linux使用docker进行安装。docker的安装和使用在前面的笔记中已经写过。

如果安装好docker后,只需要运行下面的命令即可

docker run \
 -e RABBITMQ_DEFAULT_USER=user \
 -e RABBITMQ_DEFAULT_PASS=123456 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 -d \
 rabbitmq:3-management

在这段命令中,我们使用了官方的镜像,并指定了docker的名字为mq。

指定了初始 用户名/密码 是 user/123456 ,各位可以按需更改。

同时我们将plugins文件夹映射为数据卷,用于以后加载插件。

开放了端口15672,该端口用于登录管理后台,使用 IP:15672 访问。

开放了端口5672,该端口用于其他程序访问MQ。

windows端安装

登录官方网站:https://www.rabbitmq.com/install-windows.html#downloads

找到下面的图片中的内容进行下载,或 ctrl+f 在页面中搜索 Direct Downloads

安装rebbitmq

2.3.2. publisher实现

实现思路:

  • 建立连接
  • 创建Channel
  • 声明队列
  • 发送消息
  • 关闭连接和channel

代码实现:

package link.xiaomo.mq.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");

        // 5.关闭通道和连接
        channel.close();
        connection.close();

    }
}

2.3.3. consumer实现

代码思路:

  • 建立连接
  • 创建Channel
  • 声明队列
  • 订阅消息

代码实现:

package link.xiaomo.mq.helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();

        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

基本消息队列的消息发送流程:

  1. 建立connection

  2. 创建channel

  3. 利用channel声明队列

  4. 利用channel向队列发送消息

基本消息队列的消息接收流程:

  1. 建立connection

  2. 创建channel

  3. 利用channel声明队列

  4. 定义consumer的消费行为handleDelivery()

  5. 利用channel将消费者与队列绑定

如人饮水,冷暖自知。
最后更新于 2023-08-02