必威体育Betway必威体育官网
当前位置:首页 > IT技术

ActiveMQ消息中间件使用

时间:2019-10-25 08:15:45来源:IT技术作者:seo实验室小编阅读:58次「手机版」
 

activemq

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

  • jms介绍

JMS即java消息服务(Java message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

JMS是一种与厂商无关的 API,用来访问消息收发系统消息,它类似于JDBC(Java Database Connectivity)。这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。

  • ActivcMQ在linux下的安装

1.解压ActiveMQ安装包 tar -xvf apache-activemq-5.12.0-bin.tar.gz -C /usr/local/

2.进入bin目录,运行activemq  ./activemq start

3.到页面访问  ip+8161端口号,点击下图箭头所指位置进入管理页面,默认用户名密码为admin

补充: 如果页面无法访问可能存在如下情况:

1.主机名没有映射,8161端口没有被监听

netstat -tnlp查看端口监听

若没有监听查看主机名是否映射

(1)vim /etc/sysconfig/network

(2)vim  /etc/hosts

若没有则添加映射,重启activemq。

2.8161端口正常监听,页面仍然无法访问,检查防火墙是否关闭。

  • activemq使用java代码实现

对于消息的传递有两种类型,1点对点(一个provider一个consumer),2.发布/订阅(一个消息可被多个consumer消费,注意:consumer端需在provider端前面开启,否则无法接收到发送的消息)

  • 使用jms实现消息的发送与接收

在maven工程的pom文件中加入activemq依赖

<dependency>

           <groupId>org.apache.activemq</groupId>

           <artifactId>activemq-client</artifactId>

           <version>5.13.4</version>

       </dependency>

1.点对点(queue)java代码实现

producer

public static void main(String[] args) throws JMSException {
		//1.创建练连接工厂
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.219.128:61616");
		//2.创建连接
		Connection connection = connectionFactory.createConnection();
		//3.启动连接
		connection.start();
		//4.获取会话(参数1:是否开启事务,参数2:消息确认方式)
		session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5.创建消息队列
		Queue queue = session.createQueue("myQueue");
		//6.创建消息生产者对象
		MessageProducer producer = session.createProducer(queue);
		//7.创建消息对象
		TextMessage message = session.createTextMessage("日你仙人板板");
		//8.发送消息
		producer.send(message);
		//9.关闭资源
		producer.close();
		session.close();
		connection.close();
	}

consumer端

public static void main(String[] args) throws JMSException, IOException {
		//1.创建连接工厂
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.219.128:61616");
		//2.创建连接
		Connection connection = connectionFactory.createConnection();
		//3.启动连接
		connection.start();
		//4.获取session(会话对象)  参数1:是否启动事务  参数2:消息确认方式
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//5.创建队列对象
		Queue queue = session.createQueue("myQueue");
		//6.创建消息消费者对象
		MessageConsumer consumer = session.createConsumer(queue);
		//7.设置监听
		consumer.setMessageListener(new MessageListener() {
			
			public void onMessage(Message message) {
				TextMessage textMessage=(TextMessage)message;
				try {
					System.out.println("提取消息为"+textMessage.getText());
				} catch (JMSException e) {
					e.printstacktrace();
				}
			}  		});
		//8.等待键盘输入
		System.in.read();
		
		//9.关闭资源
		consumer.close();
		session.close();
		connection.close();

	}

实现效果

Number Of Pending Messages  为未消费数量,Number Of Consumers  为consumer连接数,Messages Enqueued  为发送消息数量, Messages Dequeued  为接收消息数量,views中的atom查看消息详情。queue模式中发送消息数量=接收消息数量+未消费数量。

2.发布/订阅(topic)java代码实现

producer端

public static void main(String[] args) throws JMSException {
		// 1.创建练连接工厂
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.219.128:61616");
		// 2.创建连接
		Connection connection = connectionFactory.createConnection();
		// 3.启动连接
		connection.start();
		// 4.获取会话(参数1:是否开启事务,参数2:消息确认方式)
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 5.创建消息队列
		Topic topic = session.createTopic("myTopic");
		// 6.创建消息生产者对象
		MessageProducer producer = session.createProducer(topic);
		// 7.创建消息对象
		TextMessage message = session.createTextMessage("日你仙人板板");
		// 8.发送消息
		producer.send(message);
		// 9.关闭资源
		producer.close();
		session.close();
		connection.close();
	}

consumer端

public static void main(String[] args) throws JMSException, IOException {
	//1.创建连接工厂
			ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.219.128:61616");
			//2.创建连接
			Connection connection = connectionFactory.createConnection();
			//3.启动连接
			connection.start();
			//4.获取session(会话对象)  参数1:是否启动事务  参数2:消息确认方式
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			//5.创建队列对象
			Topic topic = session.createTopic("myTopic");
			//6.创建消息消费者对象
			MessageConsumer consumer = session.createConsumer(topic);
			//7.设置监听
			consumer.setMessageListener(new MessageListener() {
				
				public void onMessage(Message message) {
					TextMessage textMessage=(TextMessage)message;
					try {
						System.out.println("提取消息为"+textMessage.getText());
					} catch (JMSException e) {
						e.printStackTrace();
					}
				}  		});
			//8.等待键盘输入
			System.in.read();
			
			//9.关闭资源
			consumer.close();
			session.close();
			connection.close();
}

与queue模式相比,代码中只需要修改创建队列对象,将createQueue改为createTopic。

  • 使用jms与Spring整合实现activemq

添加依赖


   <properties>
  	<spring.version>4.2.4.RElease</spring.version>
  </properties>
  
  <dependencies>
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-jms</artifactId>
		<version>${spring.version}</version>
	</dependency>
  	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-test</artifactId>
		<version>${spring.version}</version>
	</dependency>
	<dependency>
		<groupId>junit</groupId>
		<artifactId>junit</artifactId>
		<version>4.9</version>
	</dependency>
	<dependency>
		<groupId>org.apache.activemq</groupId>
		<artifactId>activemq-client</artifactId>
		<version>5.13.4</version>
	 </dependency>
  </dependencies>  
  • producer端配置文件

APPlicationcontext-producer

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
	xmlns:jms="http://www.springframework.org/schema/jms"
	xsi:schemaLocation="http://www.springframework.org/schema/beans   
		http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/context   
		http://www.springframework.org/schema/context/spring-context.xsd">
		
		
	<context:component-scan base-package="com.semptian.activemq"></context:component-scan>     
	
	   
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
	    <property name="brokerURL" value="tcp://192.168.219.128:61616"/>  
	</bean>
	   
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
	<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
	<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
	    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
	</bean>  
		   
    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->  
	<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
	    <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
	    <property name="connectionFactory" ref="connectionFactory"/>  
	</bean>      
    <!--这个是队列目的地,点对点的  文本信息-->  
	<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">  
	    <constructor-arg value="myQueue2"/>  
	</bean>    
	
	<!--这个是订阅模式  文本信息-->  
	<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">  
	    <constructor-arg value="myTopic2"/>  
	</bean>  
	
</beans>
  • producer端java代码:

queue模式

package com.semptian.activemq;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

@Component
public class QueueProducer {

	@Autowired
	private JmsTemplate jmsTemplate;
	
	@Autowired
	private Destination queueTextDestination;
	
	/**
	 * 发送文本消息
	 */
	public void sendTextMessage(final String text){
		jmsTemplate.send(queueTextDestination, new MessageCreator() {
			
			public Message createMessage(Session session) throws JMSException {
				
				return session.createTextMessage(text);
			}
		});
		
	}
	
}

topic模式

package com.semptian.activemq;

import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

@Component
public class TopicProducer {

	@Autowired
	private JmsTemplate jmsTemplate;
	
	@Autowired
	private Destination topicTextDestination;
	
	/**
	 * 发送文本消息
	 * @param text
	 */
	public void sendTextMessage(final String text){
		jmsTemplate.send(topicTextDestination, new MessageCreator() {
			
			public Message createMessage(Session session) throws JMSException {
				
				return session.createTextMessage(text);
			}
		});
		
	}
	
}

consumer端配置文件

(1)applicationContext-queue

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
	xmlns:jms="http://www.springframework.org/schema/jms"
	xsi:schemaLocation="http://www.springframework.org/schema/beans   
		http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/context   
		http://www.springframework.org/schema/context/spring-context.xsd">
	
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
	    <property name="brokerURL" value="tcp://192.168.219.128:61616"/>  
	</bean>
	   
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
	<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
	<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
	    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
	</bean>  
	
    <!--这个是队列目的地,点对点的  文本信息-->  
	<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">  
	    <constructor-arg value="myQueue2"/>  
	</bean>    
	
	<!-- 我的监听类 -->
	<bean id="myMessageListener" class="com.semptian.activemq.MyMessageListener"></bean>
	
	
	<!-- 消息监听容器 -->
	<bean class="org.springframework.jms.listener.DefaultMessageListenercontainer">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="destination" ref="queueTextDestination" />
		<property name="messageListener" ref="myMessageListener" />
	</bean>
	
</beans>

(2)applicationContext-topic

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
	xmlns:jms="http://www.springframework.org/schema/jms"
	xsi:schemaLocation="http://www.springframework.org/schema/beans   
		http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/context   
		http://www.springframework.org/schema/context/spring-context.xsd">
	
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
	<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
	    <property name="brokerURL" value="tcp://192.168.219.128:61616"/>  
	</bean>
	   
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
	<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
	<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
	    <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
	</bean>  
	
    <!--这个是队列目的地,点对点的  文本信息-->  
	<bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">  
	    <constructor-arg value="myTopic2"/>  
	</bean>    
	
	<!-- 我的监听类 -->
	<bean id="myMessageListener" class="com.semptian.activemq.MyMessageListener"></bean>
	
	
	<!-- 消息监听容器 -->
	<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
		<property name="connectionFactory" ref="connectionFactory" />
		<property name="destination" ref="topicTextDestination" />
		<property name="messageListener" ref="myMessageListener" />
	</bean>
	
</beans>

consumer端java代码

package com.semptian.activemq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class MyMessageListener implements MessageListener {

	public void onMessage(Message message) {
		
		TextMessage textMessage=(TextMessage)message;
		try {
			System.out.println("接收到消息:"+textMessage.getText());
		} catch (JMSException e) {
		
			e.printStackTrace();
		}
		

	}

}
  • 最后测试代码
  • queue模式

producer端

package com.semptian.activemq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.semptian.activemq.QueueProducer;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations="classpath:applicationContext-producer.xml")
public class TestQueue {

