目录 一、概念 二、使用场景 三、RabbitMQ 中的 TTL (一)队列设置 TTL (二)消息设置 TTL (三)两者的区别 四、整合SpringBoot实现延迟队列 (一)创建项目 (二)添加依赖 (三)修改配置文件 (四)添加Sw
目录
Map arguments = new HashMap<>();// 声明队列的TTLarguments.put("x-message-ttl", 10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
另一种方式便是针对每条消息设置 TTL
rabbitTemplate.convertAndSend("X", "XC", message, msg -> { msg.getMessageProperties().setExpiration(ttl); return msg;});
org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-amqp org.springframework.boot spring-boot-starter-WEB org.springframework.boot spring-boot-starter-test test com.alibaba fastJSON 1.2.47 org.projectlombok lombok io.springfox springfox-swagger2 2.9.2 io.springfox springfox-swagger-ui 2.9.2 org.springframework.amqp spring-rabbit-test test
spring.rabbitmq.host=192.168.23.100spring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.passWord=guest
@Configuration@EnableSwagger2public class SwaggerConfig { @Bean public Docket webapiConfig() { return new Docket(DocumentationType.SWAGGER_2) .groupName("webapi") .apiInfo(webApiInfo()) .select() .build(); } public ApiInfo webApiInfo() { return new ApiInfoBuilder() .title("rabbitmq 接口文档") .description("本文档描述了 rabbitmq 微服务接口定义") .version("1.0") .contact(new Contact("enjoy6288", "Http://atguigu.com", "1551388580@qq.com")) .build(); }}
@Configurationpublic class TtlQueueConfig { public static final String X_EXCHANGE = "X"; public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String DEAD_LETTER_QUEUE = "QD"; // 声明xExchange @Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); } // 声明yExchange @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } // 声明队列A @Bean("queueA") public Queue queueA() { Map arguments = new HashMap<>(); // 当前队列的死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); // 当前队列的死信路由key arguments.put("x-dead-letter-routing-key", "YD"); // 声明队列的TTL arguments.put("x-message-ttl", 10000); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); } // 声明队列A绑定交换机X @Bean public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange")DirectExchange xExchange) { return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } // 声明队列B @Bean("queueB") public Queue queueB() { Map arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); // 当前队列的死信路由key arguments.put("x-dead-letter-routing-key", "YD"); // 声明队列的TTL arguments.put("x-message-ttl", 40000); return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build(); } // 声明队列B绑定交换机X @Bean public Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange")DirectExchange xExchange) { return BindingBuilder.bind(queueB).to(xExchange).with("XB"); } // 声明死信队列 @Bean("queueD") public Queue queueD() { return new Queue(DEAD_LETTER_QUEUE); } @Bean // 声明死信队列 QD 绑定关系 public Binding queuedBindingY(@Qualifier("queueD")Queue queueD, @Qualifier("yExchange")DirectExchange exchange) { return BindingBuilder.bind(queueD).to(exchange).with("YD"); }}
@GetMapping("/sendMsg/{message}") public void sendMsg(@PathVariable String message) { log.info("当前时间是{},发送一条信息给两个 TTL 队列:{}", new Date().toString(), message); rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列" + message); rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列" + message); }
@Component@Slf4jpublic class DeadLetterQueueConsumer { @RabbitListener(queues = "QD") public void receiveD(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); log.info("当前时间:{},收到死信队列信息{}", new Date().toString(), msg); }}
发起一个请求 http://localhost:8080/ttl/sendMsg/嘻嘻嘻
第一条消息在 10S 后变成了死信消息,然后被消费者消费掉,第二条消息在 40S 之后变成了死信消息,然后被消费掉,这样一个延时队列就打造完成了。 不过,如果这样使用的话,岂不是 每增加一个新的时间需求,就要新增一个队列 ,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然后提前通知这样的场景,岂不是要增加无数个队列才能满足需求?
在这里新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间,而是由生产者设置过期时间
@Configurationpublic class TtlQueueConfig { public static final String X_EXCHANGE = "X"; public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String DEAD_LETTER_QUEUE = "QD"; public static final String QUEUE_C = "QC"; // 声明xExchange @Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); } // 声明yExchange @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } // 声明队列A @Bean("queueA") public Queue queueA() { Map arguments = new HashMap<>(); // 当前队列的死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); // 当前队列的死信路由key arguments.put("x-dead-letter-routing-key", "YD"); // 声明队列的TTL arguments.put("x-message-ttl", 10000); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); } // 声明队列A绑定交换机X @Bean public Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange")DirectExchange xExchange) { return BindingBuilder.bind(queueA).to(xExchange).with("XA"); } // 声明队列B @Bean("queueB") public Queue queueB() { Map arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); // 当前队列的死信路由key arguments.put("x-dead-letter-routing-key", "YD"); // 声明队列的TTL arguments.put("x-message-ttl", 40000); return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build(); } // 声明队列B绑定交换机X @Bean public Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange")DirectExchange xExchange) { return BindingBuilder.bind(queueB).to(xExchange).with("XB"); } // 声明队列C @Bean("queueC") public Queue queueC() { Map arguments = new HashMap<>(); arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); // 当前队列的死信路由key arguments.put("x-dead-letter-routing-key", "YD"); return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build(); } // 声明队列C绑定交换机X @Bean public Binding queueCBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange")DirectExchange xExchange) { return BindingBuilder.bind(queueC).to(xExchange).with("XC"); } // 声明死信队列 @Bean("queueD") public Queue queueD() { return new Queue(DEAD_LETTER_QUEUE); } @Bean // 声明死信队列 QD 绑定关系 public Binding queuedBindingY(@Qualifier("queueD")Queue queueD, @Qualifier("yExchange")DirectExchange exchange) { return BindingBuilder.bind(queueD).to(exchange).with("YD"); }}
@GetMapping("/sendExpirationMsg/{message}/{ttl}") public void sendMsg(@PathVariable String message, @PathVariable String ttl) { log.info("当前时间是{},发送一条过期信息给两个 TTL 队列:{}", new Date().toString(), message); rabbitTemplate.convertAndSend("X", "XC", message, msg -> { msg.getMessageProperties().setExpiration(ttl); return msg; }); }
发起请求 http://localhost:8080/ttl/sendExpirationMsg/ 你好 1/20000 http://localhost:8080/ttl/sendExpirationMsg/ 你好 2/2000 看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期 ,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行 。
关于插件的安装可以查看这篇文章Docker安装RabbitMq延迟队列插件
在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:
@Configurationpublic class DelayedQueueConfig { public static final String DELAYED_QUEUE_NAME = "delayed.queue"; public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey"; // 声明队列 @Bean public Queue delayedQueue() { return new Queue(DELAYED_QUEUE_NAME); } // 声明自定义交换机 @Bean public CustomExchange delayedExchange() { Map args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } // 声明队列和延迟交换机的绑定 @Bean public Binding bindingDelayedQueue(@Qualifier("delayedQueue")Queue delayedQueue, @Qualifier("delayedExchange")CustomExchange exchange) { return BindingBuilder.bind(delayedQueue).to(exchange).with(DELAYED_ROUTING_KEY).noargs(); }}
@Component@Slf4jpublic class DelayedQueueConsumer { @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME) public void receiveDelayedQueue(String message) { log.info("当前时间:{}, 接收到消息: {}", new Date().toString(), message); }}
发起请求: http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000 http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000 第二个消息被先消费掉了,符合预期
来源地址:https://blog.csdn.net/m0_62946761/article/details/129245805
--结束END--
本文标题: RabbitMQ延迟队列
本文链接: https://lsjlt.com/news/388601.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-04-01
2024-04-03
2024-04-03
2024-01-21
2024-01-21
2024-01-21
2024-01-21
2023-12-23
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0