activemq
- activemq介绍
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
- jms介绍
JMS即java消息服务(Java message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
JMS是一种与厂商无关的 API,用来访问消息收发系统消息,它类似于JDBC(Java Database Connectivity)。这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。
- ActivcMQ在linux下的安装
1.解压ActiveMQ安装包 tar -xvf apache-activemq-5.12.0-bin.tar.gz -C /usr/local/
2.进入bin目录,运行activemq ./activemq start
3.到页面访问 ip+8161端口号,点击下图箭头所指位置进入管理页面,默认用户名密码为admin
补充: 如果页面无法访问可能存在如下情况:
1.主机名没有映射,8161端口没有被监听
netstat -tnlp查看端口监听
若没有监听查看主机名是否映射
(1)vim /etc/sysconfig/network
(2)vim /etc/hosts
若没有则添加映射,重启activemq。
2.8161端口正常监听,页面仍然无法访问,检查防火墙是否关闭。
- activemq使用java代码实现
对于消息的传递有两种类型,1点对点(一个provider一个consumer),2.发布/订阅(一个消息可被多个consumer消费,注意:consumer端需在provider端前面开启,否则无法接收到发送的消息)
- 使用jms实现消息的发送与接收
在maven工程的pom文件中加入activemq依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.13.4</version>
</dependency>
public static void main(String[] args) throws JMSException {
//1.创建练连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.219.128:61616");
//2.创建连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取会话(参数1:是否开启事务,参数2:消息确认方式)
session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建消息队列
Queue queue = session.createQueue("myQueue");
//6.创建消息生产者对象
MessageProducer producer = session.createProducer(queue);
//7.创建消息对象
TextMessage message = session.createTextMessage("日你仙人板板");
//8.发送消息
producer.send(message);
//9.关闭资源
producer.close();
session.close();
connection.close();
}
consumer端
public static void main(String[] args) throws JMSException, IOException {
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.219.128:61616");
//2.创建连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session(会话对象) 参数1:是否启动事务 参数2:消息确认方式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建队列对象
Queue queue = session.createQueue("myQueue");
//6.创建消息消费者对象
MessageConsumer consumer = session.createConsumer(queue);
//7.设置监听
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("提取消息为"+textMessage.getText());
} catch (JMSException e) {
e.printstacktrace();
}
} });
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
}
实现效果
Number Of Pending Messages 为未消费数量,Number Of Consumers 为consumer连接数,Messages Enqueued 为发送消息数量, Messages Dequeued 为接收消息数量,views中的atom查看消息详情。queue模式中发送消息数量=接收消息数量+未消费数量。
2.发布/订阅(topic)java代码实现
producer端
public static void main(String[] args) throws JMSException {
// 1.创建练连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.219.128:61616");
// 2.创建连接
Connection connection = connectionFactory.createConnection();
// 3.启动连接
connection.start();
// 4.获取会话(参数1:是否开启事务,参数2:消息确认方式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.创建消息队列
Topic topic = session.createTopic("myTopic");
// 6.创建消息生产者对象
MessageProducer producer = session.createProducer(topic);
// 7.创建消息对象
TextMessage message = session.createTextMessage("日你仙人板板");
// 8.发送消息
producer.send(message);
// 9.关闭资源
producer.close();
session.close();
connection.close();
}
consumer端
public static void main(String[] args) throws JMSException, IOException {
//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.219.128:61616");
//2.创建连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session(会话对象) 参数1:是否启动事务 参数2:消息确认方式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建队列对象
Topic topic = session.createTopic("myTopic");
//6.创建消息消费者对象
MessageConsumer consumer = session.createConsumer(topic);
//7.设置监听
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("提取消息为"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
} });
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();
}
与queue模式相比,代码中只需要修改创建队列对象,将createQueue改为createTopic。
- 使用jms与Spring整合实现activemq
添加依赖
<properties>
<spring.version>4.2.4.RElease</spring.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.9</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.13.4</version>
</dependency>
</dependencies>
- producer端配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<context:component-scan base-package="com.semptian.activemq"></context:component-scan>
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.219.128:61616"/>
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!--这个是队列目的地,点对点的 文本信息-->
<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="myQueue2"/>
</bean>
<!--这个是订阅模式 文本信息-->
<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="myTopic2"/>
</bean>
</beans>
- producer端java代码:
queue模式
package com.semptian.activemq;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
@Component
public class QueueProducer {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Destination queueTextDestination;
/**
* 发送文本消息
*/
public void sendTextMessage(final String text){
jmsTemplate.send(queueTextDestination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(text);
}
});
}
}
topic模式
package com.semptian.activemq;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
@Component
public class TopicProducer {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Destination topicTextDestination;
/**
* 发送文本消息
* @param text
*/
public void sendTextMessage(final String text){
jmsTemplate.send(topicTextDestination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(text);
}
});
}
}
consumer端配置文件
(1)applicationContext-queue
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.219.128:61616"/>
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!--这个是队列目的地,点对点的 文本信息-->
<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="myQueue2"/>
</bean>
<!-- 我的监听类 -->
<bean id="myMessageListener" class="com.semptian.activemq.MyMessageListener"></bean>
<!-- 消息监听容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenercontainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="queueTextDestination" />
<property name="messageListener" ref="myMessageListener" />
</bean>
</beans>
(2)applicationContext-topic
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.219.128:61616"/>
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!--这个是队列目的地,点对点的 文本信息-->
<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="myTopic2"/>
</bean>
<!-- 我的监听类 -->
<bean id="myMessageListener" class="com.semptian.activemq.MyMessageListener"></bean>
<!-- 消息监听容器 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="topicTextDestination" />
<property name="messageListener" ref="myMessageListener" />
</bean>
</beans>
consumer端java代码
package com.semptian.activemq;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class MyMessageListener implements MessageListener {
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println("接收到消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
- 最后测试代码
- queue模式
producer端
package com.semptian.activemq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.semptian.activemq.QueueProducer;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations="classpath:applicationContext-producer.xml")
public class TestQueue {
@Autowired
private QueueProducer queueProducer;
@Test
public void testSend(){
queueProducer.sendTextMessage("spring JMS 点对点");
}
}
consumer端
package com.semptian.activemq;
import java.io.IOException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations="classpath:applicationContext-queue.xml")
public class TestQueue {
@Test
public void testQueue(){
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
界面效果
- topic模式
producer端
package com.semptian.activemq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.semptian.activemq.QueueProducer;
import com.semptian.activemq.TopicProducer;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations="classpath:applicationContext-producer.xml")
public class TestTopic {
@Autowired
private TopicProducer topicProducer;
@Test
public void testSend(){
topicProducer.sendTextMessage("spring JMS 发布订阅");
}
}
consumer端
package com.semptian.activemq;
import java.io.IOException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations="classpath:applicationContext-topic.xml")
public class TestTopic {
@Test
public void testTopic(){
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
界面效果(注意:topic模式中consumer端需要比producer端先启动)
文章最后发布于: 2018-09-18 00:49:08
相关阅读
小编经常在逛论坛的时候经常看到一些网友发布的破解版的迅雷,像VIP6、去广告、使用高速通道、等只有会员的功能。不过经常是用了几
http://bridgeforyou.cn/2017/12/03/How-to-Persuade-Your-Teemmate-to-use-TDD/TDD(Test-driven development),也就是我们常说的“
2019独角兽企业
java.sql.ResultSetMetaData 接口的使用(结果集元数据
intro 有时需要了解结果集的元数据信息(之后与反射,泛型结合,编写通用方法,减少代码重复):columnCount 结果集的列数columnName 列
excel函数中,MONTH函数,用于返回指定日期中的月份,应该怎么使用呢?下面就跟seo实验室小编一起来看看吧。excel使用month函数的步骤在