	@Autowired
	private  QueueProducer queueProducer;
	
	@Test
	public void testSend(){
		queueProducer.sendTextMessage("spring JMS 点对点");
		
	}
	
}

consumer端

package com.semptian.activemq;

import java.io.IOException;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations="classpath:applicationContext-queue.xml")
public class TestQueue {

	@Test
	public void testQueue(){
		try {
			System.in.read();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
}

界面效果

  •  topic模式

producer端

package com.semptian.activemq;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.semptian.activemq.QueueProducer;
import com.semptian.activemq.TopicProducer;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations="classpath:applicationContext-producer.xml")
public class TestTopic {

	@Autowired
	private  TopicProducer topicProducer;
	
	@Test
	public void testSend(){
		topicProducer.sendTextMessage("spring JMS 发布订阅");
		
	}
	
}

consumer端

package com.semptian.activemq;

import java.io.IOException;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations="classpath:applicationContext-topic.xml")
public class TestTopic {

	@Test
	public void testTopic(){
		try {
			System.in.read();
		} catch (IOException e) {
			
			e.printStackTrace();
		}
	}
	
}

界面效果(注意:topic模式中consumer端需要比producer端先启动)

文章最后发布于: 2018-09-18 00:49:08

相关阅读

迅雷高速通道怎么破解使用?

小编经常在逛论坛的时候经常看到一些网友发布的破解版的迅雷,像VIP6、去广告、使用高速通道、等只有会员的功能。不过经常是用了几

如何说服你的同事使用TDD

http://bridgeforyou.cn/2017/12/03/How-to-Persuade-Your-Teemmate-to-use-TDD/TDD(Test-driven development),也就是我们常说的“

DOM中cloneNode的使用之旅

2019独角兽企业

java.sql.ResultSetMetaData 接口的使用(结果集元数据

intro 有时需要了解结果集的元数据信息(之后与反射,泛型结合,编写通用方法,减少代码重复):columnCount 结果集的列数columnName 列

excel如何使用month函数

excel函数中,MONTH函数,用于返回指定日期中的月份,应该怎么使用呢?下面就跟seo实验室小编一起来看看吧。excel使用month函数的步骤在

分享到:

栏目导航

推荐阅读

热门阅读