必威体育Betway必威体育官网
当前位置:首页 > IT技术

RabbitMQ入门到进阶之工作队列Work queues(Round-robin)

时间:2019-08-21 22:09:59来源:IT技术作者:seo实验室小编阅读:80次「手机版」
 

round-robin

大纲

此套免费课程视频,本人录播到了腾讯课堂

https://ke.qq.com/course/288116#tuin=5740604a

1.消息中间件概述,使用场景(日志处理,异步,系统解耦,流量削锋)

2.Rabbitmq3.7.2安装,控制台简介,管理员添加

3.用户vhost配置以及介绍

4.java操作简单队列,生产者发送消息到队列,消费者接收消息

5.简单队列的缺陷,工作队列work queues之 轮询分发(Round-robin),以及轮询分发现象

6.工作队列work queues 公平分发(fair dispatch);prefetchCount = 1来限制RabbitMQ发送的消息,手动应答ack。

7.消息应答ack与消息持久化durable

8.publish/subscribe发布订阅模式 交换机(转发器)匿名转发Nameless exchange, Fanout Exchange不处理路由键 , Direct Exchange处理路由键, topic Exchange将路由键和某模式进行匹配。队列绑定交换机(Exchange) ;

9.routing路由模式

10.topic主题模式

11.Rabbitmq之消息确认机制(AMQP事务机制)txSelect(), txCommit()以及txrollback(),事务机制的缺陷

12.Rabbitmq之消息确认机制(Confirm机制串行) waitforConfirms

13.Rabbitmq之消息确认机制(Confirm机制异步) ConfirmListener deliveryTag unconfirm集合维护

14.Spring集成rabbitmq-client,template 的使用

15.数据日志收集系统消息中间件应用

16.搜索系统DIH消息中间件应用

前言

那么之前介绍了简单队列simple 模型,我们应用程序在是使用消息系统的时候,一般生产者P生产消息是毫不费力的(发送消息即可),而消费者接收完消息后的需要处理,会耗费一定的时间,这时候,就有可能导致很多消息堆积在队列里面,一个消费者有可能不够用

那么怎么让消费者同事处理多个消息呢?

在同一个队列上创建多个消费者,让他们相互竞争,这样消费者就可以同时处理多条消息了

接下来就要介绍工作队列

工作队列

模型图

这里写图片描述

使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多消息,我们可以通过增加消费者来解决这一问题,使得系统的伸缩性更加容易。

轮询分发

生产者发送消息

public class Send {

    private final static String QUEUE_NAME  = "test_queue_work";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 50; i++) {
            // 消息内容
            String message = "." + i;
            channel.basicpublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");

            Thread.sleep(i * 10);
        }

        channel.close();
        connection.close();
    }
}

消费者1

package com.mmr.rabbitmq.work;

@SuppressWarnings("deprecation")
public class Recv1 {

    private final static String QUEUE_NAME  = "test_queue_wor1k";

    public static void main(String[] args) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();

         // 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //定义一个消息的消费者
        final consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [1] Received '" + message + "'");
                try {
                    doWork(message);
                } catch (Exception e) {
                    e.printstacktrace();
                } finally {
                    System.out.println(" [x] Done");
                }
            }

        };
        boolean autoAck = true; //消息的确认模式自动应答
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }

    private static void doWork(String task) throws InterruptedException {
        Thread.sleep(1000);
    }

    @SuppressWarnings("unused")
    public static void oldAPi() throws Exception, TimeoutException {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成状态false  自动true 自动应答 不需要手动确认
        channel.basicConsume(QUEUE_NAME, true, consumer);
        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getbody());
            System.out.println(" [x] Received '" + message + "'");
        }
    }

}

消费者2

public class Recv2 {

    private final static String QUEUE_NAME  = "test_queue_work";

    public static void main(String[] args) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtils.getConnection();
        final Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //定义一个消息的消费者
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [2] Received '" + message + "'");
                try {
                        Thread.sleep(2000);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    System.out.println(" [x] Done");
                }
            }

        };
        boolean autoAck = true; //
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    }
}

测试现象

消费者1 我们处理时间是1s ;而消费者2中处理时间是2s;

但是我们看到的现象并不是 1处理的多 消费者2处理的少,

[1] Received ‘.0’

[x] Done

[1] Received ‘.2’

[x] Done

[1] Received ‘.4’

[x] Done

[1] Received ‘.6’

……….

消费者1中将偶数部分处理掉了

[2] Received ‘.1’

[x] Done

[2] Received ‘.3’

[x] Done

[2] Received ‘.5’

[x] Done

…… .. . . .

消费者2中将基数部分处理掉了

我想要的是1处理的多,而2处理的少

测试结果:

1.消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取

2.消费者1和消费者2货到的消息数量是一样的 一个奇数一个偶数

按道理消费者1 获取的比消费者2要多

这种方式叫做轮询分发 结果就是不管谁忙或清闲,都不会给谁多一个任务或少一个任务,任务总是你一个我一个的分

相关阅读

GNN新作《Relational inductive biases,deep learning,a

当下AI的瓶颈: 静观现有AI,尤其是Deep Learning的发展如火如荼,几大热炒的明星模型无非MLP,CNN,和RNN。这些模型更倾向于是对现实世界

【巭牛猫】老主机系统虚拟机安装苹果系统 Q8300 / VMw

App Store官网下载 macOS Mojave 并打包光盘镜像文件。安装 VMware Workstation Player官网下载 VMware Worksta

Axialis IconWorkshop 汉化破解版

【官方介绍】Axialis IconWorkShop的简称是一款强大专业的图标制作软件和转换工具,可创建、提取、管理和再分配所有Windows、Macin

Android O Launcher3-Workspace加载

一.简述: Launcher这里我们研究主要是Launcher3(Android O平台),各个手机公司自家的ROM Launcher,咱们也看不到,但是八九不离十啦,他

Adjoin the Networks Gym - 100781A

http://codeforces.com/gym/100781/attachments 给出一个森林 问将图连通后最小的树直径 对于所有连通块都求出直径 若只有一个连

分享到:

栏目导航

推荐阅读

热门阅读