RabbitMQ
简介
RabbitMQ特点
RabbitMQ 是一个由Erlang语言开发的基于AMQP标准的开源实现。RabbitMQ最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
- 保证可靠性(Reliability)。RabbitMQ使用一些机制来保证可靠性,如持久化、传输确认、发布确认等。
- 具有灵活的路由(Flexible Routing)功能。在消息进入队列之前,是通过Exchange(交换器)来路由消息的,对于典型的路由功能,RabbitMQ已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也可以通过插件机制来实现自己的Exchange。
- 支持消息集群(Clustering)。多台RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
- 具有高可用性(Highly Available)。队列可以在集群中的机器上进行镜像,使得在部分节点出现问题的情况下队列仍然可用。
- 支持多种协议(Multi-protocol)。RabbitMQ除支持AMQP协议之外,还通过插件的方式支持其他消息队列协议,比如STOMP、MQTT等。
- 支持多语言客户端(Many Client)。RabbitMQ几乎支持所有常用的语言,比如Java、.NET、Rudy等。
- 提供管理界面(ManagerMent UI)。RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息Broker的许多方面。
- 提供跟踪机制(Tracing)。RabbitMQ提供了消息跟踪机制,如果消息异常,使用者可以查出发生了什么情况。
- 提供插件机制(Plugin System)。RabbitMQ提供了许多插件,从多方面进行扩展,也可以编写自己的插件。
RabbitMQ基本概念
RabbitMQ是AMQP协议的一个开源实现。
- Message(消息):消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先级)、delivery-mode(指出该消息可能需要持久化存储)等。
- Publisher(消息生产者):一个向交换器发布消息的客户端应用程序。
- Exchange(交换器):用来接收生产者发送的消息,并将这些消息路由给服务器中的队列。
- Binding(绑定):用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
- Queue(消息队列):用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一条消息可被投入一个或多个队列中。消息一直在队列里面,等待消费者连接到这个队列将其取走。
- Connection(网络连接):比如一个TCP连接。
- Channel(信道):多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP命令都是通过信道发送出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成的。因为对于操作系统来说,建立和销毁TCP连接都是非常昂贵的开销,所以引入了信道的概念,以复用一个TCP连接。
- Consumer(消息消费者):表示一个从消息队列中取得消息的客户端应用程序。
- Virtual Host(虚拟主机,在RabbitMQ中叫vhost):表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。本质上每个vhost就是一台缩小版的RabbitMQ服务器,它拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在连接时指定,RabbitMQ默认的vhost是 “/”。
- Broker:表示消息队列服务器实体。
AMQP中的消息路由
生产者需要把消息发布到Exchange上,消息最终到达队列并被消费者接收,而Binding决定交换器上的消息应该被发送到哪个队列中。
交换器类型
不同的交换器分发消息的策略也不同,目前交换器有四种类型:Direct、Fanout、Topic、Headers。其中Headers交换器匹配AMQP消息的Header而不是路由键。此外,Headers交换器和Direct交换器完全一致,但性能相差很多,目前几乎不用了,所以下面我们看另外三种类型。
Direct交换器
如果消息中的路由键(routing key)和Binding中的绑定键(Binding key)一致,交换器就将消息发送到对应的队列中。路由键与队列名称要完全匹配,如果将一个队列绑定到交换机要求路由键为”dog”,则只转发routing key标记为”dog”的消息,不会转发”dog.puppy”消息,也不会转发”dog.guard”消息等。Direct交换器是完全匹配、单播的模式。
Fanout交换器
Fanout交换器不处理路由键,只是简单地将队列绑定到交换器,发送到交换器,发送到交换器的每条消息都会被转发到与该交换器绑定的所有队列中。这很像子网广播,子网内的每个主机都获得了一份复制额消息。通过Fanout交换器转发消息是最快的。
Topic交换器
Topic交换器通过模式匹配分配消息的路由键属性,将路由键和某种模式进行匹配,此时队列需要绑定一种模式。Topic交换器将路由键和绑定键的字符串切分成单词,这些单词之间用点 .
隔开。该交换器会识别两个通配符: #
和 *
,其中 #
匹配0个单词或多个单词, *
匹配刚好一个单词。
工程实例
Java访问RabbitMQ实例
RabbitMQ支持多种语言访问,下面看一下Java中使用RabbitMQ的例子。
- 在Maven工程中添加依赖
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hu</groupId>
<artifactId>RabbitMQ</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
</dependencies>
</project>
- 消息生产者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建链接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("hu");
factory.setPassword("123");
//设置RabbitMQ地址
factory.setHost("127.0.0.1");
factory.setVirtualHost("/");
//建立到代理服务器的连接
Connection connection = factory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//声明交换器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct",true);
String routingKey = "testRoutingKey";
//channel.exchangeDeclare(exchangeName, "topic",true);
//String routingKey = "hu.hu";
//发布消息
byte[] message = "quit".getBytes();
channel.basicPublish(exchangeName, routingKey, null, message);
//关闭信道和连接
channel.close();
connection.close();
}
}
首先创建一个工厂,再根据连接工厂创建连接,之后从连接中创建信道,接着声明一个交换器和指定路由键,然后才发布消息,最后将所创建的信道、连接等资源关闭。代码中的ConnectionFactory、Connection、Channel都是RabbitMQ提供的API中最基本的类。ConnectionFactory是Connection的制造工厂,ConnectionFactory代表RabbitMQ的Socket连接,它封装了Socket操作的相关逻辑。Channel是与RabbitMQ打交道的最重要的接口,大部分业务操作都是在Channel中完成的,比如定义队列、定义交换器、队列和交换器的绑定、发布消息等。
- 消息消费者
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("hu");
factory.setPassword("123");
factory.setHost("127.0.0.1");
factory.setVirtualHost("/");
//建立到代理服务器的连接
Connection connection = factory.newConnection();
//创建信道
final Channel channel = connection.createChannel();
//声明交换器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct",true);
//channel.exchangeDeclare(exchangeName, "topic",true);
//声明队列
String queue = channel.queueDeclare().getQueue();
//绑定队列,通过路由键testRoutingKey将队列和交换器绑定起来
channel.queueBind(queue, "hello-exchange", "testRoutingKey");
//channel.queueBind(queue, "hello-exchange", "hu.*");
while(true) {
//消费消息
boolean autoAck = false;
String consumerTag = "";
channel.basicConsume(queue, autoAck,consumerTag,new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消费的路由键:" + routingKey);
System.out.println("消费的内容类型:" + contentType);
long deliveryTag = envelope.getDeliveryTag();
//确认消息
channel.basicAck(deliveryTag, false);
System.out.println("消费的消息体内容:");
String bodyStr = new String(body,"utf-8");
System.out.println(bodyStr);
}
});
}
}
}
消息消费者通过不断循环等待服务推送消息,一旦有消息过来,就在控制台输出消息的相关内容。一开始的创建连接、创建信道、声明交换器的代码和发布消息时是一样的,但在 消费消息时需要指定队列名称,所以这里多绑了队列这一步,接下来是循环等待消息过来并打印消息内容。
- 启动RabbitMQ服务器
使用命令启动RabbitMQ服务器 ./sbin/rabbitmq-server
- 运行Consumer
先运行Consumer的main方法,这样当生产者发送消息时就能在消费者后端看到消息记录。
- 运行Producer
接下来运行Producer的main方法,发布一条消息,在Consumer的控制台就能看到接收到的消息。
Spring整合RabbitMQ
在之前的消息消费者和消息生产者中有很多重复的代码,并且里面很多都是配置信息。站在使用者的角度来看,其实只关心发送消息和消费消息两件事。基于此,Spring框架也集成了RabbitMQ,用于简化使用RabbitMQ时的操作。
- 添加依赖
<!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.2.7.RELEASE</version>
</dependency>
spring-rabbit就是Spring整合了RabbitMQ的包。
- Spring配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<bean id="myMessageListener" class="com.hu.listener.MyMessageListener" />
<!-- 配置连接 -->
<rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672"
username="hu" password="123" virtual-host="testhost" requested-heartbeat="60" />
<!-- 配置 RabbitTemplate -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="myExchange" routing-key="hu.bar" />
<!-- 配置RabbitAdmin -->
<rabbit:admin connection-factory="connectionFactory" />
<!-- 配置队列名称 -->
<rabbit:queue name="myQueue" />
<!-- 配置 Topic类型的交换器 -->
<rabbit:topic-exchange name="myExchange">
<rabbit:bindings>
<rabbit:binding queue="myQueue" pattern="hu.*" />
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 配置监听器 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="myMessageListener" queue-names="myQueue" />
</rabbit:listener-container>
</beans>
spring-rabbit的主要API如下:
-
MessageListenerContainer:用来监听容器,为消息入队提供异步处理。
-
RabbitTemplate:用来发送和接收消息。
-
RabbitAdmin:用来声明队列、交换器、绑定。
所以,与RabbitMQ相关的配置也会包括配置连接、配置RabbitTemplate、配置RabbitAdmin、配置队列名称、配置交换器、配置监听器等。
- 发送消息
public class SendMessage {
public static void main(String[] args) {
AbstractXmlApplicationContext context = new ClassPathXmlApplicationContext("spring-context.xml");
RabbitTemplate template = context.getBean(RabbitTemplate.class);
template.convertAndSend("hello world!");
context.close();
}
}
在发送消息时先从配置文件中获取到RabbitTemplate对象,接着就调用convertAndSend发送消息。
- 消费消息
public class MyMessageListener implements MessageListener {
public void onMessage(Message message) {
String messageBody = new String(message.getBody());
System.out.println("接收到消息:" + messageBody);
}
}
通过实现MessageListener接口来监听消息的方式消费消息。注意:在配置文件中将声明MyMessageListener类的一个bean,然后在 rabbit:listener
的配置中引用该bean。
- 运行SendMessage
运行SendMessage类的main方法,在控制台将看到打印出接收到的消息。
RabbitMQ实践建议
虚拟主机
虚拟主机(Virtual Host,在RabbitMQ中叫做vhost)是AMQP协议里的一个基本概念,客户端在连接消息服务器时必须指定一个虚拟主机。RabbitMQ中的权限控制是以vhost为单位的。也就是说,消息客户端在访问时不能把vhostA中的交换器绑定到vhostB的队列中,这时RabbitMQ的一个很重要的设计。因此,实际场景中可以用一台RabbitMQ服务器服务多个不同的应用,应用间通过不同虚拟主机的划分提供访问消息时逻辑上的隔离,从而为应用程序提供安全访问。这种方式既能把同一台RabbitMQ服务器的不同业务应用区分开,又可以避免其内部队列、交换器的命名冲突。
RabbitMQ有一个默认的vhost,它的值是 /
,用户名和密码都是guest。当在RabbitMQ集群上创建vhost时,整个集群上的实例都会创建该vhost,vhost不仅消除了为基础架构中的每一层都运行一台RabbitMQ服务器的需要,而且避免了为每一层都创建不同的集群。
可以通过RabbitMQ提供的rabbitmqctl工具管理vhost:
# 创建虚拟主机 test
rabbitmqctl add_vhost test
# 删除虚拟主机 test
rabbitmqctl delete_vhost test
# 查询当前 RabbitMQ服务器中的所有虚拟主机
rabbitmqctl list_vhosts
消息保存
RabbitMQ对于Queue中消息的保存方式有disk和RAM两种。
disk就是写入磁盘,也就是通常所理解的持久化,这种方式的好处是当发生系统意外宕机的情况时,原来的消息数据可以在系统启动之后恢复。根据官网介绍,RabbitMQ在两种情况下会将消息数据写入磁盘,一是在发布消息时指明需要写入磁盘;而是消息服务器内存紧张时会将部分内存中的消息转移到磁盘。采用disk方式,消息数据会被保存在以 .rdq
后缀命名的文件中,当文件达到一定大小(默认是16777216字节,即16MB)时会生成一个新的文件,当文件中的已经被删除的消息比例大于阀值时会触发文件合并操作,以提高磁盘利用率。
而采用RAM方式,只是在RAM中保存内部数据库表数据,而不会保存消息、消息存储索引、队列索引和其他节点状态等数据,所以必须在启动时从集群中的其他节点同步原来的消息数据,这也就意味着集群中必须包含至少一个disk方式的节点。正因为这样,所以不能手工删除集群中的最后一个disk节点。
在绝大部分情况下,对消息相关数据的保存采用disk方式,如果有其他高可用手段保证,则可以选用RAM方式,以提高消息队列的工作效率。
消息持久化涉及Queue、Message、Exchange三部分。
Queue持久化
Queue持久化是通过设置durable为true来实现的,在Channel接口中声明队列有以下几种方法。
Queue.DeclareOk queueDeclare() throws IOException;
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
exclusive:表示是否为排他队列,为true表示是排他队列,只会对首次声明该队列时使用的连接可见,并在连接断开时自动删除。
autoDelete:表示是否自动删除,为true表示队列会在没有任何订阅的消费者时被自动删除。常见于需要临时队列的场景中。
Message持久化
队列持久化之后,服务器重启队列会继续存在,但并不保证队列里面的消息也继续存在。如果想要队列里的消息也存在,需要对消息设置持久化标识。以下是Channel接口中basicPublish方法的定义。
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props,
byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate,
BasicProperties props, byte[] body) throws IOException;
RabbitMQ在MessageProperties中预定义了BasicProperties类型的常量用于选择,包括MINIMAL_BASIC、MINIMAL_PERSISTENT_BASIC、BASIC、PERSISTENT_BASIC、TEXT_PLAIN、PERSISTENT_TEXT_PLAIN,可以使用PERSISTENT_TEXT_PLAIN表示发送的是需要持久化的消息。在MessageProperties的源码中可以看到,其实设置消息持久化就是把BasicProperties中的deliveryMode设置为2。
Exchange持久化
同样,如果不设置Exchange持久化,则消息服务器重启后Exchange就不存在了,所以一般建议在将消息持久化时也要设置Exchange持久化。方法同样在Channel接口中,在声明Exchange时支持durable入参的方法,将其设置为true即可。
消息持久化的代价
从消息安全性考虑,当然所有消息都被持久化到硬盘最放心了,但这样做是有代价的,最大的代价是影响性能,写入磁盘的速度要比写入内存的速度慢得多,这将极大影响服务器的吞吐量。一般情况下要仔细分析业务场景中使用消息队列时的性能需求,如果使用了持久化机制之后实际的消息服务器吞吐量远达不到目标,这时就需要权衡看时候需要该特性。这里说的仔细分析还包括分析业务场景中的本质需求,假如只是为了能够保证消息投递,其实也可换种思路解决,比如生产者在另外单独的信道上监听消息响应队列,在发送的消息中包含响应队列名称,这样消费者就可以回发应答以确认是否接收到消息,如果在一定时间内没收到回复,生产者则重发消息。
即使对以上三种组件都设置了持久化,也不能保证消息在使用过程中完全不会丢失。例如,如果消息消费者在收到消息是autoAck为true,但消费端还没处理完成服务器就崩溃了,在这种情况下消息数据还是丢失了。在这种场景下需要把autoAck设为false,并在消费逻辑完成之后在手动去确认。其实可靠性是一个很庞大复杂的问题,需要结合具体的使用场景来考虑,这里只是给出在使用消息系统时需要考虑的点供参考。可靠性不仅跟消息持久化机制相关,还有其他问题需要考虑,例如消息确认模式。
消息确认模式
在默认情况下,生产者把消息发送出去之后,Broker是不会返回任何信息给生产者的,也就是说,生产者也不知道消息有没有正确到达Borker。如果在消息到达Broker前发生了宕机,或者当Broker接收到消息但在写入磁盘时发生了宕机,这样消息就丢失了,从而发生了宕机,或者当Broker接收到消息但在写入磁盘时发生了宕机,这样消息就丢失了,而作为消息生产者又不知道。为此RabbitMQ提供了两种解决方式,一是通过AMQP协议中的事务机制;二是把信道设置成确认模式。
AMQP协议中的事务模式与数据库里的事务模式不同,在AMQP中当把信道设置成事务模式后,生产者和Broker之间会有一种发送/响应机制判断当前命令操作是否可以继续。不过,由于事务模式需要生产者应用同步等待Broker的执行结果,在性能上会极大降低消息服务器的吞吐量,解决方案偏重了点,所以一般不建议使用事务模式,二是采用性能更好的发送方确认(Publisher Confirm)模式来保障消息投递。由于没有事务回滚概念,其对Broker的性能影响小了很多。
发送方确认模式是RabbitMQ对AMQP的扩展实现,在2.3.1及更高版本中可用。把信道设置成确认模式之后,在该信道上发布的所有消息都会被分配一个唯一ID,一旦消息被投递到所有匹配队列中,该信道就会向生产者发送确认消息,在确认消息中包含了之前的唯一ID,从而让生产者知道消息已到达目的队列。发送方确认模式的最大优势是异步,生产者发送完一条消息后可继续发送下一条消息,当生产者收到确认消息后调用回调方法处理。
设置确认模式可以调用信道的confirmSelect方法。不过,如果信道已经是事务模式,则不能在设置成确认模式,因为这两种模式是不能并存的。
生产者确认模式有三种途径。
-
普通确认:每发送一条消息后,就调用waitForConfirm方法等待Broker的确认消息,本质上这就是串行方式确认。
-
批量确认:每发送完一批消息后,再调用waitForConfirm方法等待Broker的确认。
-
异步确认:通过调用addConfirmListener方法注册回调,在Broker确认了一条或多条消息之后有客户端回调该方法。
从编程的复杂度来看,普通确认模式最简单,只需要考虑Broker返回false或者方法超过给定的时间未返回,客户端进行重传即可。批量确认模式需要考虑在异常情况下整个批次的消息全部重发,这会带来明显的重复消息,而且如果消息经常丢失的话,这种模式反而不如普通确认模式。异步确认模式实现最复杂,需要为每个信道都去维护一个尚未确认的消息集合,每次发布消息时尚未确认的消息总数加1,执行回调时再减去相应的已经收到确认的消息数量。
不管哪种确认模式,通过调整客户端线程数都可达到比较大的吞吐量,无非是实现的代价不同,比如异步确认模式只需要少量线程即可,而普通确认模式则需要更多的线程。总的来说,还是要根据应用场景和开发人员的水平,最后结合实际压测结果来权衡使用哪种模式。
消费者应答
消息确认模式解决的是发送方确认消息发送到Broker的问题,很多时候消息生产者不止关心消息有没有发送到服务器,还想知道消息消费者的消费是否成功。这就设计RabbitMQ中的另一个概念:消费者回执(Consumer Acknowledgement)。
在实际应用中可能会发生消费者接收到消息,但是还没有处理完就宕机的情况,这将导致消息丢失。为了避免这种情况,可以要求消费者在消费完消息后发送一个回执给RabbitMQ服务器,RabbitMQ收到消息回执后再将该消息从队列中删除,如果没有收到回执并且检测到消费者与RabbitMQ服务器的连接断开了,则由RabbitMQ服务器负责把消息发送给其他消费者(如果有多个消费者的话)进行处理。RabbitMQ里的消息是不会过期的,除非发生消费者与RabbitMQ服务器的连接断开的情况;否则不管消费者执行消息逻辑的时间有多长,都不会让消息被发送给其他消费者。
两种消息回执模式
在AMQP协议中定义了两种消息回执模式,其中一种就是自动回执;另一种是手动回执。在自动回执模式下,当Broker成功发送消息给消费者后就会立即把此消息从队列中删除,而不用等待消费者确认消息。而在手动回执模式下,当Broker发送消息给消费者后并不会立即把此消息删除,而是等收到消费者回送的确认消息后才会删除。因此,在手动回执模式下,当消费者收到消息并处理完成后需要向Broker显式发送ACK(在RabbitMQ中ACK是acknowledgement的简写,中文意思是消息应答或确认)指令,如果消费者因为意外崩溃而没有发送ACK指令,那么Broker就会把该消息转发给其他消费者(如果此时没有其他消费者,则Broker会缓存此消息,直到有新的消费者注册)。
是否开启自动回执模式,由消费消息时调用的basicConsume方法的入参autoAck决定。
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback)
throws IOException;
当autoAck为true时表示采用自动回执模式,如果false则表示采用手动回执模式。手动回执模式需要在消费者处理完消息后由程序返回应答状态。
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
throws IOException {
// 确认消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);
void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,它在channel范围内是唯一的。
multiple: 取值为 false 时,表示通知 RabbitMQ 当前消息被确认;如果为 true,则额外将比第一个参数指定的 deliveryTag 小的消息一并确认。
批量确认针对的是整个信道,对同一消息的重复确认,或者对不存在的消息的确认,会产生 IO 异常,导致信道关闭。
拒绝消息
当消费者处理消息失败或者当前不能处理该消息时,可以给Broker发送一个拒绝消息的指令,并且可要求Broker将该消息丢弃或重新放入队列中。拒绝消息有两种方式,一是拒绝一条消息;二是拒绝多条消息。他们都被定义在Channel接口中。
void basicReject(long deliveryTag, boolean requeue) throws IOException;
只支持一条消息进行拒绝。
deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,它在channel范围内是唯一的。
requeue:表示如何处理这条消息,为true表示重新放入RabbitMQ的发送队列中,为false表示通知RabbitMQ销毁该消息。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;
一次拒绝多条消息。
deliveryTag:发布的每一条消息都会获得一个唯一的deliveryTag,它在channel范围内是唯一的。
multiple:批量确认标志,为true表示包含当前消息在内的所有比该消息的deliveryTag值小的消息都被拒绝(除了已经被应答的消息),为false则表示只拒绝本条消息。
requeue:表示如何处理这条消息,为true表示重新放入RabbitMQ的发送队列中,为false表示通知RabbitMQ销毁该消息。
需要注意的是,当队列中只有一个消费者时,需要确认不会因为拒绝消息并选择重新放入队列中而导致消息在同一个消费者上发生死循环。
消息预取
在实际场景中,如果对每条消息的处理时间不同,则可能导致有些消费者一致很忙,而有些消费者处理很快并一直空闲。这时可通过设置预读数量(Prefetch Count)限制每个消费者在收到下一个确认回执前一次最多可以接收多少条消息。例如,设置prefetchCount为1,则表示RabbitMQ服务器每次给每个消费者发送一条消息,在收到该消息的消费者ACK指令之前RabbitMQ不会再向该消费者发送新的消息。可以通过Channel接口中的basicQos方法设置预读取数量。
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
void basicQos(int prefetchCount, boolean global) throws IOException;
void basicQos(int prefetchCount) throws IOException;
不要设置无限制的预取数量,这将导致消费者接收所有的消息,耗尽内存并崩溃,然后所有的消息又被重新发送。
流控机制
RabbitMQ可以对内存和磁盘的使用量设置阀值,当达到阀值后生产者将被阻塞,直到对相应资源的使用恢复正常。除设置这两个阀值之外,RabbitMQ还用流控(Flow Control)机制来确保稳定性。由于Erlang进程之间不共享内存(binaries除外),而是通过传递消息来通信的,所以每个进程都有自己的进程邮箱(mailbox)。因为Erlang默认不会对mailbox的大小设限,所以,如果有大量消息持续发往某个进程,将会导致该mailbox过大,最终内存溢出、进程崩溃。在RabbitMQ中如果生产者持续高速发送消息,而消费者消费的速度又低于生产者发送的速度,若没有流控很快就会使内部mailbox达到阀值限制,从而阻塞生产者的操作(因为有Block机制,所以进程不会崩溃),然后RabbitMQ会进行换页操作,把内存中的数据持久化到磁盘上。
为了解决这个问题,RabbitMQ使用了一种基于信用证的流控机制,在消息处理进程内部有一个信用组(InitialCredit,MoreCreditAfter),表示在生产者进程中对应与某个消费者的初始credit和该消费者要返回给生产者的credit。例如,生产者进程A向消费者B发送消息,没法送一条消息都会是A中B的credit减1,如果A中的credit降到0就会被阻塞。B收到消息后会向A做出应答,每次应答都会使A中B的credit增加MoreCreditAfter个,当生产者进程的credit大于0时生产者就可继续向消费者发送消息了。实质上RabbitMQ就是通过监控每个进程的mailbox,当有进程负载过高来不及接收新消息时,该进程的mailbox就会开始堆积消息,当堆积到一定量时就会阻塞上游进程使其不得接收新消息,从而慢慢地上游进程的mailbox也会开始堆积消息,到一定量之后有会阻塞上游的上游进程接收消息,最后就会使得负责网络数据包接收的进程被阻塞,暂停接收数据。这有点像一个多级水库,当下游压力过大时上游水库就得关闭闸门,使得当前水库的压力也越来越大,累计到一定量后需要关闭更上游水库的闸门,直到关闭最上游的那个水库的闸门。
当RabbitMQ服务器出现内存或磁盘等资源的使用量达到所设置的阀值情况时,就会触发流控机制,从而阻塞消息生产端的连接,阻止生产者继续发送消息,直到资源不足的警告解除。触发流控机制后RabbitMQ服务器端接收消息的速率也会变慢,从而是进入队列的消息减少,同事RabbitMQ服务器端的消息推送也受到极大的影响,有测试发现服务器端推送消息的频率会大幅下降,下一次推送的时间有时会等1min、5min甚至30min。所有一旦触发流控机制,就将导致RabbitMQ性能恶化,推送消息也会变得非常缓慢。因此,要做好数据设计使发送速率和接收速率保持平衡,而不至于引起服务器堆积大量消息进而引发流控。
通道
消息客户端和消息服务器之间的通信是双向的,不管是对客户端还是服务器来说,保持他们之间的网络连接是很耗费资源的。为了在不占用大量TCP/IP连接的情况下也能有大量的逻辑连接,AMQP增加了通道(Channel)的概念,RabbitMQ支持并鼓励在一个连接中创建多个通道,因为相对来说创建和销毁通道的代价会小很多。需要提醒的是,作为经验法则,应该尽量避免在线程之间共享通道,你的应用应该使用每个线程单独的通道,而不是在多个线程上共享同一个通道,因为大多数客户端不会让通道线程安全(因为这将对性能产生严重的负面影响)。
总结
RabbitMQ最大的优势在于提供了比较灵活的消息路由策略、高可用性、可靠性,以及丰富的插件、多种平台支持和完善的文档。不过由于AMQP协议本身导致它的实现比较重量,从而使得其他MQ(比如Kafka)对比其吞吐量处于下风。在选择MQ时关键还是看需求——是更看重消息的吞吐量、消息堆积能力还是消息路由的灵活性、高可用性、可靠性等方面,先确定场景,再对不同产品进行有针对性的测试和分析,最终得到的结论才能作为技术选型的依据。
这个是使用github搭建博客所需使用的文章模板,新建文章都可以在此基础上复制并修改。并且博客文章的名字也必须是 yyyy-mm-dd-文章标题 (如:2019-11-01-博客文章模板)