返回顶部
首页 > 资讯 > 后端开发 > 其他教程 >redisstream实现消息队列的实践
  • 425
分享到

redisstream实现消息队列的实践

redisstream消息队列redis消息队列 2022-11-13 14:11:48 425人浏览 薄情痞子
摘要

目录Redis 实现消息对列4中方法发布订阅list 队列zset 队列Stream 队列基本命令xadd 生产消息读取消息xgroup 消费者组xreadgroup 消费消息Pen

Redis5.0带来了Stream类型。从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列MQ,Message Queue)的完善实现。

基于redis实现消息队列的方式有很多:

  • PUB/SUB,订阅/发布模式
  • 基于List的 LPUSH+BRPOP 的实现

redis 实现消息对列4中方法

发布订阅

发布订阅优点: 典型的一对的,所有消费者都能同时消费到消息。主动通知订阅者而不是订阅者轮询去读。

发布订阅缺点: 不支持多个消费者公平消费消息,消息没有持久化,不管订阅者是否收到消息,消息都会丢失。

使用场景:微服务间的消息同步,如 分布式WEBSocker,数据同步等。

list 队列

生产者通过lpush生成消息,消费者通过blpop阻塞读取消息。

**list队列优点:**支持多个消费者公平消费消息,对消息进行存储,可以通过lrange查询队列内的消息。

**list队列缺点:**blpop仍然会阻塞当前连接,导致连接不可用。一旦blpop成功消息就丢弃了,期间如果服务器宕机消息会丢失,不支持一对多消费者。

zset 队列

生产者通过zadd 创建消息时指定分数,可以确定消息的顺序,消费者通过zrange获取消息后进行消费,消费完后通zrem删除消息。

zset优点: 保证了消息的顺序,消费者消费失败后重新入队不会打乱消费顺序。

zset缺点: 不支持一对多消费,多个消费者消费时可能出现读取同一条消息的情况,得通过加或其他方式解决消费的幂等性。

zset使用场景:由于数据是有序的,常常被用于延迟队列,如 redisson的DelayQueue

Stream 队列

Redis5.0带来了Stream类型。从字面上看是流类型,但其实从功能上看,应该是Redis对消息队列(MQ,Message Queue)的完善实现。

参考kafka的思想,通过多个消费者组和消费者支持一对多消费,公平消费,消费者内维护了pending列表防止消息丢失。

提供消息ack机制。

基本命令

xadd 生产消息

往 stream 内创建消息 语法为:

XADD key ID field string [field string …]

# * 表示自动生成id redis会根据时间戳+序列号自动生成id,不建议我们自己指定id
xadd stream1 * name zs age 23  

读取消息

读取stream内的消息,这个并不是消费,只是提供了查看数据的功能,语法为:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

#表示从 stream1 内取出一条消息,从第0条消息读取(0表示最小的id)
xread count 1 streams stream1 0
#表示从 stream1 内 id=1649143363972-0 开始读取一条消息,读取的是指定id的下一条消息
xread count 1 streams msg 1649143363972-0

#表示一直阻塞读取最新的消息($表示获取下一个生成的消息)
xread count 1 block 0 streams stream1 $ 

xrange stream - + 10

XRANGE key startID endID count

#表示从stream1内取10条消息 起始位置为 -(最小ID) 结束位置为+(最大ID)
xrange stream1 - + 10 

xgroup 消费者组

redis stream 借鉴了kafka的设计,采用了消费者和消费者组的概念。允许多个消费者组消费stream的消息,每个消费者组都能收到完整的消息,例如:stream内有10条消息,消费者组A和消费者组B同时消费时,都能获取到这10条消息。

每个消费者组内可以有多个消费者消费,消息会平均分摊给各个消费者,例如:stream有10条消息,消费者A,B,C同时在同一个组内消费,A接收到 1,4,7,10,B接收到 2,5,8,C接收到 3,6,9

创建消费者组:

#消费消息首先得创建消费者组
# 表示为队列 stream1 创建一个消费者组 group1 从消息id=0(第一条消息)开始读取消息
xgroup create stream1 group1 0

#查询stream1内的所有消费者组信息
xinfo groups stream1

