1. 消息中间件概述
1.1 什么是消息中间件
MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。
为什么使用MQ
在项目中,可将一些无需即时返回且耗时的操作提取出来,进行异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
开发中消息队列通常有如下应用场景:
1、任务异步处理
将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
2、应用程序解耦合
MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。
3、削峰填谷
如订单系统,在下单的时候就会往数据库写数据。但是数据库只能支撑每秒1000左右的并发写入,并发量再高就容易宕机。低峰期的时候并发也就100多个,但是在高峰期时候,并发量会突然激增到5000以上,这个时候数据库肯定卡死了。
消息被MQ保存起来了,然后系统就可以按照自己的消费能力来消费,比如每秒1000个数据,这样慢慢写入数据库,这样就不会卡死数据库了。
但是使用了MQ之后,限制消费消息的速度为1000,但是这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了。但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000QPS,直到消费完积压的消息,这就叫做“填谷”
1.2 AMQP 和 JMS
MQ是消息通信的模型;实现MQ的大致有两种主流方式:AMQP、JMS。
1.2.1 AMQP
AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。
1.2.2 JMS
JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
1.2.3 AMQP 与 JMS 区别
- JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
- JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
- JMS规定了两种消息模式;而AMQP的消息模式更加丰富
1.3 消息队列产品
市场上常见的消息队列有如下:
- ActiveMQ:基于JMS
- ZeroMQ:基于C语言开发
- RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
- RocketMQ:基于JMS,阿里巴巴产品
- Kafka:类似MQ的产品;分布式消息系统,高吞吐量
1.4 RabbitMQ
RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
RabbitMQ官方地址:http://www.rabbitmq.com/
RabbitMQ提供了6种模式:简单模式,work模式,Publish/Subscribe发布与订阅模式,Routing路由模式,Topics主题模式,RPC远程调用模式(远程调用,不太算MQ;暂不作介绍);
官网对应模式介绍:https://www.rabbitmq.com/getstarted.html
2. 安装及配置RabbitMQ
2.1 安装Socat
在线安装依赖环境:
1 | yum install gcc |
2.2 安装Erlang
1 | mkdir /rabbitmq && cd /rabbitmq |
2.3 安装RabbitMQ
1 | cd /rabbitmq |
2.4 开启管理界面及配置
1 | # 开启管理界面 |
修改/etc/rabbitmq/rabbitmq.config
配置文件:
2.5 启动
1 | centos6用这个命令: |
2.6 配置虚拟主机及用户
2.6.1 用户角色
RabbitMQ在安装好后,可以访问http://ip地址:15672
;其自带了guest/guest的用户名和密码;如果需要创建自定义用户;那么也可以登录管理界面后,如下操作:
角色说明:
超级管理员(administrator)
可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作。
监控者(monitoring)
可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
策略制定者(policymaker)
可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。
普通管理者(management)
仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。
其他
无法登陆管理控制台,通常就是普通的生产者和消费者。
2.6.2 Virtual Hosts配置
像mysql拥有数据库的概念并且可以指定用户对库和表等操作的权限。RabbitMQ也有类似的权限管理;在RabbitMQ中可以虚拟消息服务器Virtual Host,每个Virtual Hosts相当于一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。 相当于mysql的db。Virtual Name一般以/开头。
2.6.2.1 创建Virtual Hosts
2.6.2.2 设置Virtual Hosts权限
3. RabbitMQ入门
3.1 创建工程添加依赖
1 |
|
3.2 编写生产者
编写消息生产者com.wgy.rabbitmq.simple.Producer
1 | /** |
在执行上述的消息发送之后;可以登录rabbitMQ的管理控制台,可以发现队列和其消息:
3.3 编写消费者
抽取创建connection的工具类com.wgy.rabbitmq.util.ConnectionUtil;
1 | /** |
编写消息的消费者com.wgy.rabbitmq.simple.Consumer
1 | /** |
3.4 小结
上述的入门案例中其实使用的是如下的简单模式:
在上图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
在rabbitMQ中消费者是一定要到某个消息队列中去获取消息的
4. AMQP
4.1. 相关概念介绍
AMQP 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
AMQP是一个二进制协议,拥有一些现代化特点:多信道、协商式,异步,安全,扩平台,中立,高效。
RabbitMQ是AMQP协议的Erlang的实现。
概念 | 说明 |
---|---|
连接Connection | 一个网络连接,比如TCP/IP套接字连接。 |
会话Session | 端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”。 |
信道Channel | 多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。 |
客户端Client | AMQP连接或者会话的发起者。AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。 |
服务节点Broker | 消息中间件的服务节点;一般情况下可以将一个RabbitMQ Broker看作一台RabbitMQ 服务器。 |
端点 | AMQP对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)。 |
消费者Consumer | 一个从消息队列里请求消息的客户端程序。 |
生产者Producer | 一个向交换机发布消息的客户端应用程序。 |
4.2 RabbitMQ运转流程
在入门案例中:
- 生产者发送消息
- 生产者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker;
- 声明队列并设置属性;如是否排它,是否持久化,是否自动删除;
- 将路由键(空字符串)与队列绑定起来;
- 发送消息至RabbitMQ Broker;
- 关闭信道;
- 关闭连接;
- 消费者接收消息
- 消费者创建连接(Connection),开启一个信道(Channel),连接到RabbitMQ Broker
- 向Broker 请求消费相应队列中的消息,设置相应的回调函数;
- 等待Broker回应闭关投递响应队列中的消息,消费者接收消息;
- 确认(ack,自动确认)接收到的消息;
- RabbitMQ从队列中删除相应已经被确认的消息;
- 关闭信道;
- 关闭连接;
4.3 生产者流转过程说明
- 客户端与代理服务器Broker建立连接。会调用newConnection() 方法,这个方法会进一步封装Protocol Header 0-9-1 的报文头发送给Broker ,以此通知Broker 本次交互采用的是AMQP 0-9-1 协议,紧接着Broker 返回Connection.Start 来建立连接,在连接的过程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 这6 个命令的交互。
- 客户端调用connection.createChannel方法。此方法开启信道,其包装的channel.open命令发送给Broker,等待channel.basicPublish方法,对应的AMQP命令为Basic.Publish,这个命令包含了content Header 和content Body()。content Header 包含了消息体的属性,例如:投递模式,优先级等,content Body 包含了消息体本身。
- 客户端发送完消息需要关闭资源时,涉及到Channel.Close和Channl.Close-Ok 与Connetion.Close和Connection.Close-Ok的命令交互。
4.4 消费者流转过程说明
- 消费者客户端与代理服务器Broker建立连接。会调用newConnection() 方法,这个方法会进一步封装Protocol Header 0-9-1 的报文头发送给Broker ,以此通知Broker 本次交互采用的是AMQP 0-9-1 协议,紧接着Broker 返回Connection.Start 来建立连接,在连接的过程中涉及Connection.Start/.Start-OK 、Connection.Tune/.Tune-Ok ,Connection.Open/ .Open-Ok 这6 个命令的交互。
- 消费者客户端调用connection.createChannel方法。和生产者客户端一样,协议涉及Channel.Open/Open-Ok命令。
- 在真正消费之前,消费者客户端需要向Broker 发送Basic.Consume 命令(即调用channel.basicConsume 方法〉将Channel 置为接收模式,之后Broker 回执Basic.Consume - Ok 以告诉消费者客户端准备好消费消息。
- Broker 向消费者客户端推送(Push) 消息,即Basic.Deliver 命令,这个命令和Basic.Publish 命令一样会携带Content Header 和Content Body。
- 消费者接收到消息并正确消费之后,向Broker 发送确认,即Basic.Ack 命令。
- 客户端发送完消息需要关闭资源时,涉及到Channel.Close和Channl.Close-Ok 与Connetion.Close和Connection.Close-Ok的命令交互。
5. RabbitMQ工作模式
5.1 Work queues工作队列模式
5.1.1 模式说明
Work Queues
与入门程序的简单模式
相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
5.1.2 代码
Work Queues
与入门程序的简单模式
的代码是几乎一样的;可以完全复制,并复制多一个消费者进行多个消费者同时消费消息的测试。
5.1.2.1 生产者
1 | /** |
5.1.2.2 消费者1
1 | /** |
5.1.2.3 消费者2
1 | /** |
5.1.3 测试
启动两个消费者,然后再启动生产者发送消息;到IDEA的两个消费者对应的控制台查看是否竞争性的接收到消息。
5.1.4 小结
在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
5.2 订阅模式类型
订阅模式示例图:
前面2个案例中,只有3个角色:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分
而在订阅模型中,多了一个exchange角色,而且过程略有变化:
- P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- C:消费者,消息的接受者,会一直等待消息到来。
- Queue:消息队列,接收消息、缓存消息。
- Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
- Fanout:广播,将消息交给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key 的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
5.3 Publish/Subscribe发布与订阅模式
5.3.1 模式说明
发布订阅模式: 1、每个消费者监听自己的队列。 2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
5.3.2 代码
5.3.2.1 生产者
1 | /** |
5.3.2.2 消费者1
1 | /** |
5.3.2.3 消费者2
1 | /** |
5.3.3. 测试
启动所有消费者,然后使用生产者发送消息;在每个消费者对应的控制台可以查看到生产者发送的所有消息;到达广播的效果。
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges
选项卡,点击 fanout_exchange
的交换机,可以查看到如的绑定:
5.3.4 小结
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别:
- 工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
- 发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使用默认交换机)。
- 发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将队列绑 定到默认的交换机 。
5.4 Routing路由模式
5.4.1 模式说明
路由模式特点:
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
图解:
- P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
- C1:消费者,其所在队列指定了需要routing key 为 error 的消息
- C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
5.4.2 代码
在编码上与 Publish/Subscribe发布与订阅模式
的区别是交换机的类型为:Direct,还有队列绑定交换机的时候需要指定routing key。
5.4.2.1 生产者
1 | /** |
5.4.2.2 消费者1
1 | /** |
5.4.2.3 消费者2
1 | /** |
5.4.3 测试
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果。
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges
选项卡,点击 direct_exchange
的交换机,可以查看到如下的绑定:
5.4.4. 小结
Routing模式要求队列在绑定交换机时要指定routing key,消息会转发到符合routing key的队列。
5.5 Topics通配符模式
5.5.1 模式说明
Topic
类型与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定Routing key
的时候使用通配符!
Routingkey
一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.insert.abc
或者 item.insert
item.*
:只能匹配item.insert
5.5.2 代码
5.5.2.1 生产者
使用topic类型的Exchange,发送消息的routing key有3种: item.insert
、item.update
、item.delete
:
1 | /** |
5.2.2.2 消费者1
接收两种类型的消息:更新商品和删除商品
1 | /** |
5.2.2.3 消费者2
接收所有类型的消息:新增商品,更新商品和删除商品。
1 | /** |
5.5.3 测试
启动所有消费者,然后使用生产者发送消息;在消费者对应的控制台可以查看到生产者发送对应routing key对应队列的消息;到达按照需要接收的效果;并且这些routing key可以使用通配符。
在执行完测试代码后,其实到RabbitMQ的管理后台找到Exchanges
选项卡,点击 topic_exchange
的交换机,可以查看到如下的绑定:
5.5.4. 小结
Topic主题模式可以实现 Publish/Subscribe发布与订阅模式
和 Routing路由模式
的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。
5.6 模式总结
RabbitMQ工作模式:
1、简单模式 HelloWorld 一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
2、工作队列模式 Work Queue 一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
3、发布订阅模式 Publish/subscribe 需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
4、路由模式 Routing 需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
5、通配符模式 Topic 需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
6. Spring Boot整合RabbitMQ
6.1 简介
在Spring项目中,可以使用Spring-Rabbit去操作RabbitMQ https://github.com/spring-projects/spring-amqp
尤其是在spring boot项目中只需要引入对应的amqp启动器依赖即可,方便的使用RabbitTemplate发送消息,使用注解接收消息。
一般在开发过程中:
生产者工程:
- application.yml文件配置RabbitMQ相关信息;
- 在生产者工程中编写配置类,用于创建交换机和队列,并进行绑定
- 注入RabbitTemplate对象,通过RabbitTemplate对象发送消息到交换机
消费者工程:
- application.yml文件配置RabbitMQ相关信息
- 创建消息处理类,用于接收队列中的消息并进行处理
6.2 搭建生产者工程
6.2.1 创建工程添加依赖
1 |
|
6.2.2 配置RabbitMQ
6.2.2.1 配置文件
创建application.yml,内容如下:
1 | #端口 |
6.2.2.2 绑定交换机和队列
创建RabbitMQ队列与交换机绑定的配置类com.wgy.rabbitmq.config.RabbitMQConfig
1 | /** |
6.2.3 启动类
1 | /** |
6.2.4 消息发送Controller
我们创建一个SpringMVC的Controller方便我们进行测试
1 | /** |
6.3 搭建消费者工程
6.3.1 创建工程添加依赖
1 |
|
6.3.2 配置RabbitMQ
创建application.yml,内容如下:
1 | #端口 |
6.3.3 启动类
1 | /** |
6.3.4 消息监听处理类
编写消息监听器com.wgy.rabbitmq.listener.MyListener
1 | /** |
6.3.5 测试
在生产者工程springboot-rabbitmq-producer中创建测试类,发送消息:
1 | /** |
先运行上述测试程序(交换机和队列才能先被声明和绑定),然后启动消费者;在消费者工程springboot-rabbitmq-consumer中控制台查看是否接收到对应消息。
7. RabbitMQ 高级
7.1 过期时间TTL
过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置。
- 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
- 第二种方法是对消息进行单独设置,每条消息TTL可以不同。
如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消息。
7.1.1 设置队列TTL
在 springboot-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml
文件中添加如下内容:
1 |
|
启动类导入配置文件
1 | /** |
然后在测试类 springboot-rabbitmq-producer\src\test\java\com\wgy\rabbitmq\ProducerTest.java
中编写如下方法发送消息到上述定义的队列:
1 | /** |
参数 x-message-ttl 的值 必须是非负 32 位整数 (0 <= n <= 2^32-1) ,以毫秒为单位表示 TTL 的值。这样,值 6000 表示存在于 队列 中的当前 消息 将最多只存活 6 秒钟。
如果不设置TTL,则表示此消息不会过期。如果将TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃。
7.1.2 设置消息TTL
消息的过期时间;只需要在发送消息(可以发送到任何队列,不管该队列是否属于某个交换机)的时候设置过期时间即可。在测试类中编写如下方法发送消息并设置过期时间到队列:
1 | /** |
expiration 字段以微秒为单位表示 TTL 值。且与 x-message-ttl 具有相同的约束条件。因为 expiration 字段必须为字符串类型,broker 将只会接受以字符串形式表达的数字。
当同时指定了 queue 和 message 的 TTL 值,则两者中较小的那个才会起作用。
7.2 死信队列
DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。
消息变成死信,可能是由于以下的原因:
- 消息被拒绝
- 消息过期
- 队列达到最大长度
DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。
要想使用死信队列,只需要在定义队列的时候设置队列参数 x-dead-letter-exchange
指定交换机即可。
具体步骤如下面的章节。
7.2.1 定义死信交换机
在 springboot-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml
文件中添加如下内容:
1 | <!--定义定向交换机中的持久化死信队列,不存在则自动创建--> |
7.2.2 队列设置死信交换机
为了测试消息在过期、队列达到最大长度后都将被投递死信交换机上;所以添加配置如下:
在 springboot-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml
文件中添加如下内容:
1 | <!--定义过期队列及其属性,不存在则自动创建--> |
7.2.3 消息过期的死信队列测试
7.2.3.1 发送消息代码
添加 springboot-rabbitmq-producer\src\test\java\com\itheima\rabbitmq\ProducerTest.java
方法
1 | /** |
7.2.3.2 RabbitMQ管理界面
未过期:
过期后:
7.2.3.3 流程
具体因为队列消息过期而被投递到死信队列的流程:
7.2.4 消息过长的死信队列测试
7.2.4.1 发送消息代码
添加 springboot-rabbitmq-producer\src\test\java\com\itheima\rabbitmq\ProducerTest.java
方法
1 | /** |
7.2.4.2 RabbitMQ管理界面
上面发送的3条消息中的第1条消息会被投递到死信队列中(如果启动了消费者,那么队列消息很快会被取走消费掉);
7.2.4.3 消费者接收死信队列消息
与过期消息投递到死信队列的代码和配置是共用的,并不需要重新编写。
7.2.4.4 流程
消息超过队列最大消息长度而被投递到死信队列的流程在前面的图中已包含。
7.3 延迟队列
延迟队列存储的对象是对应的延迟消息;所谓“延迟消息” 是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
在RabbitMQ中延迟队列可以通过 过期时间
+ 死信队列
来实现;具体如下流程图所示:
在上图中;分别设置了两个5秒、10秒的过期队列,然后等到时间到了则会自动将这些消息转移投递到对应的死信队列中,然后消费者再从这些死信队列接收消息就可以实现消息的延迟接收。
延迟队列的应用场景;如:
- 在电商项目中的支付场景;如果在用户下单之后的几十分钟内没有支付成功;那么这个支付的订单算是支付失败,要进行支付失败的异常处理(将库存加回去),这时候可以通过使用延迟队列来处理
- 在系统中如有需要在指定的某个时间之后执行的任务都可以通过延迟队列处理
7.4 消息确认机制
确认并且保证消息被送达,提供了两种方式:发布确认和事务。(两者不可同时使用)在channel为事务时,不可引入确认模式;同样channel为确认模式下,不可使用事务。
7.4.1 发布确认
有两种方式:消息发送成功确认和消息发送失败回调。
7.4.1.1 消息发送成功确认
在springboot-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml
connectionFactory 中启用消息确认:
1 | <!-- publisher-confirms="true" 表示:启用了消息确认 --> |
配置消息确认回调方法如下:
1 | <!-- 消息回调处理类 --> |
消息确认回调方法com.wgy.rabbitmq.MsgSendConfirmCallBack如下:
1 | /** |
功能测试如下:
发送消息
1 | /** |
管理界面确认消息发送成功
消息确认回调
7.4.1.2 消息发送失败回调
在springboot-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml
connectionFactory 中启用回调:
1 | <!-- publisher-returns="true" 表示:启用了失败回调 --> |
配置消息失败回调方法如下:
注意:同时需配置mandatory=”true”,否则消息则丢失
1 | <!-- 消息失败回调处理类 --> |
消息失败回调方法com.wgy.rabbitmq.MsgSendReturnCallback如下:
1 | /** |
功能测试如下:
模拟消息发送失败
1 |
|
失败回调结果如下:
7.4.2 事务支持
场景:业务处理伴随消息的发送,业务处理失败(事务回滚)后要求消息不发送。rabbitmq 使用调用者的外部事务,通常是首选,因为它是非侵入性的(低耦合)。
外部事务的配置:springboot-rabbitmq-producer\src\main\resources\spring\spring-rabbitmq.xml
1 | <!-- channel-transacted="true" 表示:支持事务操作 --> |
模拟业务处理失败的场景:
测试类或者测试方法上加入@Transactional注解
1
2
3
4
5
6
7
8
9
10
11
//开启事务
//@Rollback(false)//在测试的时候,需要手动的方式制定回滚的策略
public void queueTest2() {
//路由键与队列同名
rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息--01。");
System.out.println("----------------dosoming:可以是数据库的操作,也可以是其他业务类型的操作---------------");
//模拟业务处理失败
System.out.println(1 / 0);
rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息--02。");
}测试结果:
7.5 消息追踪
消息中心的消息追踪需要使用Trace实现,Trace是Rabbitmq用于记录每一次发送的消息,方便使用Rabbitmq的开发者调试、排错。可通过插件形式提供可视化界面。Trace启动后会自动创建系统Exchange:amq.rabbitmq.trace ,每个队列会自动绑定该Exchange,绑定后发送到队列的消息都会记录到Trace日志。
7.5.1 消息追踪启用与查看
以下是trace的相关命令和使用(要使用需要先rabbitmq启用插件,再打开开关才能使用):
命令集 | 描述 |
---|---|
rabbitmq-plugins list | 查看插件列表 |
rabbitmq-plugins enable rabbitmq_tracing | rabbitmq启用trace插件 |
rabbitmqctl trace_on | 打开trace的开关 |
rabbitmqctl trace_on -p itcast | 打开trace的开关(itcast为需要日志追踪的vhost) |
rabbitmqctl trace_off | 关闭trace的开关 |
rabbitmq-plugins disable rabbitmq_tracing | rabbitmq关闭Trace插件 |
rabbitmqctl set_user_tags heima administrator | 只有administrator的角色才能查看日志界面 |
安装插件并开启 trace_on 之后,会发现多个 exchange:amq.rabbitmq.trace ,类型为:topic。
7.5.2 日志追踪
第一步:发送消息
1 | rabbitTemplate.convertAndSend("spring_queue", "只发队列spring_queue的消息--01。"); |
发送成功,web查看多了一条消息
第二步:查看trace
第三步:点击Tracing查看Trace log files
第四步:点击itcast-trace.log确认消息轨迹正确性
url:http://127.0.0.1:15672/api/trace-files/itcast-trace.log
8. RabbitMQ集群架构模式
8.1 主备模式
用来实现RabbitMQ的高可用集群,一般是在并发和数据不是特别多的时候使用,当主节点挂掉以后会从备份节点中选择一个节点出来作为主节点对外提供服务。
8.2 远程模式
主要用来实现双活,简称为Shovel模式,所谓的Shovel模式就是让我们可以把消息复制到不同的数据中心,让两个跨地域的集群互联。
8.3 镜像队列模式
镜像队列也被称为Mirror队列,主要是用来保证mq消息的可靠性的,他通过消息复制的方式能够保证我们的消息100%不丢失,同时该集群模式也是企业中使用最多的模式。
8.4 多活模式
多活模式主要是用来实现异地数据复制,Shovel模式其实也可以实现,但是他的配置及其繁琐同时还要受到版本的限制,所以如果做异地多活我们更加推荐使用多活模式,使用多活模式我们需要借助federation插件来实现集群与集群之间或者节点与节点之前的消息复制,该模式被广泛应用于饿了么、美团、滴滴等企业。
8.5 集群模式总结
主备模式下主节点提供读写,从节点不提供读写服务,只是负责提供备份服务,备份节点的主要功能是在主节点宕机时,完成自动切换 从–>主,同时因为主备模式下始终只有一个对外提供服务那么对于高并发的情况下该模式并不合适.
远程模式可以让我们实现异地多活的mq,但是现在已经有了更好的异地多活解决方案,所以在实际的项目中已经不推荐使用了
镜像队列模式可以让我们的消息100%不丢失,同时可以结合HAProxy来实现高并发的业务场景所以在项目中使用得最多
9. RabbitMQ 应用
9.1 消息堆积
当消息生产的速度长时间,远远大于消费的速度时。就会造成消息堆积。
- 消息堆积的影响
- 可能导致新消息无法进入队列
- 可能导致旧消息无法丢失
- 消息等待消费的时间过长,超出了业务容忍范围。
- 产生堆积的情况
- 生产者突然大量发布消息
- 消费者消费失败
- 消费者出现性能瓶颈。
- 消费者挂掉
- 解决办法
- 排查消费者的消费性能瓶颈
- 增加消费者的多线程处理
- 部署增加多个消费者
9.2 消息丢失
在实际的生产环境中有可能出现一条消息因为一些原因丢失,导致消息没有消费成功,从而造成数据不一致等问题,造成严重的影响,比如:在一个商城的下单业务中,需要生成订单信息和扣减库存两个动作,如果使用RabbitMQ来实现该业务,那么在订单服务下单成功后需要发送一条消息到库存服务进行扣减库存,如果在此过程中,一条消息因为某些原因丢失,那么就会出现下单成功但是库存没有扣减,从而导致超卖的情况,也就是库存已经没有了,但是用户还能下单,这个问题对于商城系统来说是致命的。
消息丢失的场景主要分为:消息在生产者丢失,消息在RabbitMQ丢失,消息在消费者丢失。
9.2.1 消息在生产者丢失
9.2.1.1 场景介绍
消息生产者发送消息成功,但是MQ没有收到该消息,消息在从生产者传输到MQ的过程中丢失,一般是由于网络不稳定的原因。
9.2.1.2 解决方案
采用RabbitMQ 发送方消息确认机制,当消息成功被MQ接收到时,会给生产者发送一个确认消息,表示接收成功。RabbitMQ 发送方消息确认模式有以下三种:普通确认模式,批量确认模式,异步监听确认模式。spring整合RabbitMQ后只使用了异步监听确认模式。
说明
异步监听模式,可以实现边发送消息边进行确认,不影响主线程任务执行。
步骤
1、生产者发送3000条消息
2、在发送消息前开启开启发送方确认模式
1 | <!-- publisher-confirms="true" 表示:启用了消息确认 --> |
3、在发送消息前添加异步确认监听器
1 | /** |
9.2.2 消息在RabbitMQ丢失
9.2.2.1 场景介绍
消息成功发送到MQ,消息还没被消费却在MQ中丢失,比如MQ服务器宕机或者重启会出现这种情况
9.2.2.2 解决方案
持久化交换机,队列,消息,确保MQ服务器重启时依然能从磁盘恢复对应的交换机,队列和消息。
spring整合后默认开启了交换机,队列,消息的持久化,所以不修改任何设置就可以保证消息不在RabbitMQ丢失。但是为了以防万一,还是可以申明下。
9.2.3 消息在消费者丢失
9.2.3.1 场景介绍
消息费者消费消息时,如果设置为自动回复MQ,消息者端收到消息后会自动回复MQ服务器,MQ则会删除该条消息,如果消息已经在MQ被删除但是消费者的业务处理出现异常或者消费者服务宕机,那么就会导致该消息没有处理成功从而导致该条消息丢失。
9.2.3.2 解决方案
设置为手动回复MQ服务器,当消费者出现异常或者服务宕机时,MQ服务器不会删除该消息,而是会把消息重发给绑定该队列的消费者,如果该队列只绑定了一个消费者,那么该消息会一直保存在MQ服务器,直到消息者能正常消费为止。本解决方案以一个队列绑定多个消费者为例来说明,一般在生产环境上也会让一个队列绑定多个消费者也就是工作队列模式来减轻压力,提高消息处理效率
MQ重发消息场景:
1.消费者未响应ACK,主动关闭频道或者连接
2.消费者未响应ACK,消费者服务挂掉
9.3 有序消费消息
9.3.1 场景介绍
9.3.1.1 场景1
当RabbitMQ采用work Queue模式,此时只会有一个Queue但是会有多个Consumer,同时多个Consumer直接是竞争关系,此时就会出现MQ消息乱序的问题。
9.3.1.2 场景2
当RabbitMQ采用简单队列模式的时候,如果消费者采用多线程的方式来加速消息的处理,此时也会出现消息乱序的问题。
9.3.1.3 场景1解决
9.3.1.4 场景2解决
9.4 重复消费
9.4.1 场景介绍
为了防止消息在消费者端丢失,会采用手动回复MQ的方式来解决,同时也引出了一个问题,消费者处理消息成功,手动回复MQ时由于网络不稳定,连接断开,导致MQ没有收到消费者回复的消息,那么该条消息还会保存在MQ的消息队列,由于MQ的消息重发机制,会重新把该条消息发给和该队列绑定的消息者处理,这样就会导致消息重复消费。而有些操作是不允许重复消费的,比如下单,减库存,扣款等操作。
MQ重发消息场景:
1.消费者未响应ACK,主动关闭频道或者连接
2.消费者未响应ACK,消费者服务挂掉
9.4.2 解决方案
如果消费消息的业务是幂等性操作(同一个操作执行多次,结果不变)就算重复消费也没问题,可以不做处理,如果不支持幂等性操作,如:下单,减库存,扣款等,那么可以在消费者端每次消费成功后将该条消息id保存到数据库,每次消费前查询该消息id,如果该条消息id已经存在那么表示已经消费过就不再消费否则就消费。本方案采用redis存储消息id,因为redis是单线程的,并且性能也非常好,提供了很多原子性的命令,本方案使用setnx命令存储消息id。
setnx(key,value):如果key不存在则插入成功且返回1,如果key存在,则不进行任何操作,返回0
-------------本文结束感谢您的阅读-------------
本文标题: RabbitMQ
本文链接: https://wgy1993.gitee.io/archives/b543ced0.html
版权声明: 本作品采用 CC BY-NC-SA 4.0 进行许可。转载请注明出处!
