round-robin
大纲
https://ke.qq.com/course/288116#tuin=5740604a
1.消息中间件概述,使用场景(日志处理,异步,系统解耦,流量削锋)
2.Rabbitmq3.7.2安装,控制台简介,管理员添加
3.用户vhost配置以及介绍
4.java操作简单队列,生产者发送消息到队列,消费者接收消息
5.简单队列的缺陷,工作队列work queues之 轮询分发(Round-robin),以及轮询分发现象
6.工作队列work queues 公平分发(fair dispatch);prefetchCount = 1来限制RabbitMQ发送的消息,手动应答ack。
7.消息应答ack与消息持久化durable
8.publish/subscribe发布订阅模式 交换机(转发器)匿名转发Nameless exchange, Fanout Exchange不处理路由键 , Direct Exchange处理路由键, topic Exchange将路由键和某模式进行匹配。队列绑定交换机(Exchange) ;
9.routing路由模式
10.topic主题模式
11.Rabbitmq之消息确认机制(AMQP事务机制)txSelect(), txCommit()以及txrollback(),事务机制的缺陷
12.Rabbitmq之消息确认机制(Confirm机制串行) waitforConfirms
13.Rabbitmq之消息确认机制(Confirm机制异步) ConfirmListener deliveryTag unconfirm集合维护
14.Spring集成rabbitmq-client,template 的使用
16.搜索系统DIH消息中间件应用
前言
那么之前介绍了简单队列simple 模型,我们应用程序在是使用消息系统的时候,一般生产者P生产消息是毫不费力的(发送消息即可),而消费者接收完消息后的需要处理,会耗费一定的时间,这时候,就有可能导致很多消息堆积在队列里面,一个消费者有可能不够用
那么怎么让消费者同事处理多个消息呢?
在同一个队列上创建多个消费者,让他们相互竞争,这样消费者就可以同时处理多条消息了
接下来就要介绍工作队列
工作队列
模型图
使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多消息,我们可以通过增加消费者来解决这一问题,使得系统的伸缩性更加容易。
轮询分发
生产者发送消息
public class Send {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 50; i++) {
// 消息内容
String message = "." + i;
channel.basicpublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
Thread.sleep(i * 10);
}
channel.close();
connection.close();
}
}
消费者1
package com.mmr.rabbitmq.work;
@SuppressWarnings("deprecation")
public class Recv1 {
private final static String QUEUE_NAME = "test_queue_wor1k";
public static void main(String[] args) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//定义一个消息的消费者
final consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [1] Received '" + message + "'");
try {
doWork(message);
} catch (Exception e) {
e.printstacktrace();
} finally {
System.out.println(" [x] Done");
}
}
};
boolean autoAck = true; //消息的确认模式自动应答
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
private static void doWork(String task) throws InterruptedException {
Thread.sleep(1000);
}
@SuppressWarnings("unused")
public static void oldAPi() throws Exception, TimeoutException {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成状态false 自动true 自动应答 不需要手动确认
channel.basicConsume(QUEUE_NAME, true, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getbody());
System.out.println(" [x] Received '" + message + "'");
}
}
}
消费者2
public class Recv2 {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] args) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//定义一个消息的消费者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [2] Received '" + message + "'");
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println(" [x] Done");
}
}
};
boolean autoAck = true; //
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
测试现象
消费者1 我们处理时间是1s ;而消费者2中处理时间是2s;
但是我们看到的现象并不是 1处理的多 消费者2处理的少,
[1] Received ‘.0’
[x] Done
[1] Received ‘.2’
[x] Done
[1] Received ‘.4’
[x] Done
[1] Received ‘.6’
……….
消费者1中将偶数部分处理掉了
[2] Received ‘.1’
[x] Done
[2] Received ‘.3’
[x] Done
[2] Received ‘.5’
[x] Done
…… .. . . .
消费者2中将基数部分处理掉了
我想要的是1处理的多,而2处理的少
测试结果:
1.消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取
2.消费者1和消费者2货到的消息数量是一样的 一个奇数一个偶数
按道理消费者1 获取的比消费者2要多
这种方式叫做轮询分发 结果就是不管谁忙或清闲,都不会给谁多一个任务或少一个任务,任务总是你一个我一个的分
相关阅读
GNN新作《Relational inductive biases,deep learning,a
当下AI的瓶颈: 静观现有AI,尤其是Deep Learning的发展如火如荼,几大热炒的明星模型无非MLP,CNN,和RNN。这些模型更倾向于是对现实世界
【巭牛猫】老主机系统虚拟机安装苹果系统 Q8300 / VMw
App Store官网下载 macOS Mojave 并打包光盘镜像文件。安装 VMware Workstation Player官网下载 VMware Worksta
【官方介绍】Axialis IconWorkShop的简称是一款强大专业的图标制作软件和转换工具,可创建、提取、管理和再分配所有Windows、Macin
Android O Launcher3-Workspace加载
一.简述: Launcher这里我们研究主要是Launcher3(Android O平台),各个手机公司自家的ROM Launcher,咱们也看不到,但是八九不离十啦,他
Adjoin the Networks Gym - 100781A
http://codeforces.com/gym/100781/attachments 给出一个森林 问将图连通后最小的树直径 对于所有连通块都求出直径 若只有一个连