xreadgroup 消费消息

通过xreadgroup可以在消费者组内创建消费者消费消息

XREADGROUP group groupName consumerName [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

#创建消费者读取消息
#在group1消费者组内通过consumer1消费stream1内的消息,消费1条未分配的消息 (> 表示未分配过消费者的消息)
xreadgrup group group1 consumer1 count 1 streams stream1 > 

Pending 等待列表

通过 xreadgroup 读取消息时消息会分配给对应的消费者,每个消费者内都维护了一个Pending列表用于保存接收到的消息,当消息ack后会从pending列表内移除,也就是说pending列表内维护的是所有未ack的消息id

每个Pending的消息有4个属性:

  • 消息ID
  • 所属消费者
  • IDLE,已读取时长
  • delivery counter,消息被读取次数

XPENDING key group [start end count] [consumer]

#查看pending列表
# 查看group1组内的consumer1的pending列表 - 表示最小的消息id + 表示最大的消息ID
xpending stream1 group1 - + 10 consumer1
# 查看group1组内的所有消费者pending类表
xpending stream1 group1 - + 10 

消息确认

当消费者消费了消息,需要通过 xack 命令确认消息,xack后的消息会从pending列表移除

XACK key gruopName ID

xack stream1 group1 xxx

消息转移

当消费者接收到消息却不能正确消费时(报错或其他原因),可以使用 XCLaiM 将消息转移给其他消费者消费,需要设置组、转移的目标消费者和消息ID,同时需要提供IDLE(已被读取时长),只有超过这个时长,才能被转移。

通过xclaim转移的消息只是将消息移入另一个消费者的pending列表,消费者并不能通过xreadgroup读取到消息,只能通过xpending读取到。

# 表示将ID为 1553585533795-1 的消息转移到消费者B消费,前提是消费
XCLAIM stream1 group1 consumer1 3600000 1553585533795-1

信息监控

redis提供了xinfo来查看stream的信息

#查看sream信息
xinfo stream steam1
#查询消费者组信息
xinfo groups group1 

#查询消费者信息
xinfo consumers consumer1

springBoot 整合

1 引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2 编写消费者

@Slf4j
@Component
public class EmailConsumer implements StreamListener<String, MapRecord<String,String,String>> {

    public final String streamName      = "emailStream";
    public final String groupName       = "emailGroup";
    public final String consumerName    = "emailConsumer";


    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public void onMessage(MapRecord<String, String, String> message) {

        //log.info("stream名称-->{}",message.getStream());
        //log.info("消息ID-->{}",message.getId());
        log.info("消息内容-->{}",message.getValue());

        Map<String, String> msgMap = message.getValue();

        if( msgMap.get("sID")!=null && Integer.valueOf(msgMap.get("sID")) % 3 ==0 ){
            //消费异常导致未能ack时,消息会进入pending列表,我们可以启动定时任务来读取pending列表处理失败的任务
            log.info("消费异常-->"+message);
           return;
        }

        StreamOperations<String, String, String> streamOperations = stringRedisTemplate.opsForStream();
        //消息应答
        streamOperations.acknowledge( streamName,groupName,message.getId() );

    }
	//我们可以启动定时任务不断监听pending列表,处理死信消息
}

3 配置redis

序列化配置

@EnableCaching
@Configuration
public class RedisConfig {


    
    @Bean
    public Jackson2JSONRedisSerializer<Object> jackson2jsonRedisSerializer(){
        Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);

        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        jackson2JsonRedisSerializer.setObjectMapper(om);

        return jackson2JsonRedisSerializer;
    }

    
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory,
                                                       Jackson2JsonRedisSerializer jackson2JsonRedisSerializer) {

        // 配置redisTemplate
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        RedisSerializer<?> stringSerializer = new StringRedisSerializer();

        // key序列化
        redisTemplate.seTKEySerializer(stringSerializer);
        // value序列化
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);

        // Hash key序列化
        redisTemplate.setHashKeySerializer(stringSerializer);
        // Hash value序列化
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);

        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

}

消费者组和消费者配置

@Slf4j
@Configuration
public class RedisStreamConfig {

