必威体育Betway必威体育官网
当前位置:首页 > IT技术

ActiveMQ的topic的简单使用

时间:2019-07-30 00:11:06来源:IT技术作者:seo实验室小编阅读:87次「手机版」
 

topic

上一篇写了queue的简单使用,实际差不多,直接上代码

这个topic就类似与微信公众号的感觉,一个生产者可以对应多个消费者

代码在运行的时候需要先运行一下消费者,相当于你订阅了这个topic(类似于关注了一个微信公众号

,不然接收不到消息

生产者:

package com.test.mq.topic;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.messageproducer;
import javax.jms.session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class produceMq {
	private static final String subject = "test-activemq-queue";
	//连接账号
    private static String userName = "admin";
    //连接密码
    private static String password = "admin";
    //连接地址
    private static String brokerURL = "tcp://localhost:61616";
	public static void main(String[] args) throws JMSException {
		//初始化连接工厂  
		ConnectionFactory connectionFactory = new             
              ActiveMQConnectionFactory(userName,password,brokerURL);
		//获得连接
		Connection conn = connectionFactory.createConnection();
		//创建Session,此方法第一个参数表示会话是否在事务中执行,第二个参数设定会话的应答模式
		Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
		//创建队列
		Destination dest = session.createTopic("MyTopic");
		//通过session可以创建消息的生产者
		MessageProducer producer = session.createProducer(dest);
		producer.setDeliveryMode(DeliveryMode.persistENT);//持久化
		//启动连接 ,连接的启动位置和queue略有不同,需要把配置配完,在启动连接
		conn.start();
		for (int i=0;i<3;i++) {
			//初始化一个mq消息
			TextMessage message = session.createTextMessage("topic 持久化66" + i);
			//发送消息
			producer.send(message);//
		}
		session.commit();
		session.close();
		//关闭mq连接
		conn.close();
	}
}

消费者:

package com.test.mq.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Messageconsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.Topicsubscriber;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerMq {
	private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerMq.class);
    private static final String SUBJECT = "test-activemq-queue";
    //连接账号
    private static String userName = "admin";
    //连接密码
    private static String password = "admin";
    //连接地址
    private static String brokerURL = "tcp://localhost:61616";
    public static void main(String[] args) throws JMSException {
    	//初始化ConnectionFactory
    	ConnectionFactory connectionFactory = new 
                          ActiveMQConnectionFactory(userName,password,brokerURL);

        //创建mq连接
        Connection conn = connectionFactory.createConnection();
        //参数名称随便起
        conn.setclientid("c3");
        
        

        //创建会话
        Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

        //通过会话创建目标
        Topic topic = session.createTopic("MyTopic");
        //第二个参数随便起
        TopicSubscriber subscriber = session.createDurableSubscriber(topic, "t3");//持久 
         化消息接收
        //启动连接 
        conn.start();
        //创建mq消息的消费者
        Message msg = subscriber.receive();
        while(msg!=null){
        	onMessage(msg);
        	msg=subscriber.receive(1000L);
//        	session.commit();
//        	message.acknowledge();
        	
        }
        session.commit();
        session.close();
        subscriber.close();
        conn.close();
        //初始化MessageListener
        ConsumerMq me = new ConsumerMq();

        //给消费者设定监听对象
//        consumer.setMessageListener(me);
    }

    public static void onMessage(Message message) {
        TextMessage txtMessage = (TextMessage)message;
        try {
        	System.out.println("get message " + txtMessage.getText());
            LOGGER.info ("get message " + txtMessage.getText());
        } catch (JMSException e) {
            LOGGER.ERROR("error {}", e);
        }
    }
}

可以在后台管理页面查看:

因为我们创建的是topic,所以点击topics,这个对应于生产者的一些信息

点击Subscribers 查看消费者的信息:

对于其他的参数,运行代码,可以观察到都是什么意思,有哪些变化

一只行走的小笨猿!

相关阅读

手把手教你如何玩转消息中间件(ActiveMQ)

情景引入 小白:起床起床起床起床。。。。快起床~ 我:怎么了又,大惊小怪,吓到我了。 小白:我有事有事想找你,十万火急呢~~ 我:你能有什么

help_topic

### Error updating database. Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException:SELECT command denied to u

ActiveMQ的作用总结(应用场景及优势)

业务场景说明:消息队列在大型电子商务类网站,如京东、淘宝、去哪儿等网站有着深入的应用,队列的主要作用是消除高并发访问高峰,加快

分享到:

栏目导航

推荐阅读

热门阅读