返回顶部
首页 > 资讯 > 后端开发 > Python >关于kafka-consumer-offset位移问题
  • 111
分享到

关于kafka-consumer-offset位移问题

kafka-consumer-offsetkafka-consumer-offset位移 2023-03-07 11:03:00 111人浏览 泡泡鱼

Python 官方文档:入门教程 => 点击学习

摘要

目录1 offset的默认维护位置1.1 消费offset案例2 自动提交offset3 手动提交offset3.1 原理3.2 代码示例4 指定offset消费5 指定时间消费6

1 offset的默认维护位置

_consumer_offsets主题里面采用key和 value的方式存储数据。

key是 group.id+topic+分区号value 就是当前offset的值

每隔一段时间,kafka 内部会对这个topic进行compact(压缩),也就是每个group.id+topic+分区号就保留最新数据。

Kafka0.9版本之前,consumer黑认将offset保存在ZooKeeper中。0.9版本开始,consumer默认将offset保存在Kafka一个内置的topic中,该topic为_consumer_offsets。

将offset信息存储在zk中的不足:如果将offset信息存储在zk中,那么所有的consumer都会访问zk,会消耗大量的网络资源,消费速度慢。

1.1 消费offset案例

思想:_consumer_offsets为Kafka中的 topic,那就可以通过消费者进行消费。

在配置文件 config/consumer.properties中添加配置exclude.internal.topics = false,默认是 true,表示不能消费系统主题。为了查看该系统主题数据,所以该参数修改为false。修改以后执行分发命令:xsync consumer.properties。

采用命令行方式,创建一个新的topic。

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu --partitions 2 --replication-factor 2

启动生产者往atguigu生产数据。

[atguigu@hadoop102 kafka] $ bin/kafka-console-producer.sh --topic atguigu --bootstrap-server hadoop102:9092

启动消费者消费atguigu数据。

[atguigu@hadoop104 kafka]$ bin/kafka-console-consumer.sh bootstrap-server hadoop102:9092--topic atguigu --group test

注意:指定消费者组名称,更好观察数据存储位置(key是 group.id+topic+分区号)。查看消费者消费主题_consumer_offsets。

[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --topic _consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --fORMatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning

2 自动提交offset

为了使我们能够专注于自己的业务逻辑,Kafka提供了自动提交offset的功能。自动提交offset的相关参数:

  • enable.auto.commit:是否开启自动提交offset功能,默认是true
  • auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s

消费者配置代码:

//配置是否是自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
//提交时间间隔,单位是ms
properties.put(ConsumerConfig.AUTO_COMNIT_INTERVAL_NS_CONFI6,1000);

3 手动提交offset

3.1 原理

虽然自动提交offset十分简单便利,但由于其是基于时间提交的,开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的api

手动提交offset的方法有两种:分别是commitSync(同步提交)commitAsync(异步提交)

两者的相同点是,都会将本次提交的一批数据最高的偏移量提交;不同点是,同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试(由不可控因素导致,也会出现提交失败);而异步提交则没有失败重试机制,故有可能提交失败。

  • commitSync(同步提交):必须等待offset提交完毕,再去消费下一批数据。
  • commitAsync(异步提交):发送完提交offset请求后,就开始消费下一批数据了

3.2 代码示例

3.2.1 同步提交

//手动提交属性配置
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ,false);
//消费代码逻辑
XXX
XXX
XXX
//手动提交代码(处理完数据以后,这里为了方便,只展示关键代码)
//手动提交offset
kafkaConsumer.commitsync();

3.2.2 异步提交(生产常用)

//手动提交属性配置
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ,false);
//消费代码逻辑
XXX
XXX
XXX
//手动提交代码(处理完数据以后,这里为了方便,只展示关键代码)
//手动提交offset
kafkaConsumer.commitAsync();

4 指定offset消费

auto.offset.reset = earliest | latest | none 默认是latest

当Kafka 中没有初始偏移量(消费者组第一次消费)或服务器上不再存在当前偏移量时(例如该数据已被删除),该怎么办?

  • earliest:自动将偏移量重置为最早的偏移量,--from-beginning。
  • latest(默认值):自动将偏移量重置为最新偏移量。
  • none:如果未找到消费者组的先前偏移量,则向消费者抛出异常。

任意指定offset位移开始消费。

