消息中间件之RabbitMQ
一、引言
1.什么是消息中间件?
在早期的计算机通信中,都是采用点对点通信,并且两边要使用相同的协议,这无疑是强耦合且低可靠的,于是出现了消息中间件——位于通信双方中间的组件
官方点说,消息中间件是一种软件架构模式,用于处理分布式系统中的异步通信和数据传输。它是一种基于消息传递的模式,用于解耦不同应用程序或系统之间的通信,以提高系统的可靠性、可扩展性和可维护性
那么消息中间件具体有什么用呢?简单点说就是异步、削峰、解耦
- 异步:当系统中需要处理大量的请求时,可以将请求发送到消息队列中,而不是直接调用后端服务。后端服务从消息队列中读取请求,并通过异步方式进行处理,从而提高系统的性能和吞吐量。同时,异步方式还可以避免由于请求处理时间过长而导致的请求积压问题,提高系统的响应速度。
- 削峰:当系统面临高并发请求时,可以通过消息队列来削峰。具体来说,可以将请求发送到消息队列中,然后由后端服务从消息队列中读取请求进行处理。由于消息队列可以缓存大量的请求,因此可以有效地减少请求的到达率,从而保护系统不被过载。
- 解耦:当系统中不同的服务之间需要进行通信时,可以使用消息队列来解耦。具体来说,可以将消息发送到消息队列中,然后由不同的服务从消息队列中读取消息进行处理。这样可以将不同服务之间的通信变为异步方式,避免因为服务之间的耦合而导致的系统复杂性增加
此外,MQ还支持广播等方式进行通信
2.带来的问题
MQ虽好,但是也不能滥用,因为一个系统中引入新的组件,肯定会导致系统更加复杂等问题,具体如下:
运维成本增加:既然要用 MQ,必须要分配资源部署 MQ,还要保证它时刻正常运行
系统可用性降低:原来是两个节点的通信,现在还需要独立运行一个服务。虽然一般的MQ都有很高的可靠性和低延迟的特性,但是一旦网络或者 MQ 服务器出现问题,就会导致请求失败,严重地影响业务
系统复杂性提高:为什么说复杂?作为开发人员,要使用 MQ,首先必须要理解相关的模型和概念,才能正确地配置和使用 M;。其次,使用 MQ 发送消息必须要考虑消息丢失和消息重复消费的问题。一 旦消息没有被正确地消费,就会带来数据一致性的问题
3.消息协议
协议是计算机之间通信时共同遵从的一组约定,都遵守相同的约定,计算机之间才能相互交流,是对数据格式和计算机之间交换数据时必须遵守的规则的正式描述
协议三要素:
- 语法:即数据与控制信息的结构或格式
- 语义:即需要发出何种控制信息,完成何种动作以及做出何种响应
- 时序(同步): 即事件实现顺序的详细说明
3.1 AMQP协议
AMQP(Advanced Message Queuing Protocol)是高级消息队列协议 04 年 JPMorgan Chase(摩根大通集团)联合其他公司共同设计
特性: 事务支持、持久化支持,出生金融行业,在可靠性消息处理上具备天然的优势
应用:RabbitMQ、ActiveMQ、OpenAMQ、Apache Qpid、Redhat Enterprise MRG、AMQP Infrastructure、ØMQ、Zyre
3.2 MQTT协议
MQTT(Message Queuing Telemetry Transport)消息队列遥测传输是 IBM 开发的一个即时通讯协议, 物联网系统架构中的重要组成部分
特性:轻量、结构简单、传输快、没有事务支持、没有持久化相关设计。
应用:RabbitMQ、ActiveMQ
3.3 Open Message 协议
Open Messaging 是近一两年由阿里发起,与雅虎、滴滴出行、Streamlio 等公司共同参与创立的分布 式消息中间件、流处理领域的应用开发标准,是国内首个在全球范围内发起的分布式消息领域国际标准
特性:结构简单、解析快、有事务设计、有持久化设计
应用:RocketMQ
3.4 Kafka协议
Kafka协议是基于TCP 的二进制协议。消息内部是通过长度来分隔,由一些基本数据类型组成。 特性:结构简单、解析快、无事务设计、有持久化设计
应用:Kafka
4.持久化
即将数据存入磁盘,永久保存的过程
ActivityMQ、RabbitMQ、Kafka、RocketMQ均支持持久化
5.消息分发策略
| ActivityMQ | RabbitMQ | Kafka | RocketMQ | |
|---|---|---|---|---|
| 发布订阅 | 支持 | 支持 | 支持 | 支持 |
| 轮询分发 | 支持 | 支持 | 支持 | / |
| 公平分发 | / | 支持 | 支持 | / |
| 重发 | 支持 | 支持 | / | 支持 |
| 消息拉取 | / | 支持 | 支持 | 支持 |
二、RabbitMQ基础
1.简介
RabbitMQ是一个开源的、高性能、跨平台的消息队列系统,它实现了高可用性、可扩展性和可插拔性。RabbitMQ最初是由LShift公司开发的AMQP(高级消息队列协议)的实现,现已成为了开源社区的一个重要项目
RabbitMQ的核心思想是生产者将消息发送到队列中,消费者从队列中获取消息并进行处理。RabbitMQ支持多种协议,如AMQP、STOMP、MQTT等,可以通过不同的协议来连接不同的客户端和服务端
需要注意的是,RabbitMQ采用erlang语言开发,限制了它的开放性—因为看不懂
2.架构原理
话不多说,上图
2.1 Broker
要使用 RabbitMQ 来收发消息,必须要安装一个 RabbitMQ 的服务,可以安装在 Windows 上面也可以安装在 Linux 上面,默认是 5672 的端口。这台 RabbitMQ 的服务器我们把它叫做 Broker,中文翻译是代理/中介,因为 MQ 服务器帮助我们做的事情就是存储、转发消息
2.2 Connection 连接
无论是生产者发送消息,还是消费者接收消息,都必须要跟 Broker 之间建立一个 连接,这个连接是一个 TCP 的长连接
2.3 Channel 通道
如果所有的生产者发送消息和消费者接收消息,都直接创建和释放 TCP 长连接的话,对于 Broker 来说肯定会造成很大的性能损耗,也会浪费时间
所以在 AMQP 里面引入了 Channel 的概念,它是一个虚拟的连接,把它翻译成通道,或者消息信道。这样我们就可以在保持的 TCP 长连接里面去创建和释放 Channel,大大了减少了资源消耗
不同的 Channel 是相互隔离的,每个 Channel 都有自己的编号。对于每个客户端线程来说,Channel 就没必要共享了,各自用自己的 Channel。 另外一个需要注意的是,Channel 是 RabbitMQ 原生 API 里面的最重要的编程接口,也就是说定义交换机、队列、绑定关系,发送消息,消费消息,调用的都是 Channel 接口上的方法
2.4 Queue 队列
连接到 Broker 以后,就可以收发消息了
在 Broker 上有一个对象用来存储消息,在 RabbitMQ 里面这个对象叫做 Queue。 实际上RabbitMQ 是用数据库来存储消息的,这个数据库跟 RabbitMQ 一样是用 Erlang 开发的,名字叫 Mnesia,可以在磁盘上找到 Mnesia 的存储路径
Windows 系统保存在用户目录下:
- C:\Users\用户名\AppData\Roaming\RabbitMQ\db\rabbit@用户名-mnesia
CentOS 保存在/var/lib 目录下:
- /var/lib/rabbitmq/mnesia
队列也是生产者和消费者的纽带,生产者发送的消息到达队列,在队列中存储。 消费者从队列消费消息
2.5 Consumer 消费者
消费者,顾名思义,就是消费消息的角色
消费者如何从消息队列中获取消息呢?要么被推送,要么自己拉取呗
- Pull模式:对应的方法是 basicGet,消费者自己获取,实时性低一点,但是能根据自己消费能力获取
- Push模式:对应的方法是 basicConsume,消息队列主动推送给消费者,实时性高,但是消费不过来可能造成消息积压
RabbitMQ 中 pull 和 push 都有实现。而 kafka 和 RocketMQ 只有 pull,消费完后再手动提交偏移量,这样可以确保消息处理的顺序和可靠性
2.6 Exchange 交换机
此处思考一个问题,如果要把一条消息发送给多个队列,被多个消费者消费,应该怎么做?
此时,生产者要调用多次 basicPublish 的方法,依次发送给多个队列
这样的话,像消息推送的这种场景,有成千上万个队列的时候,对生产者来说压力太大了
这个问题,每个消息队列都会面对,那么RabbitMQ是如何解决的呢?
在RabbitMQ中,使用了一个叫做Exchange的组件解决这个问题
也就是生产者只需要将消息发送给Exchange即可,具体分发到哪个Queue,由Exchange来完成
那么Exchange是如何和Queue绑定关系的呢?
事实上,Exchange和Queue的关系由Exchange的类型来决定,下面会详解
2.7 Vhost 虚拟机
为何会出现虚拟机呢?
设想,如果某个业务系统想要有自己独立的Broker,那么要如何实现呢?总不能再配置一台机制吧?
因此,RabbitMQ使用了虚拟主机Vhost来解决这个问题
VHOST 除了可以提高硬件资源的利用率之外,还可以实现资源的隔离和权限的控制。它的作用类似于其他编程语言中的 namespace 和 package,不同的 VHOST 中可以有同名的 Exchange 和 Queue,它们是完全透明的
3.消息分发机制
RabbitMQ中有四种类型的交换机,分别是Direct、Topic、Fanout和Headers,其中,Headers不常用
这些交换机就指定了消息如何传递给Queue
3.1 Direct 直连
一个队列与直连类型的交换机绑定,需指定一个明确的绑定键(binding key)
生产者发送消息时会携带一个路由键(routing key)
当消息的路由键与某个队列的绑定键完全匹配时,这条消息才会从交换机路由到这个队列上
多个队列也可以使用相同的绑定键,如下
此时,如果使用如下语句进行发送,则只有第一个队列能收到消息
1 | channel.basicPublish(“MY_DIRECT_EXCHANGE”,”spring”,”msg 1”); |
适用场景:业务用途明确的消息,比如 HR 系统跟销售系统之间通信,传输的是销售系统专用的消息,就可以建一个直连类型的交换机,使用明确的绑定键
3.2 Topic 主题
直连模式只能一对一,那如果想要一对多的情况下如何实现呢?
此时就可以用Topic模式,它在绑定键中使用通配符进行匹配,支持两个通配符
- “#” 代表匹配0个或者多个单词
- “*” 代表匹配一个单词
单词(word)指使用点隔开的字符,如a.bc.def是三个单词
具体举例如下
如上,如果使用以下语句,则能发送到第2和第3个队列
1 | channel.basicPublish("MY_TOPIC_EXCHANGE","senior.netty", "msg 3"); |
适用场景:适用一些根据业务主题或者消息等级过滤消息的场景,比如说一条消息可能既跟资金有关,又跟风控有关,那就可以让这个消息指定一个多级的路由键
3.3 FanOut 广播
广播也是一对多,但是不用指定绑定键,因为它直接发给了所有队列
比如
1 | channel.basicPublish("MY_FANOUT_EXCHANGE", "", "msg 4"); |
会发给所有队列
适用场景:适合通用的业务消息
4.持久化与内存管理
4.1 持久化机制
持久化就是将数据从内存写入磁盘,这样当系统重启后数据也能够重新读取
消息的持久化与非持久化可以通过属性设置
为了将消息设置为持久化的,需要在生产者发送消息的时候设置消息的 deliveryMode 属性为2,这样消息就会被标记为持久化消息,例如:
1 | channel.basicPublish("", "queue-name", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); |
MessageProperties.PERSISTENT_TEXT_PLAIN即为2
如果消息不需要被持久化,可以将 deliveryMode 属性设置为 1,或者在发送消息时不设置 deliveryMode 属性,例如:
1 | channel.basicPublish("", "queue-name", null, message.getBytes()); |
标记为持久化消息和非持久化消息有何区别呢?
持久化消息:当 RabbitMQ 收到消息时,会将消息储存在内存中,同时也会写入磁盘
非持久化消息:只会存在内存中
- 当内存使用达到 RabbitMQ 的临界值时,内存中的数据会被交换到磁盘,持久化消息由于本就存在于磁盘中,不会被重复写入
由此可见,并不是说非持久化消息就不会进入磁盘,它可以进入磁盘,只是重启后还是会消失
从上面的例子可知,消息的持久化是在发消息时,通过 deliveryMode属性设置
此外,交换器也可以通过参数持久化,而非持久化的消息、队列、交换器在服务重启后会消失,即使已经被写 入磁盘
4.2 内存控制
RabbitMQ 中通过内存阈值参数控制内存的使用量,当内存使用超过配置的阈值时,RabbitMQ 会阻塞客户端的连接并停止接收从客户端发来的消息,以免服务崩溃, 同时,会发出内存告警,此时客户端于与服务端的心跳检测也会失效
当出现内存告警时,可以通过管理命令临时调整
1 | rabbitmqctl set_vm_memory_high_watermark <fraction> |
fraction 为内存阈值,默认是 0.4,表示 RabbitMQ 使用的内存超过系统内存的 40%时,会产生内存告警,通过此命令修改的阈值在重启后会失效
可以通过修改配置文件的方式,使之永久生效,但是需要重启服务
1 | # rabbit.conf |
提供 relative 与 absolute 两种配置方式
- relative:相对值,也就是前面的fraction参数,建议0.4~0.66,不能太大
- absolute:绝对值,固定大小,单位为 KB、MB、GB
4.3 内存换页
在 RabbitMQ 达到内存阈值并阻塞生产者之前,会尝试将内存中的消息换页到磁 盘,以释放内存空间。内存换页由换页参数控制,默认为 0.5,表示当内存使用量达到 内存阈值的 50%时会进行换页,也就是 0.4*0.5=0.2
1 | vm_memory_high_watermark_paging_ratio=0.5 |
当阈值大于1时,相当于禁用了换页功能
4.4 磁盘控制
RabbitMQ 通过磁盘阈值参数控制磁盘的使用量,当磁盘剩余空间小于磁盘阈值 时,RabbitMQ 同样会阻塞生产者,避免磁盘空间耗尽
磁盘阈值默认 50M,由于是定时检测磁盘空间,不能完全消除因磁盘耗尽而导致崩溃的可能性,比如在两次检测之间,磁盘空间从大于 50M 变为 0M
一种相对谨慎的做法是将磁盘阈值大小设置与内存相等
1 | rabbitmqctl set_disk_free_limit <limit> |
5.插件管理
RabbitMQ 的插件是一种用于扩展和定制 RabbitMQ 功能的方式。它们可以添加额外的功能、协议支持、认证后端、集成、监视和工具等
5.1 插件列表管理
rabbitmq-plugins list
此命令,列出当前可以安装使用的插件。插件前面[ ] 为空说明,没有安装。有 e* 说明插件是安装了的
5.2 插件安装
以 rabbitmq_management为例
rabbitmq-plugins enable rabbitmq_management
此命令,就是安装启用管理
5.3 插件卸载
将上面的enable改为disable即可
rabbitmq-plugins disable rabbitmq_management
三、死信与限流
1.订单延时关闭问题
先来一个经典的面试问题,假如设计一个功能,超过30min未支付的订单要取消,自动关闭,如何实现呢?
不采用MQ的话,有以下方案:
- 定时器:创建订单时就创建一个定时器,到30min就查看订单是否被支付
- 轮询:有一个线程定时执行检查订单创建时间和支付状态
事实上,上面两种方法都有缺点,那么采用MQ的话,如何实现呢?会有何进步?
在这种情况下,可以使用RabbitMQ的死信队列(Dead Letter Queue)来实现功能
主要借助RabbitMQ的消息存活时间属性
2.消息存活时间 TTL(Time To Live)
下面分别是通过队列和消息设置过期时间
2.1 Queue属性设置
在队列中,可以设置一个消息过期属性x-message-ttl,一旦消息超过了这个时间还未被消费,则会被丢弃
该属性可以针对队列设置,也可以对某消息进行设置
2.2 Message 属性设置
消息的属性可以在发送时,使用MessageProperties来执行,使用方法如下
1 | MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("4000"); // 消息的过期属性,单位 ms |
假如队列和消息都设置了过期时间,那肯定是时间小的那个生效
上面为了简便,说设置了过期时间的消息会被丢弃,事实上过期的消息也不能被直接丢弃,因为丢弃了就无法找回了,也就不可能再被消费了,RabbitMQ是将消息丢到一个容器里面,这个容器就是死信队列
3.死信
如果不进行配置,消息过期后是真的会被丢弃的
可以通过配置,让过期的消息变成死信,从而进入死信队列进行存储
3.1 死信交换机 DLX / 死信队列 DLQ
为什么将这两个东西放到一起说呢?
因为前面介绍过,除了广播模式,消息是通过交换机Exchange来定位队列的,将所有队列都设置成死信队列也不太可能也没有意义,所以需要设置死信交换机,从而指定存储死信的队列,也就是死信队列
注意,死信队列和死信交换机也都是普通的队列和交换机,只是承担的职责不同罢了
队列在创建的时候可以指定一个死信交换机 DLX(Dead Letter Exchange)。死信交换机绑定的队列被称为死信队列 DLQ(Dead Letter Queue)
那么消息过期后,就会先找指定的DLX,再通过DLX去找DLQ,消费者可以去消费这个DLQ
3.2 死信队列实现方案
下面使用一个例子来用死信队列实现订单延时关闭功能,步骤如下
- 声明原交换机,原队列,进行绑定
- 再指定原队列的死信交换机—过期时发送
- 声明死信交换机,死信队列,使用“#”进行绑定,代表无条件路由
- 消费者监听死信队列,检查订单逻辑
- 生产者发送消息测试,10s过期
上述步骤,用代码实现如下
1 | // 1.指定队列的死信交换机 |
具体流转如下图
即生产者—>原交换机—>原队列—>超时—>死信交换机—>死信队列—>消费者
3.3 其他方案
使用死信队列实现延时消息的缺点:
1)如果统一用队列来设置消息的 TTL,当梯度非常多的情况下,比如 1 分钟,2 分钟,5 分钟,10 分钟,20 分钟,30 分钟……需要创建很多交换机和队列来路由消息。
2)如果单独设置消息的 TTL,则可能会造成队列中的消息阻塞——前一条消息没有出队(没有被消费),后面的消息无法投递(比如第一条消息过期 TTL 是 30min, 第二条消息 TTL 是10min。10 分钟后,即使第二条消息应该投递了,但是由于第一条消息还未出队,所以无法投递)。
3)可能存在一定的时间误差
因此,在RabbitMQ 3.5.7之后提供插件rabbitmq-delayed-message-exchange实现延时队列的功能,使用方法不再赘述
3.4 思考
除了消息过期,还有什么情况消息会变成死信?
1)消息被消费者拒绝并且未设置重回队列:(NACK || Reject ) && requeue == false
2)队列达到最大长度,超过了 Max length(消息数)或者 Max length bytes(字节数),最先入队的消息会被发送到 DLX
解释:ACK:acknowledge 消息确认,NACK:Un acknowledge 没有消息确认
已知,RabbitMQ 的消息是存在磁盘上的,如果是内存节点,会同时存在磁盘和内存中。当 RabbitMQ 生产 MQ 消息的速度远大于消费消息的速度时,会产生大量的消息堆积,占用系统资源,导致机器的性能下降
那么想要控制服务端接收的消息的数量,应该怎么做呢?
从RabbitMQ的结构来看,流量控制可以从两个方面来控制
一个是服务端,一个是消费端
4.服务端流控
首先明确下问题,即为什么要进行流控?
因为消息积压,不加限制可能会内存/磁盘爆炸
所以要限制消息的无限存储,那么如何限制呢?最简单的就是淘汰
4.1 队列长度
消息队列有两个属性可以控制长度,一个是控制数量,一个是控制内存
- x-max-length:队列中最大存储最大消息数,超过这个数量,队头的消息会被丢弃
- x-max-length-bytes:队列中存储的最大消息容量(单位 bytes),超过这个 容量,队头的消息会被丢弃
消息长度只在消息堆积的情况下有意义,而且这种方式是简单粗暴的删除了先入队的消息,并不是真正的实现了服务端限流
4.2 内存控制
前面说过,RabbitMQ有一个内存告警机制,可以设置一个阈值,达到阈值就会抛出警告并且阻塞所有连接—正好可以用来限流
所以可以通过内存阈值来进行消息流量的控制
1 | rabbitmqctl set_vm_memory_high_watermark 0.6 |
设置成0的话,所有消息都不能发布
4.3 磁盘控制
既然内存可以存储消息,通过内存可以进行流量控制,那么同样存储消息的磁盘,当然可以通过磁盘容量来进行相似的控制了
可以设置当磁盘容量低于指定值时触发流控措施,同样有比例和固定值两种方式
1 | disk_free_limit.relative = 3.0 # 30% |
此外,如果Broker还可以存,但是消费者消费不过来了,积压太多,也要限流
5.消费端限流
默认情况下,如果不进行配置,RabbitMQ 会尽可能快速地把队列中的消息发送到消费者
但是因为消费者会在本地缓存消息,如果消息数量过多,可能会导致 OOM 或 者影响其他进程的正常运行
在消费者处理消息的能力有限,例如消费者数量太少,或者单条消息的处理时间 过长的情况下,如果我们希望在一定数量的消息消费完之前,不再推送消息过来,就要用到消费端的流量限制措施
可以基于 Consumer 或者 channel 设置 prefetch count 的值,含义为 Consumer 端的最大的 unacked messages 数目(即未确认)。当超过这个数值的消息未被确认,RabbitMQ 会停止投递新的消息给该消费者
1 | channel.basicQos(2); // 如果超过 2 条消息没有发送 ACK,当前消费者不再接受队列消息 |
四、高可用集群
1.介绍
为啥要用集群呢?因为高可用啊,集群主要用于实现高可用与负载均衡
高可用:如果集群中的某些 MQ 服务器不可用,客户端还可以连接到其他 MQ 服务器。不至于影响业务
负载均衡:在高并发的场景下,单台 MQ 服务器能处理的消息有限,可以分发给多台 MQ 服务器。减少消息延迟
2.RabbitMQ集群
RabbitMQ采用Erlang语言编写,而该语言天生具备分布式的特性,因此RabbitMQ也是天然支持集群的,并不需要通过ZK来实现数据同步
那么它是如何保持通信并且验证身份的呢?
首先,肯定是要为每个节点配置需要建立联系的其他节点
建立联系时需要身份验证,在这里采用erlang.cookie文件进行身份验证,每个节点上该文件保持一致
服务的端口是 5672,UI 的端口是 15672,集群的端口是 25672
集群通过 25672 端口两两通信,需要开放防火墙的端口。 需要注意的是,RabbitMQ 集群无法搭建在广域网上,除非使用 federation 或者shovel 等插件(没这个必要,在同一个机房做集群)
3.RabbitMQ节点类型
集群有两种节点类型,一种是磁盘节点(Disc Node),一种是内存节点(RAM Node)
磁盘节点(disc):将元数据(包括队列名字属性、交换机的类型名字属性、绑定、vhost)放在磁盘中
未指定类型的情况下,默认为磁盘节点
服务重启之后,存在磁盘节点中的数据还是会存在,所以像我们的持久化消息、持久 化队列等,都会放置硬盘节点保存
内存节点(ram):就是将元数据都放在内存里,内存节点的话,只要服务重启,该节点的所有数据将会丢失
以在 RabbitMQ 集群里,至少有一个磁盘节点,它用来持久保存我们的元数据,如果RabbitMQ 是单节点运行,则默认就是磁盘节点。但是为了提高性能,其实不需要所有节点都是 disc 的节点,根据需求分配即可
元数据
- 队列元数据:包括队列的名称、持久性、最大长度、消息数目等信息。在RabbitMQ中,队列是消息传递的中心,因此队列元数据是非常重要的。
- 交换机元数据:包括交换机的名称、类型、持久性、绑定的队列、路由键等信息。交换机用于消息的路由和分发,因此交换机元数据对于消息传递非常重要。
- 绑定元数据:包括绑定的队列、交换机、路由键等信息。绑定将队列和交换机连接起来,用于消息的路由和分发。
- 连接元数据:包括连接的IP地址、端口号、协议、连接数、用户等信息。连接是RabbitMQ客户端与服务端之间的通信通道。
- 用户元数据:包括用户的名称、密码、权限等信息。RabbitMQ使用用户认证来控制对队列、交换机和其他资源的访问权限。
总之,RabbitMQ的元数据提供了关于队列、交换机、绑定、连接和用户的重要信息,帮助用户管理和控制消息传递
如果 RabbitMQ 集群只有一个磁盘节点,然后磁盘节点挂了,会发生什么?
可以正常的投递消息和消费消息,但是不能做以下事:
- 不能创建队列
- 不能创建交换机
- 不能创建用户绑定关系
- 不能修改用户权限
为什么不能做这些事情呢?
因为这些事情的实现都需要把相关信息放到元数据里,元数据需要在磁盘上持久化的
所以,考虑到高可用性,推荐在集群里保持 2 个磁盘节点,这样一个挂了,另一个还可正常工作。但上述最后一点,往集群里增加或删除节点,要求 2 个磁盘节点同时在线
那么集群如何配置呢?
- 配置 hosts 以便相互通信
- 同步 erlang.cookie
- 加入集群(join cluster 命令)
RabbitMQ 有两种集群模式:普通集群模式和镜像队列模式
4.普通集群
普通集群模式下,不同的节点之间只会相互同步元数据(交换机、队列、绑定关系、Vhost 的定义),而不会同步消息
如下图
那么这种模式有什么作用呢?
这样无论生产者连接的是哪个节点,都能够找到需要存储的位置,比如上面连接的是C,需要将消息存到A上,那么它就能通过C找到A并把消息存到A的消息队列
好处是生产者连到哪个节点都能发送成功,缺点呢?
缺点也很明显,A挂了之后就不能存到A了,消息直接没地方存,A的消息也可能丢失
当初之所以这样设计,主要是出于存储和同步数据的网络开销的考虑,如果所有节点都存储相同的数据, 就无法达到线性地增加性能和存储容量的目的(堆机器)
但是这种方式明显无法达到高可用的作用,因此就出现了镜像集群
5.镜像集群
镜像集群是什么结构呢?看下图
观察图,会发现在普通模式基础上,镜像模式还同步了消息内容到其他节点
镜像队列模式下,消息内容会在镜像节点间同步,可用性更高。不过也有一定的副作用,系统性能会降低,节点过多的情况下同步的代价比较大
总结下就是,普通集群为了节省网络和内存等的消耗,只同步了元数据信息
而镜像集群为了高可用,将消息也同步过去了,这样一台机器挂了其他机器还能接手
6.高可用实现原理
首先是连接问题,集群模式下,会存在很多机器节点,那客户端要连接到哪个呢?如果都连接到同一台机器,对这台机器的负担无疑是很大的
此时就要用到负载均衡组件(例如 HAProxy,LVS,Nignx)了,由负载均衡组件做路由
而客户端只需要连接到负载均衡组件的IP即可
而负载均衡根据组件不同,实现的原理也不尽相同,主要分为四层负载和七层负载
四层负载:工作在 OSI 模型的第四层,即传输层(TCP 位于第四层),它是根据 IP 端口进行转发(LVS 支持四层负载)。RabbitMQ 是 TCP 的 5672 端口
七层负载:工作在 OSI 模型的第七层,即应用层(HTTP 位于第七层),可以根据请求资源类型分配到后端服务器(Nginx 支持七层负载;HAProxy 支持四层和七层负载)
但是就像引入MQ使得系统更复杂,会考虑MQ挂掉怎么办,如果负载均衡的组件挂了怎么办?
此时需要一个具有以下功能的组件
- 它本身有路由(负载)功能,可以监控集群中节点的状态(比如监控 HAProxy), 如果某个节点出现异常或者发生故障,就把它剔除掉
- 为了提高可用性,它也可以部署多个服务,但是只有一个自动选举出来的 MASTER 服务器(叫做主路由器),通过广播心跳消息实现
- MASTER 服务器对外提供一个虚拟 IP,提供各种网络功能。也就是谁抢占到 VIP, 就由谁对外提供网络服务。应用端只需要连接到这一个 IP 就行了
事实上,确实有这么一个组件,它基于 VRRP 协议(虚拟路由冗余协议 Virtual Router Redundancy Protocol)
这个组件就是 Keepalived,它具有 Load Balance 和 High Availability 的功能
不止RabbitMQ,事实上,MySQL、MyCat、Redis集群实现方式这部分都类似
7.基于 HAproxy+Keepalived 搭建高可用
在介绍案例之前,先看下 HAproxy+Keepalived是如何配合使用的
HAproxy 是一种负载均衡器,可以将客户端请求均衡地分配到不同的 RabbitMQ 节点上,从而实现负载均衡
而 Keepalived 则可以将多台服务器虚拟成一个虚拟 IP 地址(VIP),并通过心跳检测来实现服务器之间的故障转移。当某个 RabbitMQ 节点宕机或出现故障时,Keepalived 会自动将 VIP 切换到另一个可用的 RabbitMQ 节点上,从而确保 RabbitMQ 集群的高可用性和可靠性
在这种情况下,应用程序需要连接到虚拟 IP 地址(VIP),而不是直接连接到 RabbitMQ 节点。当 VIP 切换到另一个 RabbitMQ 节点上时,应用程序会自动重新连接到新的节点,从而保证服务的连续性和可用性
总结就是
用户——VIP
VIP——HAproxy
HAproxy——RabbitMQ节点
Keepalived则肩负着检测哪些节点可用的功能
下面看个案例,如下是结构图
这里声明一下,为了高可用,HAproxy不可以跟RabbitMQ服务装在同一台机器上
先看看ip规划:
- 内存节点 1:192.168.8.144
- 内存节点 2:192.168.8.145
- 磁盘节点:192.168.8.146
- VIP:192.168.8.149
具体的,
- 这里规划了两个内存节点,一个磁盘节点。所有的节点之间通过镜像队列的方 式同步数据。内存节点用来给应用访问,磁盘节点用来持久化数据
- 为了实现对两个内存节点的负载,我们安装了两个 HAProxy,监听两个 5672和 15672 的端口
- 安装两个 Keepalived,一主一备。两个 Keepalived 抢占一个 VIP192.168.8.149。 谁抢占到这个 VIP,应用就连接到谁,来执行对 MQ 的负载。
这种情况下
Keepalived挂了一个节点,没有影响,因为 BACKUP 会变成 MASTER,抢占 VIP
HAProxy 挂了一个节点,没有影响,我们的 VIP 会自动路由可用的 HAProxy 服务
RabbitMQ 挂了一个节点,没有影响, 因为 HAProxy 会自动负载到可用的节点
最后,可以使用可视化工具监控RabbitMQ的整体情况,这里不再介绍
五、可靠性
1.分析
什么叫可靠性?
既然使用RabbitMQ进行消息的投递,那么就要考虑一些问题,比如消息发送丢了怎么办?重复消费了怎么办?无法消费怎么办?
还有许多可能产生的问题,这些问题大部分RabbitMQ都采用了一定的措施来避免,而这些措施,就是消息投递的可靠性机制
需要注意的是,一些可靠性措施会导致RabbitMQ收发效率下降—安全性提高必然导致效率的下降,理所当然;因此这些措施只在需要的场合再去使用,不是说一定要用
根据之前学过的RabbitMQ模型,很容易想到,想保证消息的可靠性,主要在四个方面下功夫
- 生产者发送消息到Broker
- 如何保证消息成功投递了呢?
- 消息从Exchange路由到Queue
- 如何能够正确的路由到Queue呢?路由不到怎么办?
- 消息在Queue中存储
- 如何保证存储可靠,不会随着宕机等问题丢失?
- 消费者订阅消费Queue的消息
- 如何保证消费和Queue中删除消息节奏保持一致?
下面就逐个分析这些过程中的可靠性机制
2.生产者发送消息到Broker
先看下这一步骤,一些可能导致丢失消息的情况
- 网络波动,消息发丢了
- Broker故障,硬盘故障/写满了
那么如何去设计,才能保证消息可靠投递呢?
很明显,设计一个反馈机制ack,当Broker收到并写好消息之后,返回一个ack告诉生产者消息成功投递了
在Rabbit当中,设置了两种确认机制来完成这个问题
- Transaction(事务)模式
- COnfirm(确认)模式
2.1 Transaction(事务)模式
事务模式怎么使用呢?它在创建 channel 的时候,可以把信道设置成事务模式, 然后就可以发布消息给 RabbitMQ 了
如果 channel.txCommit();的方法调用成功, 就说明事务提交成功,则消息一定到达了 RabbitMQ 中
1 | try { |
如果在事务提交执行之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常,这个时候便可以将其捕获,进而通过执行 channel.txRollback()方法来实现事务回滚
在事务模式里面,只有收到了服务端的 Commit-OK 的指令,才能提交成功,所以可以解决生产者和服务端确认的问题
但是事务模式有一个缺点,它是阻塞的,一条消息没有发送完毕,不能发送下一条消息,它会榨干 RabbitMQ 服务器的性能,所以不建议在生产环境使用
那么有没有其他可以保证消息被 Broker 接收,但是又不大量消耗性能的方式呢?
这个就是第二种模式,叫做确认(Confirm)模式
2.2 Confirm(确认)模式
确认模式有三种
- 普通确认模式
- 批量确认模式
- 异步确认模式
首先是普通确认模式
在生产者这边通过调用 channel.confirmSelect()方法将信道设置为Confirm模式,然后发送消息
一旦消息被投递到交换机之后(跟是否路由到队列没有关系),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者,也就是调用 channel.waitForConfirms()返回 true,这样生产者就知道消息被服务端接收了。
如果网络错误,会抛出连接异常,如果交换机不存在,会抛出 404 错误
所以这种方法只是保证从生产者到Exchange可靠
具体代码实现如下
1 | // 1.开启发送方消息确认 |
但是这种方式效率过低,而且实际生产中大部分场景也不太可能一条一条的发送消息,因此就要用到第二种,批量确认模式
批量确认,就是在开启 Confirm 模式后,先发送一批消息
1 | try { |
只要 channel.waitForConfirmsOrDie();方法没有抛出异常,就代表消息都被服务端接收了
是不是很像并发编程中的countdownlatch?
但是这种方法也不是很好,比如批量,多少条消息算一批合适呢?
又比如发送1000条消息,前999正常ack,最后一条挂了,这时候就都要重发,不合适吧?
因此就出现第三种确认模式,可以一边发送一遍确认,这就是异步确认模式
异步确认模式需要添加一个 ConfirmListener,并且用一个 SortedSet 来维护一 个批次中没有被确认的消息
即添加一个监听和一个未确认集合
当生产者发送消息后,RabbitMQ 会在确认消息被接收后调用
ConfirmListener的handleAck方法,如果消息发送失败,则调用handleNack方法异步确认模式中,生产者需要维护一个
unconfirmedSet集合来存储未被确认的消息。当生产者发送消息后,将消息添加到unconfirmedSet集合中。当收到确认消息时,生产者需要从unconfirmedSet集合中移除确认消息的序列号。如果消息发送失败,生产者需要将
unconfirmedSet集合中的消息重新发送。如果消息一直发送失败,生产者需要考虑重试次数和重试间隔时间等问题,以避免陷入死循环
代码实现如下
1 | // 2.用来维护未确认消息的 deliveryTag;confirmSet对应上面提到的unconfirmedSet |
3.消息从交换机路由到队列
交换机到队列出现异常,可能是routing key错误,也有可能是队列根本就不存在
那么如何处理呢?主要是以下前两种,最后一种不可靠
- 拒绝消息并将其返回给生产者:当交换机无法将消息路由到队列时,将消息标记为“Return”,并将其发送回生产者。生产者可以通过在发送消息时设置
mandatory=true参数来启用这个模式。 - 将消息发送到备用交换机(Alternate Exchange):备用交换机是一种特殊的交换机,它会接收那些无法路由到其它交换机的消息,并将其重新发送到指定的队列中。
- 丢弃消息:当交换机无法将消息路由到队列时,可以直接将其丢弃。这种情况下,生产者无法得知消息是否已经被成功处理。
4.消息在队列中存储
这一种情况主要是消息存在内存中,如果重启、宕机等步骤后,消息会丢失
此外,元数据也需要持久化,防止丢失
所以主要就是以下步骤
- 队列持久化
- 交换机持久化
- 消息持久化
- 集群部署—提高高可用
5.消息投递到消费者
如果消费者收到消息后没来得及处理即发生异常,或者处理过程中发生异常,会导致消费失败
因此,服务端应该以某种方式得知消费者对消息的接收情况,并决定是否重新投递这条消息给其他消费者
RabbitMQ 提供了消费者的消息确认机制(message acknowledgement),消费者可以自动或者手动地发送 ACK 给服务端
如果没有 ACK 会怎么办?永远等待下去?也不会
没有收到 ACK 的消息,消费者断开连接后,RabbitMQ 会把这条消息发送给其他消费者。如果没有其他消费者,消费者重启后会重新消费这条消息,重复执行业务逻辑(如果代码修复好了还好)
那么消费者怎么给Broker应答呢?
有两种方式,一种是自动 ACK,一种是手动 ACK
首先是自动 ACK,这个也是默认的情况。也就是我们没有在消费者处编写 ACK 的代码,消费者会在收到消息的时候就自动发送 ACK,而不是在方法执行完毕的时候发送 ACK(并不关心你有没有正常消费消息)
因此正常情况下,如果想保证这一步的可靠性,需要把自动ACK改为手动ACK,当业务处理完,消息正常消费后再手动发送ACK
6.消费者回调
考虑一个问题,生产者生产的某消息,最终被消费者消费了,那么它如何知道呢?
- 消费者收到消息,处理完毕后,调用生产者的 API(思考:是否破坏解耦?)
- 消费者收到消息,处理完毕后,发送一条响应消息给生产者
6.1 调用生产者 API
例如:提单系统给其他系统发送了保险消息后(通知通知!发生了一笔保险)
其他系统必须在处理完消息后调用提单系统提供的 API,来修改提单系统中这笔数 据的状态
只要 API 没有被调用,数据状态没有被修改,提单系统就认为下游系统没有收到这条消息
6.2 发送响应消息给生产者
例如:商业银行与人民银行二代支付通信(使用 IBM MQ),无论是人行收到了商业银行的消息,还是商业银行收到了人行的消息,都必须发送一条响应消 息(叫做回执报文)
整个通信的流程设计得非常复杂,但是对于金融场景下的消息可靠性保证,是很有用的
7.补偿机制
如果生产者的 API 就是没有被调用,也没有收到消费者的响应消息,怎么办?
由于可能存在网络拥堵等情况,所以可以先等一段时间
生产者与消费者之间应该约定一个超时时间,对于超出这个时间没有得到响应的消息,才确定为消费失败,比如 5 分钟
5 分钟,对于临时性故障的处理,比如网络恢复,或者重启应用,重启数据库,应该够了
过了 5 分钟依然没有得到回复的消息,才判断为消费失败
确定消费失败以后怎么办呢?肯定要重发消息了
此时就要考虑,谁来重发消息,多久重发一次等问题了
7.1 谁来重发
首先,肯定是代码重发
先创建一个定时任务,比如每 30 秒跑一次,找到业务表里面的这条业务状态是中 咕泡出品 必属精品 间状态的记录,查询出来,构建为 MQ 消息,重新发送
也可以单独设计设计一张消息表,把本系统所以发送出去的消息全部异步地登记起来,找出状态是未回复的消息发送(注意:这种做法毫无疑问会消耗性能、消耗数据库存储空间)
7.2 多久重发一次
假如消费者一直没有回复,比如它重启要 20 分钟,你 5 分钟之内尝试重发,肯定还不能正常消费
所以重发肯定不只发一次,要尝试多次,但是又不能发得太频繁, 给它一点恢复的时间,比如可以设置为 1 分钟重发一次
也可以设置衰减机制,第一次隔一分钟,第二次隔两分钟(谈恋爱的时候,发消息不回复,开始一天联系一次, 后来一周联系一次,慢慢地失去了信心)
时间由定时任务的执行时间决定
7.3 重发几次
一般来说,不会重发太多次,2-3次即可
7.4 重发什么
一般来说,重发一样的内容
8.消费幂等性
所谓幂等性,就是无论第几次消费消息,造成的结果和第一次消费一样
为什么要幂等性?
假如转账消费,消费一次+1000元,那要是重复消费就是大问题了
那么如何解决幂等性问题呢?
可以使用唯一的业务ID,通过日志或者消息落库来做重复控制
9.最终一致
即在投递消费之后,再次核对
10.消费顺序性
即保证消费顺序和投递顺序一致
如何保证?
一个队列一个消费者






