topic
这个topic就类似与微信公众号的感觉,一个生产者可以对应多个消费者
代码在运行的时候需要先运行一下消费者,相当于你订阅了这个topic(类似于关注了一个微信公众号)
,不然接收不到消息
生产者:
package com.test.mq.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.messageproducer;
import javax.jms.session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class produceMq {
private static final String subject = "test-activemq-queue";
//连接账号
private static String userName = "admin";
//连接密码
private static String password = "admin";
//连接地址
private static String brokerURL = "tcp://localhost:61616";
public static void main(String[] args) throws JMSException {
//初始化连接工厂
ConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(userName,password,brokerURL);
//获得连接
Connection conn = connectionFactory.createConnection();
//创建Session,此方法第一个参数表示会话是否在事务中执行,第二个参数设定会话的应答模式
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
//创建队列
Destination dest = session.createTopic("MyTopic");
//通过session可以创建消息的生产者
MessageProducer producer = session.createProducer(dest);
producer.setDeliveryMode(DeliveryMode.persistENT);//持久化
//启动连接 ,连接的启动位置和queue略有不同,需要把配置配完,在启动连接
conn.start();
for (int i=0;i<3;i++) {
//初始化一个mq消息
TextMessage message = session.createTextMessage("topic 持久化66" + i);
//发送消息
producer.send(message);//
}
session.commit();
session.close();
//关闭mq连接
conn.close();
}
}
消费者:
package com.test.mq.topic;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Messageconsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.Topicsubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConsumerMq {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerMq.class);
private static final String SUBJECT = "test-activemq-queue";
//连接账号
private static String userName = "admin";
//连接密码
private static String password = "admin";
//连接地址
private static String brokerURL = "tcp://localhost:61616";
public static void main(String[] args) throws JMSException {
//初始化ConnectionFactory
ConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(userName,password,brokerURL);
//创建mq连接
Connection conn = connectionFactory.createConnection();
//参数名称随便起
conn.setclientid("c3");
//创建会话
Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
//通过会话创建目标
Topic topic = session.createTopic("MyTopic");
//第二个参数随便起
TopicSubscriber subscriber = session.createDurableSubscriber(topic, "t3");//持久
化消息接收
//启动连接
conn.start();
//创建mq消息的消费者
Message msg = subscriber.receive();
while(msg!=null){
onMessage(msg);
msg=subscriber.receive(1000L);
// session.commit();
// message.acknowledge();
}
session.commit();
session.close();
subscriber.close();
conn.close();
//初始化MessageListener
ConsumerMq me = new ConsumerMq();
//给消费者设定监听对象
// consumer.setMessageListener(me);
}
public static void onMessage(Message message) {
TextMessage txtMessage = (TextMessage)message;
try {
System.out.println("get message " + txtMessage.getText());
LOGGER.info ("get message " + txtMessage.getText());
} catch (JMSException e) {
LOGGER.ERROR("error {}", e);
}
}
}
可以在后台管理页面查看:
因为我们创建的是topic,所以点击topics,这个对应于生产者的一些信息
点击Subscribers 查看消费者的信息:
对于其他的参数,运行代码,可以观察到都是什么意思,有哪些变化
一只行走的小笨猿!
相关阅读
情景引入 小白:起床起床起床起床。。。。快起床~ 我:怎么了又,大惊小怪,吓到我了。 小白:我有事有事想找你,十万火急呢~~ 我:你能有什么
### Error updating database. Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException:SELECT command denied to u
业务场景说明:消息队列在大型电子商务类网站,如京东、淘宝、去哪儿等网站有着深入的应用,队列的主要作用是消除高并发访问高峰,加快