//1创建消费者
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2订阅主题
ArrayList<String> topics = new ArrayList<>(;topics.add( "first");
kafkaConsumer.subscribe(topics);
 
//指定位置进行消费
set<TopicPartition> assignment = kafkaConsumer.assignment();//获取所有分区信息
//保证分区分配方案已经制定完毕,因为由于leader消费者制定分配方案会消耗一定时间,有可能此时获取不到分区信息,所以加一层分区空间判断
while (assignment.size() == 0){
    //促使获取的分区数量不为0
    kafkaConsumer.poll(Duration.ofSeconds(1));
    assignment = kafkaConsumer.assignment();
}
 
//遍历所有分区,指定消费的offset
for (TopicPartition topicPartition : assignment) {
    kafkaConsumer.seek(topicPartition, 100);
}
 
// 3消费数据
while (true){

5 指定时间消费

需求:在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。

例如要求按照时间消费前一天的数据,怎么处理?

//1创建消费者
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2订阅主题
ArrayList<String> topics = new ArrayList<>(;topics.add( "first");
kafkaConsumer.subscribe(topics);
 
//指定位置进行消费
set<TopicPartition> assignment = kafkaConsumer.assignment();//获取所有分区信息
//保证分区分配方案已经制定完毕,因为由于leader消费者制定分配方案会消耗一定时间,有可能此时获取不到分区信息,所以加一层分区空间判断
while (assignment.size() == 0){
    //促使获取的分区数量不为0
    kafkaConsumer.poll(Duration.ofSeconds(1));
    assignment = kafkaConsumer.assignment();
}
//希望把时间转换为对应的offset
HashMap<TopicPartition,Long> topicPartitionLongHashMap = new HashMap<>();
//封装对应集合
for (TopicPartition topicPartition : assignment) {
    //希望获取当前系统时间一天前的数据。
    topicPartitionLongHashMap.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
Nap<TopicPartition,OffsetAnd imestamp> topioPartitionffsetAndrtimestampMep = karfiaConsumer.offsetsForTines(topicPartitionL ongHashiap);
 
 
//遍历所有分区,指定消费的offset
//指定消费的offset
for (TopicPartition topicPartition : assignment) {
    OffsetAndTimestamp offsetAndTimestamp = topicPartition0ffsetAndTimestampHap.get(topicPartition);
    kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());
}
 
// 3消费数据
while (true){

6 漏消费和重复消费分析

6.1 重复消费

场景1:重复消费。自动提交offset引起。

6.2 漏消费

场景1:漏消费。设置offset为手动提交,当offset被提交时,数据还在内存中未落盘,此时刚好消费者线程被kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。

6.3 消费者事务

如果想完成Consumer端的精准一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定

此时我们需要将Kafka的offset保存到支持事务的自定义介质(比如Mysql)。这部分知识会在后续项目部分涉及。

7 数据积压

方案1:如果是Kafka消费能力不足,则可以考虑增加Topic的分区数,并且同时提升消费组的消费者数量,消费者数=分区数。(两者缺一不可)

方案2:如果是下游的数据处理不及时:提高每批次拉取的数量。批次拉取数据过少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据,也会造成数据积压。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程网。

--结束END--

本文标题: 关于kafka-consumer-offset位移问题

本文链接: https://lsjlt.com/news/198832.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • 关于kafka-consumer-offset位移问题
    目录1 offset的默认维护位置1.1 消费offset案例2 自动提交offset3 手动提交offset3.1 原理3.2 代码示例4 指定offset消费5 指定时间消费6 ...
    99+
    2023-03-07
    kafka-consumer-offset kafka-consumer-offset位移
  • kafka-consumer-offset位移问题怎么解决
    这篇文章主要介绍了kafka-consumer-offset位移问题怎么解决的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇kafka-consumer-offset位移问题怎么解决文章都会有所收获,下面我们一起...
    99+
    2023-07-05
  • 《Kafka系列》Offset Explorer连接Kafka问题集合,Timeout expired while.. topic metadata,Uable to find any brokers
    Offset Explorer连接Kafka问题集合,(Timeout expired while fetching topic metadata),(Uable to find any broker...
    99+
    2023-08-31
    kafka java 分布式
  • 关于MySQL的时间进位问题浅析
    MySQL 当中默认的时间类型(datetime 和 timestamp)的精度是秒,如果设置进去的时间值精度小于秒的话,就会被四舍五入,可能导致数据库中的值比原始值多了一秒。也就是说,本来属于今天的记录可...
    99+
    2024-04-02
  • 关于kafka消费不到远程bootstrap-server 数据的问题
    本文重点给大家介绍kafka消费不到远程bootstrap-server 数据的问题原因分析及解决方法,内容如下所示: 问题 执行 ./bin/kafka-console-consu...
    99+
    2024-04-02
  • 关于Pytorch中模型的保存与迁移问题
    目录1 引言2 模型的保存与复用2.1 查看网络模型参数2.2 载入模型进行推断2.3 载入模型进行训练2.4 载入模型进行迁移3 总结1 引言 各位朋友大家好,欢迎来到月来客栈。今...
    99+
    2024-04-02
  • 关于Spark Streaming感知kafka动态分区的问题该怎么理解
    关于Spark Streaming感知kafka动态分区的问题该怎么理解,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。小编主要是讲解Spark Streaming与kafk...
    99+
    2023-06-19
  • 关于golangtest缓存问题
    在使用vscode进行debug golang 时,出现test结果缓存的情况导致若干次test结果一致问题设置go test不用缓存官方说明https://pkg.go.dev/c...
    99+
    2024-04-02
  • 关于Dubbo初始问题
    目录Dubbo架构节点角色说明过程Zookeeper安装Dubbo快速入门jar包依赖开始配置dubbo配置zk总结Dubbo架构 消费者:调用提供者,但是不能直接调,需要借助注...
    99+
    2023-03-01
    Dubbo初始 关于Dubbo初始 Dubbo初始问题
  • 关于SpringCloudStream配置问题
    目录SpringCloudStream配置Stream配置Kafka绑定器配置总结SpringCloudStream配置 以下配置摘自《SpringCloud微服务实战》,配置主要包...
    99+
    2022-12-28
    SpringCloudStream配置 SpringCloudStream
  • 关于JSP中文问题
    在jsp中,中文乱码常会让人心乱如麻。 对于 中文处理的常见对策,在网上经常可见的主要是下面2种: 或者: 虚拟小龙亭也主要采用了上面2种方法进行了修改,从而变成了1.1版。 通过简单总结,示例中的中文处理发生于以下几个地方: 1 在u...
    99+
    2023-06-03
  • VB关于FindWindowEx的问题
    FindWindowEx函数用于在指定窗口的子窗口中查找符合指定条件的窗口。它的声明如下:Public Declare Functi...
    99+
    2023-08-08
    VB
  • python实现数组平移K位问题
    目录python数组平移K位Python对数组进行循环移位要求分析代码实现性能分析总结python数组平移K位 def move(ls: list, offset): """...
    99+
    2023-02-06
    python数组 数组平移K位 python数组平移
  • 关于element-ui select 下拉框位置错乱问题解决
    目录element-ui select 下拉框位置错乱element-ui使用时的一些坑点总结1.el-select 下拉框绑定值为对象时选中值显示错乱2.局部覆盖element-u...
    99+
    2024-04-02
  • 关于C++中由于字节对齐引起内存问题定位分析
    最近遇到了一个奇怪的问题,在创建对象时程序异常退出,具体地,在构造函数中访问类中最后一个成员变量时,程序异常退出。 问题定位 查看代码,发现该类中有一个结构体数组,该结构体在类的外面...
    99+
    2024-04-02
  • 关于关闭管道的问题DisconnectNamedPipe
    `DisconnectNamedPipe` 是一个Windows API函数,用于关闭命名管道。命名管道是一种实现进程间通信的机制,...
    99+
    2023-08-08
    问题
  • 关于mysql5.6 的排序问题.
    mysql 5.6 的排序进行了优化.  同样的sql , 在5.5 跟5.6 上可能得到不同的结果: CREATE TABLE `TestCase2` (   ...
    99+
    2024-04-02
  • 关于Vue虚拟dom问题
    目录一、什么是虚拟dom?二、为什么需要虚拟dom三、虚拟dom是如何转换为真实dom的四、模板和虚拟dom的关系一、什么是虚拟dom? 虚拟dom本质上就是一个普通的JS对象,用于...
    99+
    2024-04-02
  • 关于C# dynamic装箱问题
    目录前言装箱拆箱探究本质匿名类型总结前言 前几天在技术群里看到有同学在讨论关于dynamic是否会存在装箱拆箱的问题,我当时第一想法是"会"。至于为啥会有很多人有...
    99+
    2024-04-02
  • 关于JWTtoken的管理问题
    JWT简介:      Json web token (JWT), 是为了在网络应用环境间传递声明而执行的一种基于JSON的开放标准。因为网络上有很多关于jwt的详细介绍了,所以我这里就不再赘述。但是JWT的大概还是要简要讲一下的。   ...
    99+
    2023-01-30
    JWTtoken
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作