返回顶部
首页 > 资讯 > 后端开发 > Python >Java RabbitMQ的持久化和发布确认详解
  • 237
分享到

Java RabbitMQ的持久化和发布确认详解

2024-04-02 19:04:59 237人浏览 薄情痞子

Python 官方文档:入门教程 => 点击学习

摘要

目录1. 持久化1.1 实现持久化1.2 不公平分发1.3 测试不公平分发1.4 预取值1.4.1 代码测试2. 发布确认2.1 单个确认发布2.2 批量确认发布2.3 异步确认发布

1. 持久化

RabbitMQ服务停掉以后消息生产者发送过的消息不丢失。默认情况下RabbitMQ退出或者崩溃时,会忽视掉队列和消息。为了保证消息不丢失需要将队列和消息都标记为持久化。

1.1 实现持久化

1.队列持久化:在创建队列时将channel.queueDeclare();第二个参数改为true。

2.消息持久化:在使用信道发送消息时channel.basicPublish();将第三个参数改为:MessageProperties.PERSISTENT_TEXT_PLaiN表示持久化消息。


public class Producer3 {
    private static final String LONG_QUEUE = "long_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 持久化队列
        channel.queueDeclare(LONG_QUEUE,true,false,false,null);
        Scanner scanner = new Scanner(System.in);
        int i = 0;
        while (scanner.hasNext()){
            i++;
            String msg = scanner.next() + i;
            // 持久化消息
            channel.basicPublish("",LONG_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送消息:'" + msg + "'成功");
        }
    }
}

但是存储消息还有存在一个缓存的间隔点,没有真正的写入磁盘,持久性保证不够强,但是对于简单队列而言也绰绰有余。

1.2 不公平分发

轮询分发的方式在消费者处理效率不同的情况下并不适用。所以真正的公平应该是遵循能者多劳的前提。

在消费者处修改channel.basicQos(1);表示开启不公平分发


public class Consumer2 {
    private static final String LONG_QUEUE = "long_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            // 模拟并发沉睡三十秒
            try {
                Thread.sleep(30000);
                System.out.println("线程B接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8));
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        // 设置不公平分发
        channel.basicQos(1);
        channel.basicConsume(LONG_QUEUE,false,deliverCallback,
                consumerTag -> {
                    System.out.println(consumerTag + "消费者取消消费");
                });
    }
}

1.3 测试不公平分发

测试目的:是否能实现能者多劳。

测试方法:两个消费者睡眠不同的事件来模拟处理事件不同,如果处理时间(睡眠时间)短的能够处理多个消息就代表目的达成。

先启动生产者创建队列,再分别启动两个消费者。

生产者按照顺序发四条消息:

在这里插入图片描述

睡眠时间短的线程A接收到了三条消息

在这里插入图片描述

而睡眠时间长的线程B只接收到的第二条消息:

在这里插入图片描述

因为线程B在处理消息时消耗的时间较长,所以就将其他消息分配给了线程A。

实验成功!

1.4 预取值

消息的发送和手动确认都是异步完成的,因此就存在一个未确认消息的缓冲区,开发人员希望能够限制缓冲区的大小,用来避免缓冲区里面无限制的未确认消息问题。

这里的预期值就值得是上述方法channel.basicQos();里面的参数,如果在当前信道上存在等于参数的消息就不会在安排当前信道进行消费消息。

1.4.1 代码测试

测试方法:

1.新建两个不同的消费者分别给定预期值5个2。

2.给睡眠时间长的指定为5,时间短的指定为2。

3.假如按照指定的预期值获取消息则表示测试成功,但并不是代表一定会按照5和2分配,这个类似于权重的判别。

代码根据上述代码修改预期值即可。

2. 发布确认

发布确认就是生产者发布消息到队列之后,队列确认进行持久化完毕再通知给生产者的过程。这样才能保证消息不会丢失。

需要注意的是需要开启队列持久化才能使用确认发布。
开启方法:channel.confirmSelect();

2.1 单个确认发布

是一种同步发布的方式,即发送完一个消息之后只有确认它确认发布后,后续的消息才会继续发布,在指定的时间内没有确认就会抛出异常。缺点就是特别慢。


public class SoloProducer {
    private static final int MESSAGE_COUNT = 100;
    private static final String QUEUE_NAME = "confirm_solo";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 产生队列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 开启确认发布
        channel.confirmSelect();
        // 记录开始时间
        long beginTime = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = ""+i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            // 单个发布确认
            boolean flag = channel.waitForConfirms();
            if (flag){
                System.out.println("发送消息:" + i);
            }
        }
        // 记录结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");   }
}

2.2 批量确认发布

一批一批的确认发布可以提高系统的吞吐量。但是缺点是发生故障导致发布出现问题时,需要将整个批处理保存在内存中,后面再重新发布。


