返回顶部
首页 > 资讯 > 精选 >怎么使用springboot + rabbitmq消息确认机制
  • 617
分享到

怎么使用springboot + rabbitmq消息确认机制

2023-06-02 09:06:16 617人浏览 八月长安
摘要

这篇文章主要介绍“怎么使用SpringBoot + RabbitMQ消息确认机制”,在日常操作中,相信很多人在怎么使用springboot + rabbitMQ消息确认机制问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对

这篇文章主要介绍“怎么使用SpringBoot + RabbitMQ消息确认机制”,在日常操作中,相信很多人在怎么使用springboot + rabbitMQ消息确认机制问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”怎么使用springboot + rabbitmq消息确认机制”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

一、准备环境
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置中需要开启 发送端消费端 的消息确认。


spring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.passWord=guest# 发送者开启 confirm 确认机制spring.rabbitmq.publisher-confirms=true# 发送者开启 return 确认机制spring.rabbitmq.publisher-returns=true##################################################### 设置消费端手动 ackspring.rabbitmq.listener.simple.acknowledge-mode=manual# 是否支持重试spring.rabbitmq.listener.simple.retry.enabled=true

定义交换机 confirmTestExchange 和队列 confirm_test_queue ,并将队列绑定在交换机上。

@Configuration
public class QueueConfig {

    @Bean(name = "confirmTestQueue")
    public Queue confirmTestQueue() {
        return new Queue("confirm_test_queue", true, false, false);
    }

    @Bean(name = "confirmTestExchange")
    public FanoutExchange confirmTestExchange() {
        return new FanoutExchange("confirmTestExchange");
    }

    @Bean
    public Binding confirmTestFanoutExchangeAndQueue(
            @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
            @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
        return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
    }
}

rabbitmq 的消息确认分为两部分:发送消息确认 和 消息接收确认。

怎么使用springboot + rabbitmq消息确认机制在这里插入图片描述

二、消息发送确认

消息只要被 rabbitmq broker 接收到就会触发 confirmCallback 回调 。

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        if (!ack) {
            log.error("消息发送异常!");
        } else {
            log.info("发送者爸爸已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
        }
    }
}

实现接口 ConfirmCallback ,重写其confirm()方法,方法内有三个参数correlationDataackcause

  • correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
  • ack:消息投递到broker 的状态,true表示成功。
  • cause:表示投递失败的原因。

但消息被 broker 接收到只能表示已经到达 MQ服务器,并不能保证消息一定会被投递到目标 queue 里。所以接下来需要用到 returnCallback

如果消息未能投递到目标 queue 里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
    }
}

实现接口ReturnCallback,重写 returnedMessage() 方法,方法有五个参数message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。

下边是具体的消息发送,在rabbitTemplate中设置 ConfirmReturn 回调,我们通过setDeliveryMode()对消息做持久化处理,为了后续测试创建一个 CorrelationData对象,添加一个id10000000000

@Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ConfirmCallbackService confirmCallbackService;

    @Autowired
    private ReturnCallbackService returnCallbackService;

    public void sendMessage(String exchange, String routingKey, Object msg) {

        
        rabbitTemplate.setMandatory(true);

        
        rabbitTemplate.setConfirmCallback(confirmCallbackService);

        
        rabbitTemplate.setReturnCallback(returnCallbackService);

        
        rabbitTemplate.convertAndSend(exchange, routingKey, msg,
                message -> {
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    return message;
                },
                new CorrelationData(UUID.randomUUID().toString()));
    }

三、消息接收确认@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {
    
    @RabbitHandler
    public void processhandler(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("小富收到消息:{}", msg);

            //TODO 具体业务
            
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        }  catch (Exception e) {
            
            if (message.getMessageProperties().getRedelivered()) {
                
                log.error("消息已重复处理失败,拒绝再次接收...");
                
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
            } else {
                
                log.error("消息即将再次返回队列处理...");
                
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); 
            }
        }
    }
}

消费消息有三种回执方法,我们来分析一下每种方法的含义。

basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。

void basicAck(long deliveryTag, boolean multiple) 

deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行acknackreject等操作。

multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。

举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。

basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag:表示消息投递序号。

multiple:是否批量确认。

requeue:值为 true 消息将重新入队列。

basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

void basicReject(long deliveryTag, boolean requeue)

deliveryTag:表示消息投递序号。

requeue:值为 true 消息将重新入队列。

四、测试五、踩坑日志

这是一个非常没技术含量的坑,但却是非常容易犯错的地方。

开启消息确认机制,消费消息别忘了channel.basicAck,否则消息会一直存在,导致重复消费。怎么使用springboot + rabbitmq消息确认机制

在我最开始接触消息确认机制的时候,消费端代码就像下边这样写的,思路很简单:处理完业务逻辑后确认消息, int a = 1 / 0 发生异常后将消息重新投入队列。

@RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("消费者 2 号收到:{}", msg);

            int a = 1 / 0;

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        } catch (Exception e) {

            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }

但是有个问题是,业务代码一旦出现 bug 99.9%的情况是不会自动修复,一条消息会被无限投递进队列,消费端无限执行,导致了死循环。

怎么使用springboot + rabbitmq消息确认机制在这里插入图片描述

本地的CPU被瞬间打满了,大家可以想象一下当时在生产环境导致服务死机,我是有多慌。

怎么使用springboot + rabbitmq消息确认机制而且rabbitmq management 只有一条未被确认的消息。

怎么使用springboot + rabbitmq消息确认机制在这里插入图片描述

经过测试分析发现,当消息重新投递到消息队列时,这条消息不会回到队列尾部,仍是在队列头部。

消费者会立刻消费这条消息,业务处理再抛出异常,消息再重新入队,如此反复进行。导致消息队列处理出现阻塞,导致正常消息也无法运行。

而我们当时的解决方案是,先将消息进行应答,此时消息队列会删除该条消息,同时我们再次发送该消息到消息队列,异常消息就放在了消息队列尾部,这样既保证消息不会丢失,又保证了正常业务的进行。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重新发送消息到队尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
                    message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLaiN,
                    JSON.tojsONBytes(msg));

但这种方法并没有解决根本问题,错误消息还是会时不时报错,后面优化设置了消息重试次数,达到了重试上限以后,手动确认,队列删除此消息,并将消息持久化入MySQL并推送报警,进行人工处理和定时任务做补偿。

如何保证 MQ 的消费是幂等性,这个需要根据具体业务而定,可以借助Mysql、或者redis 将消息持久化,通过再消息中的唯一性属性校验。

到此,关于“怎么使用springboot + rabbitmq消息确认机制”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注编程网网站,小编会继续努力为大家带来更多实用的文章!

--结束END--

本文标题: 怎么使用springboot + rabbitmq消息确认机制

