胡豆秆

个人博客

欢迎来到我的个人博客~


消息队列


消息队列

系统间通信技术介绍

分布式应用

​ 一般来说,大型应用通常会被拆分成多个子系统,这些子系统可能会部署在多台机器上,也可能只是一台机器的多个进程中,这样的应用就是分布式应用。

如果是一个业务被拆分成多个子业务部署在不同服务器上,那就是分布式应用;如果是同一个业务部署在多台服务器上,那就是集群。

​ 分布式应用的子系统之间并不是完全独立的,他们需要相互通信来共同完成某个功能,这就涉及系统间通信了。

系统间通信

​ 目前业界通常有两种方式来实现系统间通信,其中一种是基于远程过程调用的方式(RPC);另一种是基于消息队列的方式。

RPC

​ RPC(Remote Procedure Call)是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。客户端不需要知道调用的具体实现细节,只需直接调用实际存在于远程计算机上的某个对象即可,但调用方式看起来和调用本地应用程序中的对象一样。

  • 它是协议,是一种规范。典型的RPC实现包括Dubbo、Thrift、GRPC等。
  • 网络通信的实现是透明的。调用方不需要关系网络之间的通信协议、网络I/O模型、通信的信息格式等。
  • 跨语言。对于调用方来说,无论其使用的是何种程序语言,调用都应该成功。

消息队列

​ 基于消息队列的方式指由应用中的某个系统负责发送信息,由关心这条消息的相应系统负责接收消息,并在收到消息后进行各自系统内的业务处理。消息可以非常简单,比如只包含文本字符串;也可以很复杂,比如包含字节流、字节数组,还可能包含嵌入对象,甚至是Java对象。

​ 消息在被发送后可以立即返回,由消息队列来负责消息的传递,消息发布者只管将消息发布到消息队列而不用管谁来取,消息使用者只管从消息队列中取消息而不管是谁发布的,这样发布者和使用者都不用知道对方的存在。

为何要用消息队列

​ 消息队列(MQ)是一种系统间相互协作的通信机制。使用消息队列的典型场景是异步处理。消息队列还可用于解决解耦、流量削峰、日志收集、事务最终一致性等问题。

解耦

​ 在大型系统的开发过程中,随着需求的叠加,各模块之间逐渐变成了相互调用的关系,这种模块间紧密关联的关系就是紧耦合。紧耦合带来的问题是对一个模块的功能变更将导致其关联模块发生变化,因此各个模块难以独立演化。

​ 要解决这个问题,可以在模块之间调用时增加一个中间层来实现解耦,这也方便了以后的扩展。所谓解耦,简单地讲,就是一个模块只关心自己的核心流程,而依赖该模块执行结果的其他模块如果做的不是很重要的事情,有通知即可,无须等待结果。换句话说,基于消息队列的模型,关心的是通知,而非处理。

流量削峰

​ 在互联网行业中,可能会出现在某一时刻网站突然迎来用户请求高峰期的情况(如天猫双11、京东618等)。在网站初期设计中,可能就直接将用户的请求数据写入数据库,但如果一直延续这样的设计,当遇到高并发的场景时将会给数据库带来巨大压力,并发访问量大到超过了原先系统的承载能力,会使系统的响应延迟加剧。如果在设计上考虑不周甚至会发生雪崩(由于基础服务不可用造成整个系统大面积不可用的情况),从而发生整个系统不可用的严重生产事故。

​ 对于很多公司来说,突发流量状况其实并不常见,如果都以能处理这类流量削峰值为标准投入大量资源随时待命无疑是很大的浪费。在业界的诸多时实践中,常见的是使用消息队列,先将短时间高并发的请求持久化,然后逐步处理,从而削平高峰期的并发流量,改善系统的性能。

日志收集

​ 在项目开发和运维中日志是一个非常重要的部分,通过日志可以追踪调试信息、定位问题等。但随着业务的发展,要建设的系统越来越多,系统的功能也越来越多,每天产生的日志数量变得越来越大。通过分析海量的日志来实时获取每个系统当前的状态,变得越来越迫切,离线分析当前系统的功能为未来的设计和扩展提供参考,也变得越来越重要。

​ 在这种情况下,利用消息队列产品在接收和持久化消息方面的高性能,引入消息队列快速接收日志消息,避免因为写入日志时的某些故障导致业务系统访问阻塞、请求延迟等。