    @Autowired
    private EmailConsumer emailConsumer;

    @Autowired
    private RedisTemplate<String,Object> redisTemplate;

    @Bean
    public StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> emailListenerContainerOptions(){

        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();

        return StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                .builder()
                //block读取超时时间
                .pollTimeout(Duration.ofSeconds(3))
                //count 数量(一次只获取一条消息)
                .batchSize(1)
                //序列化规则
                .serializer( stringRedisSerializer )
                .build();
    }

    
    @Bean
    public StreamMessageListenerContainer<String,MapRecord<String,String,String>> emailListenerContainer(RedisConnectionFactory factory,
                                                                 StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String,String,String>> streamMessageListenerContainerOptions){

        StreamMessageListenerContainer<String,MapRecord<String,String,String>> listenerContainer = StreamMessageListenerContainer.create(factory,
                streamMessageListenerContainerOptions);

        //如果 流不存在 创建 stream 流
        if( !redisTemplate.hasKey(emailConsumer.streamName)){
            redisTemplate.opsForStream().add(emailConsumer.streamName, Collections.singletonMap("", ""));
            log.info("初始化stream {} success",emailConsumer.streamName);
        }

        //创建消费者组
        try {
            redisTemplate.opsForStream().createGroup(emailConsumer.streamName,emailConsumer.groupName);
        } catch (Exception e) {
            log.info("消费者组 {} 已存在",emailConsumer.groupName);
        }

        //注册消费者 消费者名称,从哪条消息开始消费,消费者类
        // > 表示没消费过的消息
        // $ 表示最新的消息
        listenerContainer.receive(
            Consumer.from(emailConsumer.groupName, emailConsumer.consumerName),
            StreamOffset.create(emailConsumer.streamName, ReadOffset.lastConsumed()),
            emailConsumer
        );


        listenerContainer.start();
        return listenerContainer;
    }

}

4.生产者生产消息

@GetMapping("/redis/ps")
public String redisPublish(String content,Integer count){

    StreamOperations streamOperations = redisTemplate.opsForStream();

    for (int i = 0; i < count; i++) {
        AtomicInteger num = new AtomicInteger(i);

        Map msgMap = new HashMap();
        msgMap.put("count", i);
        msgMap.put("sID", num);
        //新增消息
        streamOperations.add("emailStream",msgMap);
    }
    return "success";
}

参考文档:

redis Stream 消息队列

SpringBoot整合redis stream 实现消息队列

到此这篇关于redis stream 实现消息队列的实践的文章就介绍到这了,更多相关redis stream 消息队列内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: redisstream实现消息队列的实践

