消息中间件
本文参考 https://www.imooc.com/learn/856 所做读书笔记
1 课程介绍
1.1 课程安排
为什么需要使用消息中间件 JMS规范
消息中间件概述 JMS代码演练
activemq集群配置 使用其他消息中间件
消息中间件在大型系统中的最佳实践
1.2 为什么需要使用消息中间件
通过服务调用让其他系统感知事件发生
通过消息中间件解耦服务调用
消息中间件的好处
1、解耦 2、异步 3、横向扩展
4、安全可靠 5、顺序保证 …..
2.消息中间件的概述
2.1 消息中间件的概述
什么是中间件?
非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件。
什么是消息中间件?
关注于数据的发送和接收,利用高效可靠的异步消息传递机制集成分布式系统
消息中间件图示
什么是JMS?
java消息服务(Java message Service)即JMS,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
什么是AMQP?
AMQP(advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。
JMS和AMQP对比
常见消息中间件对比
ActiveMQ
ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
ActiveMQ特性
1、多种语言和协议编写客户端。语言:
Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议:
OpenWire,Stomp REST,WS notification,XMPP,AMQP
2、完全支持JMS1.1和J2EE 1.4规范(持久化,XA消息,事务)
3、虚拟主题、组合目的、镜像队列
RabbitMQ
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ特性
1、支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript等
2、AMQP的完整实现(vhost、Exchange、Binding、Routing key等)
3、事务支持/发布确认
4、消息持久化
Kafka
Kafka是一种高吞吐量的分布式发布订阅消息系统,是一个分布式的、分区的、可靠的分布式日志存储服务。它通过一种独一无二的设计提供了一个消息系统的功能。
Kafka特性
1、通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
2、高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
3、Partition、consumer Group
综合评价
3. JMS规范
3.1 JMS规范
JMS相关概念
1、提供者:实现JMS规范的消息中间件服务器
2、客户端:发送或接收消息的应用程序
3、生产者/发布者:创建并发送消息的客户端
4、消费者/订阅者:接收并处理消息的客户端
5、消息:应用程序之间传递的数据内容
6、消息模式:在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式
队列模型
1、客户端包括生产者和消费者
2、队列中的消息只能被一个消费者消费
3、消费者可以随时消费队列中的消息
队列模型示意图
主题模型
1、客户端包括发布者和订阅者
2、主题中的消息被所有订阅者消费
3、消费者不能消费订阅之前就发送到主题中的消息
主题模型示意图
JMS规范
JMS编码接口
1、ConnectionFactory用于创建连接到消息中间件的连接工厂
2、Connection代表了应用程序和消息服务器之间的通信链路
3、Destination指消息发布和接收的地点,包括队列或主题
4、session表示一个单线程的上下文,用于发送和接收消息
5、MessageConsumer由会话创建,用于接收发送到目标的消息
6、Messageproducer由会话创建,用于发送消息到目标
7、Message是在消费者和生产者之间传送的对象,消息头,一组消息属性,一个消息体
JMS编码接口之间的关系
4.使用activemq
4.1win安装activemq
1、下载安装包
http://activemq.apache.org/
2、直接启动
bin目录下,以管理员身份运行activemq.BAT,打开浏览器:http://localhost:8161/,默认用户名密码均为admin
3、使用服务启动
bin目录下,以管理员身份运行InstallService.bat
4.2 linux安装activemq
1、下载并解压安装包
wget http://www.apache.org/dyn/closer.cgi?filename=/activemq/5.15.3/apache-activemq-5.15.3-bin.tar.gz
tar -zxvf apache-activemq-5.15.3-bin.tar.gz
2、启动
在bin目录下
./activemq start
查看是否启动
ps -ef | grep activemq
关闭服务
./activemq stop
4.3 队列模式的消息演示
生产者代码
packagecom.imooc.jms.queue;
importorg.apache.activemq.ActiveMQConnectionFactory;
importjavax.jms.*;
/**
*CreatedbyAdMinistratoron2018/4/25/025.
*/
publicclassAPPProducer{
privatestaticfinalStringurl="tcp://localhost:61616";
privatestaticfinalStringqueueName="queue-test";
publicstaticvoidmain(String[]args)throwsJMSException{
//1.创建ConnectionFactory
ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(url);
//2.创建Connection
Connectionconnection=connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标
Destinationdestination=session.createQueue(queueName);
//6.创建一个生产者
MessageProducerproducer=session.createProducer(destination);
for(inti=0;i<100;i++){
//7.创建消息
TextMessagetextMessage=session.createTextMessage("test"+i);
//8.发布消息
producer.send(textMessage);
System.out.println("发送消息"+textMessage.getText());
}
//9.关闭连接
connection.close();
}
}
消费者代码
packagecom.imooc.jms.queue;
importcom.sun.org.apache.xpath.internal.SourceTree;
importorg.apache.activemq.ActiveMQConnectionFactory;
importjavax.jms.*;
/**
*CreatedbyAdministratoron2018/4/25/025.
*/
publicclassAppConsumer{
privatestaticfinalStringurl="tcp://localhost:61616";
privatestaticfinalStringqueueName="queue-test";
publicstaticvoidmain(String[]args)throwsJMSException{
//1.创建ConnectionFactory
ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(url);
//2.创建Connection
Connectionconnection=connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标
Destinationdestination=session.createQueue(queueName);
//6.创建一个消费者
MessageConsumerconsumer=session.createConsumer(destination);
//7.创建一个监听器,监听状态下不能关闭连接
consumer.setMessageListener(newMessageListener(){
publicvoidonMessage(Messagemessage){
TextMessagetextMessage=(TextMessage)message;
try{
System.out.println("接收消息"+textMessage.getText());
}catch(JMSExceptione){
e.printstacktrace();
}
}
});
//8.关闭连接
//connection.close();
}
}
演示:启动两个消费者后,启动生产者,分别接收到奇偶数据,平均消费队列中的所有消息;
4.4主题模式的消息演示
切到topic窗口下查看消息
生产者
packagecom.imooc.jms.topic;
importorg.apache.activemq.ActiveMQConnectionFactory;
importjavax.jms.*;
/**
*CreatedbyAdministratoron2018/4/25/025.
*/
publicclassAppProducer{
privatestaticfinalStringurl="tcp://localhost:61616";
privatestaticfinalStringtopicName="topic-test";
publicstaticvoidmain(String[]args)throwsJMSException{
//1.创建ConnectionFactory
ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(url);
//2.创建Connection
Connectionconnection=connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标
Destinationdestination=session.createTopic(topicName);
//6.创建一个生产者
MessageProducerproducer=session.createProducer(destination);
for(inti=0;i<100;i++){
//7.创建消息
TextMessagetextMessage=session.createTextMessage("test"+i);
//8.发布消息
producer.send(textMessage);
System.out.println("发送消息"+textMessage.getText());
}
//9.关闭连接
connection.close();
}
}
消费者
packagecom.imooc.jms.topic;
importorg.apache.activemq.ActiveMQConnectionFactory;
importjavax.jms.*;
/**
*CreatedbyAdministratoron2018/4/25/025.
*/
publicclassAppConsumer{
privatestaticfinalStringurl="tcp://localhost:61616";
privatestaticfinalStringtopicName="topic-test";
publicstaticvoidmain(String[]args)throwsJMSException{
//1.创建ConnectionFactory
ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(url);
//2.创建Connection
Connectionconnection=connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//5.创建一个目标
Destinationdestination=session.createTopic(topicName);
//6.创建一个消费者
MessageConsumerconsumer=session.createConsumer(destination);
//7.创建一个监听器,监听状态下不能关闭连接
consumer.setMessageListener(newMessageListener(){
publicvoidonMessage(Messagemessage){
TextMessagetextMessage=(TextMessage)message;
try{
//需要提前预订,不能接收之前的消息
System.out.println("接收消息"+textMessage.getText());
}catch(JMSExceptione){
e.printStackTrace();
}
}
});
//8.关闭连接
//connection.close();
}
}
主题模式下每个消费者都能拿到所有的消息
4.5 Spring JMS理论
JMS代码演练
使用Spring集成JMS连接ActiveMQ
1、ConnectionFactory用于管理连接的连接工厂
2、JmsTemplate用于发送和接收消息的模板类
3、MessageListener消息监听器
ConnectionFactory
1、一个Spring为我们提供的连接池
2、JmsTemplate每次发送消息都会重新创建连接,会话和productor
3、Spring中提供了SingleConnectionFactory和CachingConnectionFactory
JmsTemplate
1、是Spring提供的,只需向Spring容器内注册这个类就可以使用JmsTemplate方便的操作jms
2、JmsTemplat类是线程安全的,可以在整个应用范围使用。
MessageListener
1、实现一个onMessage方法,该方法只接收一个Message参数。
相关阅读
情景引入 小白:起床起床起床起床。。。。快起床~ 我:怎么了又,大惊小怪,吓到我了。 小白:我有事有事想找你,十万火急呢~~ 我:你能有什么
一、消息中间件相关知识 1、概述 消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控