本文链接: https://lsjlt.com/news/229577.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • 怎么使用springboot + rabbitmq消息确认机制
    这篇文章主要介绍“怎么使用springboot + rabbitmq消息确认机制”,在日常操作中,相信很多人在怎么使用springboot + rabbitmq消息确认机制问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对...
    99+
    2023-06-02
  • SpringBoot整合RabbitMQ实现消息确认机制
    前面几篇案例已经将常用的交换器(DirectExchange、TopicExchange、FanoutExchange)的用法介绍完了,现在我们来看一下消息的回调,也就是消息确认。 ...
    99+
    2024-04-02
  • rabbitmq消息确认机制是什么
    RabbitMQ消息确认机制是一种用于保证消息可靠传输的机制。它确保生产者发送的消息被正确地传递给消费者并被消费者成功处理。在Rab...
    99+
    2023-10-09
    rabbitmq
  • 详解SpringBoot整合RabbitMQ如何实现消息确认
    目录简介生产者消息确认介绍流程配置ConfirmCallbackReturnCallback注册ConfirmCallback和ReturnCallback消费者消息确认介绍手动确认...
    99+
    2024-04-02
  • RabbitMQ队列中间件消息持久化 确认机制 死信队列原理
    目录持久化和应答机制Ack消息持久化应答机制Ack死信队列延时队列集群模式持久化和应答机制Ack 消息队列中间件系列的最后一篇了,RabbitMQ消息的持久化、确认机制、死信队列、负...
    99+
    2023-05-19
    RabbitMQ消息队列 RabbitMQ消息持久化确认机制 RabbitMQ死信队列
  • springboot中rabbitmq实现消息可靠性机制详解
    1. 生产者模块通过publisher confirm机制实现消息可靠性  1.1 生产者模块导入rabbitmq相关依赖 <!--AMQP依赖,包含Rabbit...
    99+
    2024-04-02
  • rabbitMQ怎么复制队列内消息
    要复制RabbitMQ队列内的消息,可以使用RabbitMQ的镜像队列功能。镜像队列功能可以将一个队列中的消息复制到多个节点上,以提...
    99+
    2024-02-29
    rabbitmq
  • Golang中使用RabbitMQ实现消息确认和保证可靠性的技巧
    在Golang中使用RabbitMQ实现消息确认和保证可靠性的技巧包括以下几个方面:1. 使用消息确认机制:在RabbitMQ中,可...
    99+
    2023-10-20
    Golang
  • 怎么用SpringBoot+RabbitMQ实现消息可靠传输
    这篇文章主要介绍了怎么用SpringBoot+RabbitMQ实现消息可靠传输的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇怎么用SpringBoot+RabbitMQ实现消息可靠传输文章都会有所收获,下面我们...
    99+
    2023-06-30
  • Golang中使用RabbitMQ实现消息确认和保证可靠性的最佳实践
    在Golang中使用RabbitMQ实现消息确认和保证可靠性的最佳实践包括以下步骤:1. 引入依赖包:使用`go get`命令安装R...
    99+
    2023-10-08
    Golang
  • 队列在PHP与MySQL中的消息确认机制和消息重试的处理方法
    引言:随着互联网应用的发展,很多在线服务需要处理大量的请求,而这些请求往往需要一个异步的处理方式。队列是一种常见的解决方案,可以有效地将请求与处理解耦,提高系统的性能和可靠性。本文将介绍队列在PHP与MySQL中的消息确认机制和消息重试的处...
    99+
    2023-10-21
    队列 消息确认 消息重试
  • RabbitMQ消息转换器怎么应用
    本文小编为大家详细介绍“RabbitMQ消息转换器怎么应用”,内容详细,步骤清晰,细节处理妥当,希望这篇“RabbitMQ消息转换器怎么应用”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。在SpringAMQP的发...
    99+
    2023-07-05
  • Golang中使用RabbitMQ实现消息确认和保证可靠性的技巧和最佳实践
    在Golang中使用RabbitMQ实现消息确认和保证可靠性的技巧和最佳实践如下:1. 使用事务:在Golang中,RabbitMQ...
    99+
    2023-10-20
    Golang
  • RabbitMQ是怎么确定消息是否投递到队列中的
    本篇内容介绍了“RabbitMQ是怎么确定消息是否投递到队列中的”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所...
    99+
    2024-04-02
  • Android消息机制Handler如何使用
    这篇文章主要介绍“Android消息机制Handler如何使用”,在日常操作中,相信很多人在Android消息机制Handler如何使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Android消息机制Ha...
    99+
    2023-06-21
  • Windows消息过滤机制怎么应用
    Windows消息过滤机制是指Windows操作系统对各种窗口消息进行过滤和处理的机制,可以用于实现各种功能和优化程序性能。下面是一...
    99+
    2024-02-29
    Windows
  • MQ消息队列中间件RabbitMQ怎么用
    小编给大家分享一下MQ消息队列中间件RabbitMQ怎么用,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!MQ消息队列中间件—RabbitMQ消息中间件主要用于组件...
    99+
    2023-06-04
  • SpringBoot怎么整合RabbitMq自定义消息监听容器来实现消息批量处理
    这篇文章主要介绍“SpringBoot怎么整合RabbitMq自定义消息监听容器来实现消息批量处理”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“SpringBoot怎么整合RabbitMq自定义消息...
    99+
    2023-07-05
  • SpringBoot怎么使用WebSocket实现群发消息
    这篇文章主要介绍了SpringBoot怎么使用WebSocket实现群发消息的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇SpringBoot怎么使用WebSocket实现群发消息文章都会有所收获,下面我们一起...
    99+
    2023-06-08
  • 怎么使用django+celery+RabbitMQ自定义多个消息队列
    本篇内容主要讲解“怎么使用django+celery+RabbitMQ自定义多个消息队列”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“怎么使用django+celery+RabbitMQ自定义多...
    99+
    2023-07-05
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作