public class BatchProducer {
    private static final int MESSAGE_COUNT = 100;
    private static final String QUEUE_NAME = "confirm_batch";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 产生队列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 开启确认发布
        channel.confirmSelect();
        // 设置一个多少一批确认一次。
        int batchSize = MESSAGE_COUNT / 10;
        // 记录开始时间
        long beginTime = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = ""+i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            // 批量发布确认
            if (i % batchSize == 0){
                if (channel.waitForConfirms()){
                    System.out.println("发送消息:" + i);
                }
            }
        }
        // 记录结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
    }
}

显然效率要比单个确认发布的高很多。

2.3 异步确认发布

编程上比上述两个要复杂,但是性价比很高,无论是可靠性还行效率的都好很多,利用回调函数来达到消息可靠性传递的。


public class AsyncProducer {
    private static final int MESSAGE_COUNT = 100;
    private static final String QUEUE_NAME = "confirm_async";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 产生队列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 开启确认发布
        channel.confirmSelect();
        // 记录开始时间
        long beginTime = System.currentTimeMillis();
        // 确认成功回调
        ConfirmCallback ackCallback = (deliveryTab,multiple) ->{
            System.out.println("确认成功消息:" + deliveryTab);
        };
        // 确认失败回调
        ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
            System.out.println("未确认的消息:" + deliveryTab);
        };
        // 消息监听器
        
        channel.addConfirmListener(ackCallback,nackCallback);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = "" + i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
        }

        // 记录结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
    }
}

2.4 处理未确认的消息

最好的处理方式把未确认的消息放到一个基于内存的能被发布线程访问的队列。

例如:ConcurrentLinkedQueue可以在确认队列confirm callbacks与发布线程之间进行消息的传递。

处理方式:

1.记录要发送的全部消息;

2.在发布成功确认处删除;

3.打印未确认的消息。

使用一个哈希表存储消息,它的优点:

可以将需要和消息进行关联;轻松批量删除条目;支持高并发

ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();

public class AsyncProducerRemember {
    private static final int MESSAGE_COUNT = 100;
    private static final String QUEUE_NAME = "confirm_async_remember";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtils.getChannel();
        // 产生队列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        // 开启确认发布
        channel.confirmSelect();
        // 线程安全有序的一个hash表,适用与高并发
        ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>();
        // 记录开始时间
        long beginTime = System.currentTimeMillis();
        // 确认成功回调
        ConfirmCallback ackCallback = (deliveryTab, multiple) ->{
            //2. 在发布成功确认处删除;
            // 批量删除
            if (multiple){
                ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab);
                confirmMap.clear();
            }else {
                // 单独删除
                map.remove(deliveryTab);
            }
            System.out.println("确认成功消息:" + deliveryTab);
        };
        // 确认失败回调
        ConfirmCallback nackCallback = (deliveryTab,multiple) ->{
            // 3. 打印未确认的消息。
            System.out.println("未确认的消息:" + map.get(deliveryTab) + ",标记:" + deliveryTab);
        };
        // 消息监听器
        
        channel.addConfirmListener(ackCallback,nackCallback);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = "" + i;
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));
            // 1. 记录要发送的全部消息;
            map.put(channel.getNextPublishSeqNo(),msg);
        }

        // 记录结束时间
        long endTime = System.currentTimeMillis();
        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");
    }
}

总结

显然来说,异步处理除了在编码处有些麻烦,在处理时间效率和可用性上都是比单处理和批处理好很多。

本篇文章就到这里了,希望能够给你带来帮助,也希望您能够多多关注编程网的更多内容!    

--结束END--

本文标题: Java RabbitMQ的持久化和发布确认详解