事务最终一致性

​ 很多框架都提供了事务管理功能,可能都不需要单独写与事务管控相关的代码(比如开启事务、结束事务、提交事务之类)。但随着系统规模的不断增大,模块之间都单独有一套数据库,此时通过数据库提供的事务处理方式则无法解决问题。

​ 在业界的很多实践方案中,都可以借助消息队列来处理此问题。

消息队列的功能特点

​ 消息队列包含了消息和队列。消息是指在应用间传送的数据,队列暂存和处理消息的容器。以此,一个消息队列至少需要包括消息发送、接收和暂存功能。

  • Broker:消息处理中心,负责消息的接收、储存、转发等。
  • Producer:消息生产者,负责产生和发送消息到消息中心。
  • Consumer:消息消费者,负责从消息处理中心获取消息,并进行相应的处理。

​ 但在生产环境应用中,对消息队列的要求远不止基本的消息发送、接收和暂存。在不同的业务场景中,需要消息队列产品能解决诸如消息堆积、消息持久化、可靠投递、消息重复、严格有序、集群等各种问题。

消息堆积

​ 由于消息生产者和消费者是两个分开处理消息的系统,所以无法预支两者对消息处理速度的快慢。一旦在某个时间段消费者处理消息的速度没有跟上生产者发送消息的速度,必将导致消息在处理中心逐渐积压而得不到释放。

​ 有时需要消息队列能够处理这种情况,比如给消息队列设置一个阀值,将超过该阀值的消息不在放入消息处理中心,以防止系统资源被耗尽,导致系统挂掉甚至这个消息队列不可用。

消息持久化

​ 一个消息队列,如果消息在到达处理中心后不做任何处理就转发给消费者,那么消息处理中心也就失去了存在的意义,无法满足流量削峰等需求。常规做法是现将消息暂存下来,然后选择合适的时机再将消息投递给消费者。

​ 消息暂存可以选择将消息放在内存中,也可以选择放到文件、数据库等地方。若将消息暂存在内存中,一旦机器宕机,消息就会丢失。如果在某些情况下,该消息不能丢失,那就需要将消息持久化。如将消息存放到本地文件、分布式文件系统、数据库系统等。

可靠投递

可靠投递是不允许存在消息丢失的。从消息的整个生命周期来看,消息丢失一般发生在以下三个过程中:

  • 从生产者到消息处理中心。
  • 从消息处理中心到消息消费者。
  • 消息处理中心持久化消息。

​ 由于跨越不同的系统,中间会碰到网络问题或系统宕机等各种不确定的情形。总之对于消息生产者来说,就是消息没有送达。

消息重复

​ 有些消息队列为支持消息可靠投递,会选择在接收到消息后先持久化到本地,然后发送给消费者。当消息发送失败或者不知到是否发送成功时(如超时),消息的状态是待发送,定时任务不停地轮询所有待发送的消息,最终保证消息不会丢失,这就带来了消息可能会重复的问题。

严格有序

​ 在实际的业务场景中,经常会碰到需要按生产消息时的顺序来消费的情形。如网购时的创建订单、支付完成、已发货、已收货、订单完成等,每个环节都有可能产生消息,但会要求严格按照顺序消费消息,否则在业务处理上就是不正确的。这就需要消息队列能够提供有序消息的保证。但顺序消费却不一定需要消息在整个产品中全局有序,有的产品可能只需要提供局部有序的保证。

集群

​ 在大型应用中,系统架构一般都需要实现高可用性,以排除单点故障引起的服务中断。所以可能需要消息队列提供对集群模式的支持。集群不仅可以让消费者和生产者在某个节点崩溃的情况下继续运行,集群之间的多个节点还能够共享负载,当某台机器或网络出现故障时能自动进行负载均衡,而且可以通过增加更多的节点来提高消息通信的吞吐量。

消息中间件

​ 非底层操作系统软件、非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件。中间件是一种独立的系统软件或服务程序,分布式应用系统借助这种软件在不同的技术之间共享资源,管理计算机资源和网络通信。中间件在计算机系统中是一个关键软件,它能实现应用的互联和互操作,能保证系统安全、可靠、高效运行。中间件位于用户应用和操作系统及网络软件之间,他为应用提供了公用的通信手段,并且独立于网络和操作系统。

