必威体育Betway必威体育官网
当前位置:首页 > IT技术

JAVA消息中间件

时间:2019-08-27 20:40:00来源:IT技术作者:seo实验室小编阅读:71次「手机版」
 

消息中间件

本文参考 https://www.imooc.com/learn/856 所做读书笔记

1 课程介绍

1.1 课程安排

为什么需要使用消息中间件                                                JMS规范

消息中间件概述                                                                      JMS代码演练

activemq集群配置                                                                 使用其他消息中间件

消息中间件在大型系统中的最佳实践

1.2 为什么需要使用消息中间件

通过服务调用让其他系统感知事件发生

通过消息中间件解耦服务调用

消息中间件的好处

1、解耦                                          2、异步                                       3、横向扩展

4、安全可靠                                 5、顺序保证                              …..

2.消息中间件的概述

2.1 消息中间件的概述

什么是中间件?

   非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给客户带来价值的软件统称为中间件。

什么是消息中间件?

关注于数据的发送和接收,利用高效可靠的异步消息传递机制集成分布式系统

消息中间件图示

什么是JMS?

java消息服务(Java message Service)即JMS,是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。

什么是AMQP?

   AMQP(advanced message queuing protocol)是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。

JMS和AMQP对比

常见消息中间件对比

ActiveMQ

   ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

ActiveMQ特性

1、多种语言和协议编写客户端。语言:

Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议:

OpenWire,Stomp REST,WS notification,XMPP,AMQP

2、完全支持JMS1.1和J2EE 1.4规范(持久化,XA消息,事务)

3、虚拟主题、组合目的、镜像队列

RabbitMQ

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ特性

1、支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript等

2、AMQP的完整实现(vhost、Exchange、Binding、Routing key等)

3、事务支持/发布确认

4、消息持久化

Kafka

Kafka是一种高吞吐量的分布式发布订阅消息系统,是一个分布式的、分区的、可靠的分布式日志存储服务。它通过一种独一无二的设计提供了一个消息系统的功能。

Kafka特性

1、通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。

2、高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。

3、Partition、consumer Group

综合评价

3. JMS规范

3.1 JMS规范

JMS相关概念

1、提供者:实现JMS规范的消息中间件服务器

2、客户端:发送或接收消息的应用程序

3、生产者/发布者:创建并发送消息的客户端

4、消费者/订阅者:接收并处理消息的客户端

5、消息:应用程序之间传递的数据内容

6、消息模式:在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式

队列模型

1、客户端包括生产者和消费者

2、队列中的消息只能被一个消费者消费

3、消费者可以随时消费队列中的消息

队列模型示意图

主题模型

1、客户端包括发布者和订阅者

2、主题中的消息被所有订阅者消费

3、消费者不能消费订阅之前就发送到主题中的消息

主题模型示意图

JMS规范

JMS编码接口

1、ConnectionFactory用于创建连接到消息中间件的连接工厂

2、Connection代表了应用程序和消息服务器之间的通信链路

3、Destination指消息发布和接收的地点,包括队列或主题

4、session表示一个单线程的上下文,用于发送和接收消息

5、MessageConsumer由会话创建,用于接收发送到目标的消息

6、Messageproducer由会话创建,用于发送消息到目标

7、Message是在消费者和生产者之间传送的对象,消息头,一组消息属性,一个消息体

JMS编码接口之间的关系

4.使用activemq

4.1win安装activemq

1、下载安装包

   http://activemq.apache.org/

2、直接启动

bin目录下,以管理员身份运行activemq.BAT,打开浏览器:http://localhost:8161/,默认用户名密码均为admin

3、使用服务启动

bin目录下,以管理员身份运行InstallService.bat

4.2 linux安装activemq

1、下载并解压安装包

wget http://www.apache.org/dyn/closer.cgi?filename=/activemq/5.15.3/apache-activemq-5.15.3-bin.tar.gz

tar -zxvf apache-activemq-5.15.3-bin.tar.gz

2、启动

在bin目录下

./activemq start

查看是否启动

ps -ef | grep activemq

关闭服务

./activemq stop

4.3 队列模式的消息演示

生产者代码

packagecom.imooc.jms.queue;

importorg.apache.activemq.ActiveMQConnectionFactory;

importjavax.jms.*;

/**

*CreatedbyAdMinistratoron2018/4/25/025.

*/

publicclassAPPProducer{

privatestaticfinalStringurl="tcp://localhost:61616";

privatestaticfinalStringqueueName="queue-test";

publicstaticvoidmain(String[]args)throwsJMSException{

//1.创建ConnectionFactory

ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(url);

//2.创建Connection

Connectionconnection=connectionFactory.createConnection();

//3.启动连接

connection.start();

//4.创建会话

Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

//5.创建一个目标

Destinationdestination=session.createQueue(queueName);

//6.创建一个生产者

MessageProducerproducer=session.createProducer(destination);

for(inti=0;i<100;i++){

//7.创建消息

TextMessagetextMessage=session.createTextMessage("test"+i);

//8.发布消息

producer.send(textMessage);

System.out.println("发送消息"+textMessage.getText());

}

//9.关闭连接

connection.close();

}

}

