返回顶部
首页 > 资讯 > 精选 >Java RabbitMQ如何实现持久化和发布确认
  • 274
分享到

Java RabbitMQ如何实现持久化和发布确认

2023-06-29 10:06:53 274人浏览 八月长安
摘要

这篇文章主要介绍Java RabbitMQ如何实现持久化和发布确认,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!1. 持久化当RabbitMQ服务停掉以后消息生产者发送过的消息不丢失。默认情况下Rabbit

这篇文章主要介绍Java RabbitMQ如何实现持久化和发布确认,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!

    1. 持久化

    当RabbitMQ服务停掉以后消息生产者发送过的消息不丢失。默认情况下RabbitMQ退出或者崩溃时,会忽视掉队列和消息。为了保证消息不丢失需要将队列和消息都标记为持久化。

    1.1 实现持久化

    队列持久化:在创建队列时将channel.queueDeclare();第二个参数改为true。

    消息持久化:在使用信道发送消息时channel.basicPublish();将第三个参数改为:MessageProperties.PERSISTENT_TEXT_PLaiN表示持久化消息。

    public class Producer3 {    private static final String LONG_QUEUE = "long_queue";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMQUtils.getChannel();        // 持久化队列        channel.queueDeclare(LONG_QUEUE,true,false,false,null);        Scanner scanner = new Scanner(System.in);        int i = 0;        while (scanner.hasNext()){            i++;            String msg = scanner.next() + i;            // 持久化消息            channel.basicPublish("",LONG_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));            System.out.println("发送消息:'" + msg + "'成功");        }    }}

    但是存储消息还有存在一个缓存的间隔点,没有真正的写入磁盘,持久性保证不够强,但是对于简单队列而言也绰绰有余。

    1.2 不公平分发

    轮询分发的方式在消费者处理效率不同的情况下并不适用。所以真正的公平应该是遵循能者多劳的前提。

    在消费者处修改channel.basicQos(1);表示开启不公平分发

    public class Consumer2 {    private static final String LONG_QUEUE = "long_queue";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMQUtils.getChannel();        DeliverCallback deliverCallback = (consumerTag, message) -> {            // 模拟并发沉睡三十秒            try {                Thread.sleep(30000);                System.out.println("线程B接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8));                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);            } catch (InterruptedException e) {                e.printStackTrace();            }        };        // 设置不公平分发        channel.basicQos(1);        channel.basicConsume(LONG_QUEUE,false,deliverCallback,                consumerTag -> {                    System.out.println(consumerTag + "消费者取消消费");                });    }}

    1.3 测试不公平分发

    测试目的:是否能实现能者多劳。

    测试方法:两个消费者睡眠不同的事件来模拟处理事件不同,如果处理时间(睡眠时间)短的能够处理多个消息就代表目的达成。

    先启动生产者创建队列,再分别启动两个消费者。

    生产者按照顺序发四条消息:

    Java RabbitMQ如何实现持久化和发布确认

    睡眠时间短的线程A接收到了三条消息

    Java RabbitMQ如何实现持久化和发布确认

    而睡眠时间长的线程B只接收到的第二条消息:

    Java RabbitMQ如何实现持久化和发布确认

    因为线程B在处理消息时消耗的时间较长,所以就将其他消息分配给了线程A。

    实验成功!

    1.4 预取值

    消息的发送和手动确认都是异步完成的,因此就存在一个未确认消息的缓冲区,开发人员希望能够限制缓冲区的大小,用来避免缓冲区里面无限制的未确认消息问题。

    这里的预期值就值得是上述方法channel.basicQos();里面的参数,如果在当前信道上存在等于参数的消息就不会在安排当前信道进行消费消息。

    1.4.1 代码测试

    测试方法:

    新建两个不同的消费者分别给定预期值5个2。

    给睡眠时间长的指定为5,时间短的指定为2。

    假如按照指定的预期值获取消息则表示测试成功,但并不是代表一定会按照5和2分配,这个类似于权重的判别。

    代码根据上述代码修改预期值即可。

    2. 发布确认

    发布确认就是生产者发布消息到队列之后,队列确认进行持久化完毕再通知给生产者的过程。这样才能保证消息不会丢失。

    需要注意的是需要开启队列持久化才能使用确认发布。
    开启方法:channel.confirmSelect();

    2.1 单个确认发布

    是一种同步发布的方式,即发送完一个消息之后只有确认它确认发布后,后续的消息才会继续发布,在指定的时间内没有确认就会抛出异常。缺点就是特别慢。

    public class SoloProducer {    private static final int MESSAGE_COUNT = 100;    private static final String QUEUE_NAME = "confirm_solo";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMQUtils.getChannel();        // 产生队列        channel.queueDeclare(QUEUE_NAME,true,false,false,null);        // 开启确认发布        channel.confirmSelect();        // 记录开始时间        long beginTime = System.currentTimeMillis();        for (int i = 0; i < MESSAGE_COUNT; i++) {            String msg = ""+i;            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));            // 单个发布确认            boolean flag = channel.waitForConfirms();            if (flag){                System.out.println("发送消息:" + i);            }        }        // 记录结束时间        long endTime = System.currentTimeMillis();        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");   }}

    2.2 批量确认发布

    一批一批的确认发布可以提高系统的吞吐量。但是缺点是发生故障导致发布出现问题时,需要将整个批处理保存在内存中,后面再重新发布。

    public class BatchProducer {    private static final int MESSAGE_COUNT = 100;    private static final String QUEUE_NAME = "confirm_batch";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMQUtils.getChannel();        // 产生队列        channel.queueDeclare(QUEUE_NAME,true,false,false,null);        // 开启确认发布        channel.confirmSelect();        // 设置一个多少一批确认一次。        int batchSize = MESSAGE_COUNT / 10;        // 记录开始时间        long beginTime = System.currentTimeMillis();        for (int i = 0; i < MESSAGE_COUNT; i++) {            String msg = ""+i;            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));            // 批量发布确认            if (i % batchSize == 0){                if (channel.waitForConfirms()){                    System.out.println("发送消息:" + i);                }            }        }        // 记录结束时间        long endTime = System.currentTimeMillis();        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");    }}

    显然效率要比单个确认发布的高很多。

    2.3 异步确认发布

    编程上比上述两个要复杂,但是性价比很高,无论是可靠性还行效率的都好很多,利用回调函数来达到消息可靠性传递的。

    public class AsyncProducer {    private static final int MESSAGE_COUNT = 100;    private static final String QUEUE_NAME = "confirm_async";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMQUtils.getChannel();        // 产生队列        channel.queueDeclare(QUEUE_NAME,true,false,false,null);        // 开启确认发布        channel.confirmSelect();        // 记录开始时间        long beginTime = System.currentTimeMillis();        // 确认成功回调        ConfirmCallback ackCallback = (deliveryTab,multiple) ->{            System.out.println("确认成功消息:" + deliveryTab);        };        // 确认失败回调        ConfirmCallback nackCallback = (deliveryTab,multiple) ->{            System.out.println("未确认的消息:" + deliveryTab);        };        // 消息监听器                channel.addConfirmListener(ackCallback,nackCallback);        for (int i = 0; i < MESSAGE_COUNT; i++) {            String msg = "" + i;            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));        }        // 记录结束时间        long endTime = System.currentTimeMillis();        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");    }}

    2.4 处理未确认的消息

    最好的处理方式把未确认的消息放到一个基于内存的能被发布线程访问的队列。

    例如:ConcurrentLinkedQueue可以在确认队列confirm callbacks与发布线程之间进行消息的传递。

    处理方式:

    记录要发送的全部消息;

    在发布成功确认处删除;

    打印未确认的消息。

    使用一个哈希表存储消息,它的优点:

    可以将需要和消息进行关联;轻松批量删除条目;支持高并发

    ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();
    public class AsyncProducerRemember {    private static final int MESSAGE_COUNT = 100;    private static final String QUEUE_NAME = "confirm_async_remember";    public static void main(String[] args) throws Exception {        Channel channel = RabbitMQUtils.getChannel();        // 产生队列        channel.queueDeclare(QUEUE_NAME,true,false,false,null);        // 开启确认发布        channel.confirmSelect();        // 线程安全有序的一个hash表,适用与高并发        ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>();        // 记录开始时间        long beginTime = System.currentTimeMillis();        // 确认成功回调        ConfirmCallback ackCallback = (deliveryTab, multiple) ->{            //2. 在发布成功确认处删除;            // 批量删除            if (multiple){                ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab);                confirmMap.clear();            }else {                // 单独删除                map.remove(deliveryTab);            }            System.out.println("确认成功消息:" + deliveryTab);        };        // 确认失败回调        ConfirmCallback nackCallback = (deliveryTab,multiple) ->{            // 3. 打印未确认的消息。            System.out.println("未确认的消息:" + map.get(deliveryTab) + ",标记:" + deliveryTab);        };        // 消息监听器                channel.addConfirmListener(ackCallback,nackCallback);        for (int i = 0; i < MESSAGE_COUNT; i++) {            String msg = "" + i;            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8));            // 1. 记录要发送的全部消息;            map.put(channel.getNextPublishSeqNo(),msg);        }        // 记录结束时间        long endTime = System.currentTimeMillis();        System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒");    }}

    以上是“Java RabbitMQ如何实现持久化和发布确认”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注编程网精选频道!

    --结束END--

    本文标题: Java RabbitMQ如何实现持久化和发布确认

    本文链接: https://lsjlt.com/news/324233.html(转载时请注明来源链接)

    有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

    猜你喜欢
    • Java RabbitMQ如何实现持久化和发布确认
      这篇文章主要介绍Java RabbitMQ如何实现持久化和发布确认,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!1. 持久化当RabbitMQ服务停掉以后消息生产者发送过的消息不丢失。默认情况下Rabbit...
      99+
      2023-06-29
    • Java RabbitMQ的持久化和发布确认详解
      目录1. 持久化1.1 实现持久化1.2 不公平分发1.3 测试不公平分发1.4 预取值1.4.1 代码测试2. 发布确认2.1 单个确认发布2.2 批量确认发布2.3 异步确认发布...
      99+
      2024-04-02
    • Redis如何正确关闭和开启持久化
      目录前言一、关闭持久化二、关闭失效问题三、开启持久化前言 版本 :Redis6.X 一、关闭持久化 Redis是默认开启RDB的,AOF则是默认关闭的。相当于初始安装的Redis是持久化的。 如何关闭redis持久化?我...
      99+
      2023-01-04
      Redis关闭和开启持久化 Redis关闭持久化 Redis开启持久化
    • 详解SpringBoot整合RabbitMQ如何实现消息确认
      目录简介生产者消息确认介绍流程配置ConfirmCallbackReturnCallback注册ConfirmCallback和ReturnCallback消费者消息确认介绍手动确认...
      99+
      2024-04-02
    • 如何使用redis实现持久化
      如何使用redis实现持久化?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。RDBRDB就是持久化的一种手段,把内存中数据在某些...
      99+
      2024-04-02
    • docker如何实现数据持久化
      这篇文章主要为大家展示了“docker如何实现数据持久化”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“docker如何实现数据持久化”这篇文章吧。 docke...
      99+
      2024-04-02
    • 如何在Redis中实现持久化
      如何在Redis中实现持久化?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。1、概述Redis 是内存数据库,如果不能将内存中的数据保存到磁盘中,那么一旦服务器进程退出,服务...
      99+
      2023-06-06
    • Golang与RabbitMQ实现消息持久化和数据安全的设计与实现
      要使用Golang和RabbitMQ实现消息持久化和数据安全,可以遵循以下设计和实现步骤:1. RabbitMQ持久化设置:- 在创...
      99+
      2023-10-08
      Golang
    • Golang与RabbitMQ实现消息持久化和数据安全的最佳实践
      使用Golang和RabbitMQ实现消息持久化和数据安全的最佳实践可以通过以下几个步骤来完成:1. 使用持久化连接:在连接Rabb...
      99+
      2023-10-20
      Golang
    • 如何使用Redis实现数据持久化
      如何使用Redis实现数据持久化引言Redis是一种快速、高效的内存数据库,但默认情况下它的数据是存储在内存中的。这就意味着一旦服务器断电或重启,Redis中的数据将会丢失。为了解决这个问题,Redis提供了一些机制来实现数据的持久化。本文...
      99+
      2023-11-07
      数据 redis 持久化
    • Golang与RabbitMQ实现消息持久化、数据安全和高可用的架构设计和实现
      要实现消息持久化、数据安全和高可用的架构设计和实现,可以使用Golang和RabbitMQ的组合。下面是一个简单的架构设计和实现示例...
      99+
      2023-10-20
      Golang
    • 如何在ApacheBeam中实现数据的持久化和恢复
      在Apache Beam中,可以使用不同的数据存储和处理引擎来实现数据的持久化和恢复。以下是一些常见的方式: 使用文件系统:可以将...
      99+
      2024-03-11
      Beam
    • Flask如何使用SQLAlchemy实现持久化数据
      这篇文章主要介绍“Flask如何使用SQLAlchemy实现持久化数据”,在日常操作中,相信很多人在Flask如何使用SQLAlchemy实现持久化数据问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Flask...
      99+
      2023-06-20
    • C#如何用RabbitMQ实现消息订阅与发布
      这篇文章给大家分享的是有关C#如何用RabbitMQ实现消息订阅与发布的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。Fanout交换机模型扇形交换机,采用广播模式,根据绑定的交换机,路由到与之对应的所有队列。一个...
      99+
      2023-06-15
    • python区块链持久化和命令行接口如何实现
      这篇“python区块链持久化和命令行接口如何实现”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“python区块链持久化和命...
      99+
      2023-06-30
    • ssm项目如何实现用户登陆持久化
      小编给大家分享一下ssm项目如何实现用户登陆持久化,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!用户登录持久化就是每次访问不用账号密码来校验身份,在用户登录第一次...
      99+
      2023-06-14
    • SpringBoot如何实现持久化登录状态获取
      目录SpringBoot 持久化登录状态获取1.编写登录的controller文件2.编写首页Controller逻辑3.运行测试,成功SpringBoot 实现登录登出,登录态管理...
      99+
      2024-04-02
    • java灰度发布如何实现
      Java灰度发布是指在发布新版本时,只将一部分用户引流到新版本,以验证新版本的稳定性和性能,最终再将全部用户引流到新版本。下面是几种...
      99+
      2023-10-27
      java
    • 如何利用Kubernetes实现容器的持久化存储
      本篇内容主要讲解“如何利用Kubernetes实现容器的持久化存储”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“如何利用Kubernetes实现容器的持久化存储”吧!  可以说,容器化彻底改变了...
      99+
      2023-06-04
    • 如何在Storm中实现数据的持久化存储
      在Storm中实现数据的持久化存储通常可以通过以下几种方法: 使用数据库:可以将Storm处理的数据存储到关系型数据库(如MySQ...
      99+
      2024-04-09
      Storm
    软考高级职称资格查询
    编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
    • 官方手机版

    • 微信公众号

    • 商务合作