​ IBM消息中间件MQ以其独特的安全机制,简便、快速的编程风格,卓越不凡的稳定性、可扩展性和跨平台性,以及强大的事务处理能力和消息通信能力,成为市场占有率最高的消息中间产品。

设计一个简单的消息队列

在消息队列的完整使用场景中至少需要包含三个角色。

  • 消息处理中心:负责消息的接收、储存、转发等。
  • 消息生产者:负责生产和发送消息到消息处理中心。
  • 消息消费者:负责从消息处理中心获取消息,并进行相应的处理。

可以看到,消息队列服务的核心是消息处理中心,它至少要具备消息发送、消息接收和消息暂存功能。

消息处理中心

创建项目,新建一个消息处理中心类 Broker.java

public class Broker {
	// 队列储存消息的最大数量
	private final static int MAX_SIZE = 3;
	// 保存消息数据的容器
	private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<String>(MAX_SIZE);
	// 生产消息
	public static void produce(String msg) {
		if(messageQueue.offer(msg)) {
			System.out.println("成功投递消息:" + msg + ",当前消息数量:" + messageQueue.size());
		} else {
			System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");
		}
		System.out.println("------------------------------------------------------------");
	}
	// 消费消息
	public static String consume() {
		String msg = messageQueue.poll();
		if(msg != null) {
			// 消费条件满足情况,从消息容器中取出一条消息
			System.out.println("消费消息:" + msg + ",当前消息数量:" + messageQueue.size());
		} else {
			System.out.println("消息处理中心中没有消息!");
		}
		System.out.println("------------------------------------------------------------");
		return msg;
	}
}

有了消息处理中心类之后,需要将该类的功能暴露出去,这样别人才能用它来发送和接收消息。

定义BrokerServer类用来对外提供Broker类的服务。

public class BrokerServer implements Runnable {
	public static int SERVER_PORT = 9999;
	private final Socket socket;
	public BrokerServer(Socket socket) {
		this.socket = socket;
	}
	public static void main(String[] args) throws IOException {
		ServerSocket serverSocket = new ServerSocket(SERVER_PORT);
		while(true) {
			BrokerServer server = new BrokerServer(serverSocket.accept());
			new Thread(server).start();
		}
	}
	public void run() {
		try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
				PrintWriter out = new PrintWriter(socket.getOutputStream())) {
			while (true) {
				String str = in.readLine();
				if (str == null) {
					continue;
				}
				System.out.println("收到数据:" + str);
				// CONSUME表示消费一条消息
				if (str.equals("CONSUME")) {
					String message = Broker.consume();
					out.println(message);
					out.flush();
				} else {
					// 其他情况都是生产消息
					Broker.produce(str);
				}
			}
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

执行BrokerServer类的main方法,应用程序即可作为一个消息队列对外提供服务。

客户端访问

有了消息处理中心后,自然需要有相应客户端与之通信来发送和接收消息。

public class MQClient {
	public static void produce(String message) throws UnknownHostException, IOException {
		Socket socket = new Socket(InetAddress.getLocalHost(), BrokerServer.SERVER_PORT);
		try (PrintWriter out = new PrintWriter(socket.getOutputStream())) {
			out.println(message);
			out.flush();
		} catch (Exception e) {
			// TODO: handle exception
		}
	}
	public static String consume() throws UnknownHostException, IOException {
		Socket socket = new Socket(InetAddress.getLocalHost(),BrokerServer.SERVER_PORT);
		try(
			BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
			PrintWriter out = new PrintWriter(socket.getOutputStream())
		) {
			// 先向消息队列发送字符串 “CONSUME” 表示消费
			out.println("CONSUME");
			out.flush();
			// 再从消息队列获取一条消息
			String message = in.readLine();
			return message;
		}
	}
}

测试

新建一个ProduceClient类来生产消息

public class ProduceClient {
	public static void main(String[] args) throws UnknownHostException, IOException {
		MQClient.produce("Hello world");
	}
}

重复执行执行main方法

新建一个ConsumeClient用于消费消息

public class ConsumeClient {
	public static void main(String[] args) throws UnknownHostException, IOException {
		String string = MQClient.consume();
		System.out.println("获取到消息:" + string);
	}
}

重复执行main方法

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