消费者代码

packagecom.imooc.jms.queue;

importcom.sun.org.apache.xpath.internal.SourceTree;

importorg.apache.activemq.ActiveMQConnectionFactory;

importjavax.jms.*;

/**

*CreatedbyAdministratoron2018/4/25/025.

*/

publicclassAppConsumer{

privatestaticfinalStringurl="tcp://localhost:61616";

privatestaticfinalStringqueueName="queue-test";

publicstaticvoidmain(String[]args)throwsJMSException{

//1.创建ConnectionFactory

ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(url);

//2.创建Connection

Connectionconnection=connectionFactory.createConnection();

//3.启动连接

connection.start();

//4.创建会话

Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

//5.创建一个目标

Destinationdestination=session.createQueue(queueName);

//6.创建一个消费者

MessageConsumerconsumer=session.createConsumer(destination);

//7.创建一个监听器,监听状态下不能关闭连接

consumer.setMessageListener(newMessageListener(){

publicvoidonMessage(Messagemessage){

TextMessagetextMessage=(TextMessage)message;

try{

System.out.println("接收消息"+textMessage.getText());

}catch(JMSExceptione){

e.printstacktrace();

}

}

});

//8.关闭连接

//connection.close();

}

}

演示:启动两个消费者后,启动生产者,分别接收到奇偶数据,平均消费队列中的所有消息;

4.4主题模式的消息演示

切到topic窗口下查看消息

生产者

packagecom.imooc.jms.topic;

importorg.apache.activemq.ActiveMQConnectionFactory;

importjavax.jms.*;

/**

*CreatedbyAdministratoron2018/4/25/025.

*/

publicclassAppProducer{

privatestaticfinalStringurl="tcp://localhost:61616";

privatestaticfinalStringtopicName="topic-test";

publicstaticvoidmain(String[]args)throwsJMSException{

//1.创建ConnectionFactory

ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(url);

//2.创建Connection

Connectionconnection=connectionFactory.createConnection();

//3.启动连接

connection.start();

//4.创建会话

Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

//5.创建一个目标

Destinationdestination=session.createTopic(topicName);

//6.创建一个生产者

MessageProducerproducer=session.createProducer(destination);

for(inti=0;i<100;i++){

//7.创建消息

TextMessagetextMessage=session.createTextMessage("test"+i);

//8.发布消息

producer.send(textMessage);

System.out.println("发送消息"+textMessage.getText());

}

//9.关闭连接

connection.close();

}

}

消费者

packagecom.imooc.jms.topic;

importorg.apache.activemq.ActiveMQConnectionFactory;

importjavax.jms.*;

/**

*CreatedbyAdministratoron2018/4/25/025.

*/

publicclassAppConsumer{

privatestaticfinalStringurl="tcp://localhost:61616";

privatestaticfinalStringtopicName="topic-test";

publicstaticvoidmain(String[]args)throwsJMSException{

//1.创建ConnectionFactory

ConnectionFactoryconnectionFactory=newActiveMQConnectionFactory(url);

//2.创建Connection

Connectionconnection=connectionFactory.createConnection();

//3.启动连接

connection.start();

//4.创建会话

Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

//5.创建一个目标

Destinationdestination=session.createTopic(topicName);

//6.创建一个消费者

MessageConsumerconsumer=session.createConsumer(destination);

//7.创建一个监听器,监听状态下不能关闭连接

consumer.setMessageListener(newMessageListener(){

publicvoidonMessage(Messagemessage){

TextMessagetextMessage=(TextMessage)message;

try{

//需要提前预订,不能接收之前的消息

System.out.println("接收消息"+textMessage.getText());

}catch(JMSExceptione){

e.printStackTrace();

}

}

});

//8.关闭连接

//connection.close();

}

}

主题模式下每个消费者都能拿到所有的消息

4.5 Spring JMS理论

JMS代码演练

使用Spring集成JMS连接ActiveMQ

1、ConnectionFactory用于管理连接的连接工厂

2、JmsTemplate用于发送和接收消息的模板

3、MessageListener消息监听器

ConnectionFactory

1、一个Spring为我们提供的连接池

2、JmsTemplate每次发送消息都会重新创建连接,会话和productor

3、Spring中提供了SingleConnectionFactory和CachingConnectionFactory

JmsTemplate

1、是Spring提供的,只需向Spring容器内注册这个类就可以使用JmsTemplate方便的操作jms

2、JmsTemplat类是线程安全的,可以在整个应用范围使用。

MessageListener

1、实现一个onMessage方法,该方法只接收一个Message参数。

相关阅读

手把手教你如何玩转消息中间件(ActiveMQ)

情景引入 小白:起床起床起床起床。。。。快起床~ 我:怎么了又,大惊小怪,吓到我了。 小白:我有事有事想找你,十万火急呢~~ 我:你能有什么

消息中间件(一)MQ详解及四大MQ比较

一、消息中间件相关知识 1、概述 消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控

分享到:

栏目导航

推荐阅读

热门阅读