返回顶部
首页 > 资讯 > 精选 >.Net Core和RabbitMQ怎么限制循环消费
  • 253
分享到

.Net Core和RabbitMQ怎么限制循环消费

2023-07-04 12:07:24 253人浏览 八月长安
摘要

这篇文章主要介绍了.net Core和RabbitMQ怎么限制循环消费的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇.Net Core和RabbitMQ怎么限制循环消费文章都会有所收获,下面

这篇文章主要介绍了.net Core和RabbitMQ怎么限制循环消费的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇.Net Core和RabbitMQ怎么限制循环消费文章都会有所收获,下面我们一起来看看吧。

    前言

    当消费者端接收消息处理业务时,如果出现异常或是拒收消息将消息又变更为等待投递再次推送给消费者,这样一来,则形成循环的条件。

    .Net Core和RabbitMQ怎么限制循环消费

    循环场景

    生产者发送100条消息到RabbitMQ中,消费者设定读取到第50条消息时,设置拒收,同时设定是否还留存在当前队列中(当requeue为false时,设置了死信队列则进入死信队列,否则移除消息)。

    consumer.Received += (model, ea) =>{    var message = ea.Body;    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))    {        Console.WriteLine("拒收");        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);        return;    }    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};

    当第50条消息拒收,则仍在队列中且处在队列头部,重新推送给消费者,再次拒收,再次推送,反反复复。

    .Net Core和RabbitMQ怎么限制循环消费

    最终其他消息全部消费完毕,仅剩第50条消息往复间不断消费,拒收,消费,这将可能导致RabbitMQ出现内存泄漏问题。

    .Net Core和RabbitMQ怎么限制循环消费

    解决方案

    RabbitMQ及AMQP协议本身没有提供这类重试功能,但可以利用一些已有的功能来间接实现重试限定(以下只考虑基于手动确认模式情况)。此处只想到或是只查到了如下几种方案解决消息循环消费问题。

    • 一次消费

      • 无论成功与否,消费者都对外返回ack,将拒收原因或是异常信息catch存入本地或是新队列中另作重试。

      • 消费者拒绝消息或是出现异常,返回Nack或Reject,消息进入死信队列或丢弃(requeue设定为false)。

    • 限定重试次数

      • 在消息的头中添加重试次数,并将消息重新发送出去,再每次重新消费时从头中判断重试次数,递增或递减该值,直到达到限制,requeue改为false,最终进入死信队列或丢弃。

      • 可以在Redis、Memcache或其他存储中存储消息唯一键(例如Guid、雪花Id等,但必须在发布消息时手动设置它),甚至在mysql中连同重试次数一起存储,然后在每次重新消费时递增/递减该值,直到达到限制,requeue改为false,最终进入死信队列或丢弃。

      • 队列使用Quorum类型,限制投递次数,超过次数消息被删除。

    • 队列消息过期

      • 设置过期时间,给队列或是消息设置TTL,重试一定次数消息达到过期时间后进入死信队列或丢弃(requeue设定为true)。

    • 也许还有更多好的方案...

    一次消费

    对外总是Ack

    消息到达了消费端,可因某些原因消费失败了,对外可以发送Ack,而在内部走额外的方式去执行补偿操作,比如将消息转发到内部的RabbitMQ或是其他处理方式,终归是只消费一次。

    var queueName = "alwaysack_queue";channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{    try    {        var message = ea.Body;        Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));        if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))        {            throw new Exception("模拟异常");        }    }    catch (Exception ex)    {        Console.WriteLine(ex.Message);    }    finally    {        ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);    }};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

    当消费端收到消息,处理时出现异常,可以另想办法去处理,而对外保持着ack的返回,以避免消息的循环消费。

    .Net Core和RabbitMQ怎么限制循环消费

    消息不重入队列

    在消费者端,因异常或是拒收消息时,对requeue设置为false时,如果设置了死信队列,则符合“消息被拒绝且不重入队列”这一进入死信队列的情况,从而避免消息反复重试。如未设置死信队列,则消息被丢失。

    .Net Core和RabbitMQ怎么限制循环消费

    此处假定接收100条消息,在接收到第50条消息时设置拒收,并且设置了requeue为false。

    var dlxExchangeName = "dlx_exchange";channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);var dlxQueueName = "dlx_queue";channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");var queueName = "nackorreject_queue";var arguments = new Dictionary<string, object>{    { "x-dead-letter-exchange", dlxExchangeName }};channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{    var message = ea.Body;    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))    {        Console.WriteLine("拒收");        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);//关键在于requeue=false        return;    }    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

    如此一来,拒收消息不会重入队列,并且现有队列绑定了死信交换机,因此,消息进入到死信队列中,如不绑定,则消息丢失。

    .Net Core和RabbitMQ怎么限制循环消费

    限定重试次数

    设置重试次数,限定循环消费的次数,允许短暂的循环,但最终打破循环。

    消息头设定次数

    在消息头中设置次数记录作为标记,但是,消费端无法对接收到的消息修改消息头然后将原消息送回MQ,因此,需要将原消息内容重新发送消息到MQ,具体步骤如下

    • 原消息设置不重入队列。

    • 再发送新的消息其内容与原消息一致,可设置新消息的消息头来携带重试次数。

    • 消费端再次消费时,便可从消息头中查看消息被消费的次数。

    .Net Core和RabbitMQ怎么限制循环消费

    此处假定接收10条消息,在接收到第5条消息时设置拒收, 当消息头中重试次数未超过设定的3次时,消息可以重入队列,再次被消费。

    var queueName = "messageheaderretrycount_queue";channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{    var message = ea.Body;    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));    if (Encoding.UTF8.GetString(message.ToArray()).Contains("5"))    {        var maxRetryCount = 3;        Console.WriteLine($"拒收 {DateTime.Now}");        //初次消费        if (ea.BasicProperties.Headers == null)        {            //原消息设置为不重入队列            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);            //发送新消息到队列中            RetryPublishMessage(channel, queueName, message.ToArray(), 1);            return;        }        //获取重试次数        var retryCount = ParseRetryCount(ea);        if (retryCount < maxRetryCount)        {            //原消息设置为不重入队列            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);            //发送新消息到队列中            RetryPublishMessage(channel, queueName, message.ToArray(), retryCount + 1);            return;        }        //到达最大次数,不再重试消息        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);        return;    }    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);static void RetryPublishMessage(IModel channel, string queueName, byte[] body, int retryCount){    var basicProperties = channel.CreateBasicProperties();    basicProperties.Headers = new Dictionary<string, object>();    basicProperties.Headers.Add("retryCount", retryCount);    channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: basicProperties, body: body);}static int ParseRetryCount(BasicDeliverEventArgs ea){    var existRetryRecord = ea.BasicProperties.Headers.TryGetValue("retryCount", out object retryCount);    if (!existRetryRecord)    {        throw new Exception("没有设置重试次数");    }    return (int)retryCount;}

    消息被拒收后,再重新发送消息到原有交换机或是队列下中,以使得消息像是消费失败回到了队列中,如此来控制消费次数,但是这种场景下,新消息排在了队列的尾部,而不是原消息排在队列头部。

    .Net Core和RabbitMQ怎么限制循环消费

    存储重试次数

    在存储服务中存储消息的唯一标识与对应重试次数,消费消息前对消息进行判断是否存在。

    .Net Core和RabbitMQ怎么限制循环消费

    与消息头判断一致,只是消息重试次数的存储从消息本身挪入存储服务中了。需要注意的是,消息发送端需要设置消息的唯一标识(MessageId属性)

    //模拟外部存储服务var MessageRetryCounts = new Dictionary<ulong, int>();var queueName = "storageretrycount_queue";channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{    var message = ea.Body;    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))    {        var maxRetryCount = 3;        Console.WriteLine("拒收");            //重试次数判断        var existRetryRecord = MessageRetryCounts.ContainsKey(ea.BasicProperties.MessageId);        if (!existRetryRecord)        {            //重入队列,继续重试            MessageRetryCounts.Add(ea.BasicProperties.MessageId, 1);            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);            return;        }            if (MessageRetryCounts[ea.BasicProperties.MessageId] < maxRetryCount)        {            //重入队列,继续重试            MessageRetryCounts[ea.BasicProperties.MessageId] = MessageRetryCounts[ea.BasicProperties.MessageId] + 1;            ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);            return;        }            //到达最大次数,不再重试消息        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: false);        return;    }    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

    除第一次拒收外,允许三次重试机会,三次重试完毕后,设置requeue为false,消息丢失或进入死信队列(如有设置的话)。

    .Net Core和RabbitMQ怎么限制循环消费

    队列使用Quorum类型

    第一种和第二种分别是消息自身、外部存储服务来管理消息重试次数,使用Quorum,由MQ来限定消息的投递次数,也就控制了重试次数。

    .Net Core和RabbitMQ怎么限制循环消费

    设置队列类型为quorum,设置投递最大次数,当超过投递次数后,消息被丢弃。

    var queueName = "quorumtype_queue";var arguments = new Dictionary<string, object>(){    { "x-queue-type", "quorum"},    { "x-delivery-limit", 3 }};channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: arguments);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{    var message = ea.Body;    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))    {        Console.WriteLine($"拒收 {DateTime.Now}");        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);        return;    }    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

    第一次消费被拒收重入队列后,经最大三次投递后,消费端不再收到消息,如此一来也限制了消息的循环消费。

    .Net Core和RabbitMQ怎么限制循环消费

    队列消息过期

    当为消息设置了过期时间时,当消息没有受到Ack,且还在队列中,受到过期时间的限制,反复消费但未能成功时,消息将走向过期,进入死信队列或是被丢弃。

    聚焦于过期时间的限制,因此在消费者端,因异常或是拒收消息时,需要对requeue设置为true,将消息再次重入到原队列中。

    .Net Core和RabbitMQ怎么限制循环消费

    设定消费者端第五十条消息会被拒收,且队列的TTL设置为5秒。

    //死信交换机和死信队列var dlxExchangeName = "dlx_exchange";channel.ExchangeDeclare(exchange: dlxExchangeName, type: "fanout", durable: false, autoDelete: false, arguments: null);var dlxQueueName = "dlx_queue";channel.QueueDeclare(queue: dlxQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);channel.QueueBind(queue: dlxQueueName, exchange: dlxExchangeName, routingKey: "");//常规队列var queueName = "nORMalmessage_queue";var arguments = new Dictionary<string, object>{    { "x-message-ttl", 5000},    { "x-dead-letter-exchange", dlxExchangeName }};channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: arguments);channel.BasicQos(0, 5, false);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{    var message = ea.Body;    Console.WriteLine("接收到信息为:" + Encoding.UTF8.GetString(message.ToArray()));    if (Encoding.UTF8.GetString(message.ToArray()).Contains("50"))    {        Console.WriteLine($"拒收 {DateTime.Now}");        ((EventingBasicConsumer)model).Model.BasicReject(ea.DeliveryTag, requeue: true);        return;    }    ((EventingBasicConsumer)model).Model.BasicAck(ea.DeliveryTag, multiple: false);};channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

    当消费者端拒收消息后消息重入队列,再次消费,反复进行超过5秒后,消息在队列中达到了过期时间,则被挪入到死信队列中。

    .Net Core和RabbitMQ怎么限制循环消费

    WEB管理中死信队列中可查看该条过期的消息。

    .Net Core和RabbitMQ怎么限制循环消费

    关于“.Net Core和RabbitMQ怎么限制循环消费”这篇文章的内容就介绍到这里,感谢各位的阅读!相信大家对“.Net Core和RabbitMQ怎么限制循环消费”知识都有一定的了解,大家如果还想学习更多知识,欢迎关注编程网精选频道。

    --结束END--

    本文标题: .Net Core和RabbitMQ怎么限制循环消费

    本文链接: https://lsjlt.com/news/345017.html(转载时请注明来源链接)

    有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

    猜你喜欢
    • .Net Core和RabbitMQ怎么限制循环消费
      这篇文章主要介绍了.Net Core和RabbitMQ怎么限制循环消费的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇.Net Core和RabbitMQ怎么限制循环消费文章都会有所收获,下面...
      99+
      2023-07-04
    • .Net Core和RabbitMQ限制循环消费的方法
      目录前言循环场景解决方案一次消费消息不重入队列限定重试次数消息头设定次数存储重试次数队列使用Quorum类型队列消息过期参考资料前言 当消费者端接收消息处理业务时,如果出现异常或是拒...
      99+
      2022-11-13
      .net core rabbitmq循环消费 .net core rabbitmq重复消费 .net core rabbitmq消费限制
    • Golang rabbitMQ生产者和消费者怎么实现
      今天小编给大家分享一下Golang rabbitMQ生产者和消费者怎么实现的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一...
      99+
      2023-06-30
    • Android怎么实现无限循环和自动轮播
      要实现Android中的无限循环和自动轮播,可以使用ViewPager和PagerAdapter来实现。下面是实现的步骤:1. 创建...
      99+
      2023-10-18
      Android
    • 使用node怎么实现事件循环和消息队列
      这篇文章将为大家详细讲解有关使用node怎么实现事件循环和消息队列,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。什么是异步?异步和同步应该是经常谈的一个话题了。同步的概念很简单,自上而下依次...
      99+
      2023-06-15
    • Shell脚本中条件控制和循环语句怎么用
      这篇文章主要介绍了Shell脚本中条件控制和循环语句怎么用,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。条件判断:if语句语法格式:if [ expres...
      99+
      2023-06-09
    • r语言中怎么进行条件判断和循环控制
      在R语言中,可以使用if语句进行条件判断,使用for循环、while循环、repeat循环等语句进行循环控制。 使用if语句进行条...
      99+
      2024-03-02
      r语言
    软考高级职称资格查询
    编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
    • 官方手机版

    • 微信公众号

    • 商务合作