本文链接: https://lsjlt.com/news/172328.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • redisstream实现消息队列的实践
    目录redis 实现消息对列4中方法发布订阅list 队列zset 队列Stream 队列基本命令xadd 生产消息读取消息xgroup 消费者组xreadgroup 消费消息Pen...
    99+
    2022-11-13
    redis stream 消息队列 redis 消息队列
  • 详解RedisStream做消息队列
    目录ListPub/subStreamConsumer Grouplast_delivered_idpending_idscurdpending_ids如何避免消息丢失嵌入...
    99+
    2024-04-02
  • redis stream 实现消息队列的实践
    目录Redis 实现消息对列4中方法发布订阅list 队列zset 队列Stream 队列基本命令xadd 生产消息读取消息xgroup 消费者组xreadgroup 消费消息Pending 等待列表消息确认消息转移信息...
    99+
    2022-08-10
    redisstream消息队列 redis消息队列
  • golang消息队列实现
    Golang是一种开源的编程语言,它适用于创建高性能的网络应用程序和消息队列等分布式系统。在这篇文章中,我们将探讨如何使用Golang来实现一个消息队列。什么是消息队列?在分布式系统中,应用程序通常需要在不同的节点之间共享数据。消息队列是用...
    99+
    2023-05-15
  • SpringBoot2实现MessageQueue消息队列
    目录什么是消息队列一、异步与同步1.1 同步通讯与异步通讯1.2 同步调用的问题1.3 异步调用方案二、MQ消息队列2.1 单机部署MQ2.2 结构和概念2.3 常见的消息模型三、S...
    99+
    2023-05-17
    SpringBoot2 MessageQueue消息队列 SpringBoot MessageQueue
  • Redis中如何实现消息队列和延时消息队列
    这篇文章将为大家详细讲解有关Redis中如何实现消息队列和延时消息队列,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。list的几个命令lpush (left push)由...
    99+
    2024-04-02
  • Golang中使用RabbitMQ实现消息队列的原理和实践
    在Golang中使用RabbitMQ实现消息队列的原理和实践主要涉及以下几个方面:1. RabbitMQ介绍:RabbitMQ是一个...
    99+
    2023-10-08
    Golang
  • redis实现消息队列的方法
    这期内容当中的小编将会给大家带来有关redis实现消息队列的方法,以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。消息队列,Message Queue,常用于解决并发系统中的资源一致性问题...
    99+
    2024-04-02
  • redis怎么实现消息队列
    Redis可以通过以下几种方式实现消息队列:1. List数据结构:使用Redis的List数据结构实现简单的消息队列。生产者将消息...
    99+
    2023-09-14
    redis
  • go怎么实现消息队列
    在Go语言中,你可以使用第三方库来实现消息队列。目前比较常用的消息队列库有:1. RabbitMQ:RabbitMQ是一个开源的消息...
    99+
    2023-08-31
    go
  • MSMQ消息队列怎么实现
    这篇文章主要介绍了MSMQ消息队列怎么实现的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇MSMQ消息队列怎么实现文章都会有所收获,下面我们一起来看看吧。一、引言Windows Communication Fou...
    99+
    2023-06-29
  • golang怎么实现消息队列
    在Golang中实现消息队列可以使用Golang的内置库和第三方库来实现。以下是两种常见的实现方式: 使用Golang内置的cha...
    99+
    2023-10-25
    golang
  • redis如何实现消息队列
    Redis可以实现消息队列的功能,常用的实现方式是使用Redis的List数据结构来存储消息队列中的消息。具体实现步骤如下: 将...
    99+
    2024-04-22
    Redis
  • Java实现Redis延时消息队列
    目录什么是延时任务 延时任务的特点 实现思路: 代码实现 1.消息模型2.RedisMq 消息队列实现类3.消息生产者 4.消息消费者 5. 消息执接口 6. 任务类型的实现类:可以...
    99+
    2024-04-02
  • PHP怎么实现RabbitMQ消息列队
    这篇“PHP怎么实现RabbitMQ消息列队”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“PHP怎么实现RabbitMQ消息...
    99+
    2023-06-30
  • Python中线程的MQ消息队列实现以及消息队列的优点解析
    “消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成...
    99+
    2022-06-04
    队列 消息 线程
  • 谈谈消息队列的设计与实现
    消息队列是一种存储和传递消息的机制,用于实现应用程序之间的异步通信。它可以帮助解耦应用程序的组件,提高系统的可伸缩性和可靠性。消息队...
    99+
    2023-09-21
    消息队列
  • 使用PHP实现消息队列的开发
    随着现代互联网应用对高并发、高吞吐量和高可靠性的要求越来越高,消息队列作为一种异步解耦系统架构方式越来越被应用在互联网领域的各个方面。其原理是先将消息发送到消息队列中,等待异步消费,从而达到解耦的目的,提高系统的可扩展性与可维护性。在目前市...
    99+
    2023-05-25
    PHP 消息队列 开发
  • Java消息队列的简单实现代码
    今天看到我们的招聘信息有对消息队列有要求,然后就思索了一翻,网上一搜一大堆。我可以举个小例子先说明应用场景假设你的服务器每分钟的处理量为200个,但客户端再峰值的时候可能一分钟会发1000个消息给你,这时候你就可以把他做成队列,然后按正常有...
    99+
    2023-05-31
    java 消息队列 ava
  • Redis中的消息队列序列化怎么实现
    在Redis中实现消息队列序列化可以使用各种不同的方法,其中一种常见的方法是使用JSON序列化。可以将消息数据转换为JSON格式存储...
    99+
    2024-04-29
    Redis
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作