本文链接: https://lsjlt.com/news/141746.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • Java RabbitMQ的持久化和发布确认详解
    目录1. 持久化1.1 实现持久化1.2 不公平分发1.3 测试不公平分发1.4 预取值1.4.1 代码测试2. 发布确认2.1 单个确认发布2.2 批量确认发布2.3 异步确认发布...
    99+
    2024-04-02
  • Java RabbitMQ如何实现持久化和发布确认
    这篇文章主要介绍Java RabbitMQ如何实现持久化和发布确认,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!1. 持久化当RabbitMQ服务停掉以后消息生产者发送过的消息不丢失。默认情况下Rabbit...
    99+
    2023-06-29
  • RabbitMQ发布确认高级问题的示例分析
    RabbitMQ发布确认高级问题的示例分析,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。1、发布确认高级1. 存在的问题再生产环境中由于一些不明原因导致rabb...
    99+
    2023-06-22
  • RabbitMQ队列中间件消息持久化 确认机制 死信队列原理
    目录持久化和应答机制Ack消息持久化应答机制Ack死信队列延时队列集群模式持久化和应答机制Ack 消息队列中间件系列的最后一篇了,RabbitMQ消息的持久化、确认机制、死信队列、负...
    99+
    2023-05-19
    RabbitMQ消息队列 RabbitMQ消息持久化确认机制 RabbitMQ死信队列
  • Redis的持久化详解
    目录一、Redis的持久化二、RDB(Redis DataBase)1、RDB快照原理2、RDB配置3、redis.conf 其他一些配置4、RDB的备份恢复5、RDB优缺点三、AOF(Append Of File)1、...
    99+
    2023-06-05
    Redis持久化详解 Redis 持久化
  • 一文详解Redis中的持久化
    目录1. 前言2. RDB2.1 手动触发2.2 自动触发3. bgsave大致流程4. RDB持久化方式的优缺点5. AOF6. AOF的使用方式7. AOF流程剖析7.1 命令写入7.2 文件同步7.3 重写机制7....
    99+
    2024-04-02
  • Redis的持久化和主从复制详细讲解
    本篇内容介绍了“Redis的持久化和主从复制详细讲解”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!什么是R...
    99+
    2024-04-02
  • Redis两种持久化方案RDB和AOF详解
    本文主要针对Redis 有两种持久化方案RDB和AOF做了详细的分析,希望我们整理的内容能够帮助大家对这个两种方案有更加深入的理解。 Redis 有两种持久化方案,RDB (Redis DataBase)和...
    99+
    2022-06-04
    两种 详解 持久
  • Springboot 2.x RabbitTemplate默认消息持久化的原因解析
    目录前言springboot测试测试现象源码分析联想前言 之前在Java直接测试mq消息持久化时,采取如下的配置实现消息的持久化: //消息持久化测试 Builder builder...
    99+
    2024-04-02
  • Redis发布订阅演示、事务演示、持久化的方法
    小编给大家分享一下Redis发布订阅演示、事务演示、持久化的方法,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!文章目录一、Red...
    99+
    2024-04-02
  • Golang与RabbitMQ实现消息持久化和数据安全的最佳实践
    使用Golang和RabbitMQ实现消息持久化和数据安全的最佳实践可以通过以下几个步骤来完成:1. 使用持久化连接:在连接Rabb...
    99+
    2023-10-20
    Golang
  • 详解PHP中的数据库连接持久化
    目录PHP中的数据库连接持久化什么是数据库连接持久化连接持久化有什么用?效率对比注意总结PHP中的数据库连接持久化 数据库的优化是我们做web开发的重中之重,甚至很多情况下其实我们...
    99+
    2024-04-02
  • Ruby序列化和持久化存储(Marshal、Pstore)操作方法详解
    Ruby Marshal序列化 Marshal是Ruby的核心库,可以将一些对象以二进制的方式序列化保存到文件中,需要时再从文件中加载重新构建成对象,即反序列化。 Marshal对数...
    99+
    2024-04-02
  • Golang与RabbitMQ实现消息持久化和数据安全的设计与实现
    要使用Golang和RabbitMQ实现消息持久化和数据安全,可以遵循以下设计和实现步骤:1. RabbitMQ持久化设置:- 在创...
    99+
    2023-10-08
    Golang
  • MySQL8新特性:自增主键的持久化详解
    前言 自增主键没有持久化是个比较早的bug,这点从其在官方bug网站的id号也可看出(https://bugs.mysql.com/bug.php?id=199)。由Peter Zaitsev(现P...
    99+
    2024-04-02
  • SpringBoot日志的打印与持久化详细解析
    目录1. 日志有什么用2. 日志怎么用3. Spring Boot 自定义日志的打印3.1 先获取到打印日志对象3.2 使用日志对象打印日志3.3 日志格式说明4. 日志级别5. 日...
    99+
    2024-04-02
  • 详解Android开发数据持久化之文件存储(附源码)
    其实我们在社交网络上面所发出的任何信息, 都希望能够保留下来. 那么如何实现呢? 数据持久化 数据持久化, 就是将内存中的瞬时数据保存在存储设备中, 保证即便关机之后...
    99+
    2022-06-06
    数据 存储 持久化 android开发 数据持久化 源码 Android
  • 如何解决gearman队列持久化引发的问题
    这篇文章主要讲解了“如何解决gearman队列持久化引发的问题”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“如何解决gearman队列持久化引发的问题”吧!具体分析如下:一、gearman ...
    99+
    2023-06-09
  • 怎么解析spark的宽窄依赖和持久化
    本篇文章为大家展示了怎么解析spark的宽窄依赖和持久化,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。一.持久化官网1.官网位置截图2.cache 源码cache底层调用的是persisit&nbs...
    99+
    2023-06-02
  • gearman队列持久化引发的问题及解决方法
    本文简述了gearman用mysql持久化的方法,以及由此引发的一些问题,具体分析如下: 一、gearman 创建Mysql持久化队列的方式如下: 1. 登入mysql命令行,运行: create...
    99+
    2022-06-04
    队列 解决方法 持久
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作