返回顶部
首页 > 资讯 > 精选 >java分布式流处理组件Producer怎么使用
  • 400
分享到

java分布式流处理组件Producer怎么使用

2023-07-05 09:07:46 400人浏览 安东尼
摘要

这篇文章主要讲解了“java分布式流处理组件Producer怎么使用”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“java分布式流处理组件Producer怎么使用”吧!基于Java的api首

这篇文章主要讲解了“java分布式流处理组件Producer怎么使用”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“java分布式流处理组件Producer怎么使用”吧!

    基于Java的api

    首先, 在了解生产者发送消息的原理之前,我们应该先学会如何去发送消息。

    kafka为我们提供了很多项目可以操作的API客户端,包括:

    通过官网查看API菜单,官方文档上也是Java的版本。我们根据提示一步步操作即可~

    先新建Maven项目,并且引入对应的****kafka-clients依赖

    建议:Kafka-clients依赖版本,最好和安装的kafka版本一致

    <dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka-clients</artifactId>    <version>3.3.1</version></dependency>

    同步发送

    Kafka生产者主要靠KafkaProducer来进行操作。点击到对应的文档页面,我们可以看到关于KafkaProducer<K,V> 的详细信息。

    一个好的组件是非常贴心的, 甚至我们都不用去网上搜任何相关的资料,只需要通过查看对应的注释就可以知道这个东西该怎么用。

    Properties config = new Properties();// --bootstrap-serverconfig.setProperty(  ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,   "master:9092,node01:9092,node02:9092");// key 序列化器config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// value 序列化器config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");try(Producer<String, String> producer = new KafkaProducer<>(config)) {    ProducerRecord<String, String> record = new ProducerRecord<>(            "newTopic001",            "key01",            "data from " + KafkaQuickProducer.class.getName()    );     RecordMetadata recordMetadata = producer.send(record).get();    System.out.println(            MessageFORMat.format("{0}\t{1}\t{2}\t{3}",                     recordMetadata.topic(),                     recordMetadata.partition(),                    recordMetadata.offset(),                     recordMetadata.timestamp()            )    );} catch (Exception e) {    e.printStackTrace();}

    以上代码就是同步发送的过程,这已经是在开发过程中需要配置的最小单元,而其他关于生产者的配置,我们可以通过ProducerConfig来进行查看

    ** 与命令行上的参数,基本上是一模一样的**

    而关于序列化器的问题,我们在下面原理的部分说明

    异步发送

    我们在调用同步send的时候,发现有两个参数的方法, 而这个方法实现的就是****异步发送

    Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

    异步发送会将发送结果以事件驱动的形式传递,那么这里,我们就需要注意一点:

    • 程序调用完成之后,不能让他立即执行,否则我们无法查看到具体的发送结果

    接下来我们看具体的程序实现。理论上:我们只需要改最后发送的部分

    Properties config = new Properties();// --bootstrap-serverconfig.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,node01:9092,node02:9092");// key 序列化器config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");// value 序列化器config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");try(Producer<String, String> producer = new KafkaProducer<>(config)) {    ProducerRecord<String, String> record = new ProducerRecord<>(            "newTopic001",            "key01",            "data from " + KafkaQuickProducer.class.getName()    );    async(producer, record);} catch (Exception e) {    e.printStackTrace();}// 异步发送private static void async(Producer<String, String> producer, ProducerRecord<String, String> record) {    producer.send(record, (recordMetadata, exception) -> {        if (null != exception) {            exception.printStackTrace();            return;        }        System.out.println(                MessageFormat.format("{0}\t{1}\t{2}\t{3}",                        recordMetadata.topic(),                        recordMetadata.partition(),                        recordMetadata.offset(),                        recordMetadata.timestamp()                )        );    });    try {        // 将程序进行阻塞,防止由于消息发送成功之后进程停止而无法接收到事件反馈        System.in.read();    } catch (IOException e) {        throw new RuntimeException(e);    }}

    这属于整个生产者发送消息方式的最小单元,本文属于Producer入门阶段。

    在ProducerConfig中还包含了非常多的配置项,更多的配置信息我们会在优化章节中说明。

    原理

    java分布式流处理组件Producer怎么使用

    在第一部分,我们已经了解到,关于生产者最基本的使用方式,到这里,其实我想跟大家聊一聊:

    • 生产者在发送消息的时候中间到底经历了什么?

    大家应该已经看到上面的那张原理图,我们可以从中找出答案!

    线程

    **这里我们分为两个线程块来说明, 第一部分是Main主线程, 也就是生产者在调用****send()**方法时所在的线程

    在这里,我们可以看到:

    • 外部数据首先被封装为ProducerRecord**,然后调用**send()**方法。

    • 在send()过程中,经过拦截器、序列化器、分区器等处理之后进入到RecordAccumulator中。

    接下来我们仔细聊一聊拦截器、序列化器、分区器的作用

    拦截器

    拦截器很类似于我们在springMVC中Interceptor的功能,而且在Producer中我们是可以自定义拦截器的。

    我们可以在发送之前对数据进行拦截处理,比如说:统计生产者发送数据的总量等等。

    当然目前来讲,我们如果不开发Kafka监控平台的话,这里拦截器的用处并不大。我们忽略不计即可

    后续如果有机会的话,我们可以专门写篇文章,用来介绍如何开发一个拦截器

    序列化器

    而序列化器,主要对两个部分的数据进行处理:

    • Key

    • Value

    byte[] serializedKey   = serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());byte[] serializedValue  = valueSerializer.serialize(record.topic(), record.headers(), record.value());

    从本质上来讲,外部数据属于属于对象,而对象不能直接通过网络进行传输。 所以我们就需要一个序列化器,将它转换成字节数组,进而进行传输

    java分布式流处理组件Producer怎么使用

    Kafka本身为我们提供了很多可用的序列化器,不过我们能用到最多的还是StringSerializer。

    在生产端将消息进行序列话,那么在消费端必然会进行反序列化操作

    分区器

    我们知道Kafka是以Topic为消息发送的主体,不过由于Topic是一个虚拟的概念, 所以我们没有办法在实际中查看到关于Topic的相关信息。 但是前面我们也说过, 当前Topic下的消息数据都是通过Partition进行存储的。

    发送出去的消息需要存储在哪个分区中就是通过分区器来进行指定的,在我们没有指定分区策略的情况下,生产者会通过默认的分区策略指定当前消息应该存储在哪个分区下

    java分布式流处理组件Producer怎么使用

    分区的内容还是比较多的,我们会在下一节做详细的说明

    RecordAccumulator

    此时,在主线程的区域中,当消息进入到默认大小为32m的记录缓冲区时, 本区的工作就到此结束。

    缓冲区中有多个双端队列,分别对应Topic不同的分区。每一个分区就会创建一个双端队列。

    此时的消息将会被按照批次的方式存放在队列中, 默认一批为16k大小。当缓冲区达到指定条件之后,****sender线程将会被唤醒,Sender程序将会冲队列中不断拉出消息进行下一步的发送

    Sender线程

    影响Sender线程唤醒的条件

    想要唤醒Sender线程有两个因素,但不是说这两个条件都必须满足,他们是或的关系。

    batch.size是一个条件,这也是后期针对生产者优化的主要参数之一。

    当发送消息之后,生产者会将消息进行整合。将其按照一批一批的方式发送给Broker,从而减少网络间的传输请求次数。默认情况下为16k。

    而如果一批数据的大小累计达到了设置的batch.size之后,sender才会做发送数据的操作

    这是第一个限制

    下面再来介绍一个非常强势的参数:liner.ms。生产者优化的主要参数之二。

    这么说吧,如果你设置的liner.ms=0,表示不延迟直接发送。那么batch.size就不会生效了

    而liner.ms=0属于默认配置

    如果数据一直没有达到设置的batch.size大小,数据也不能不发对吧。所以Kafka也就为我们提供了这样的参数:

    • 当sender等待liner.ms设置的时间之后【单位ms】,不管数据如何都会将消息进行发送

    • 如未设置当前参数,表示没有延迟,直接发送

    下面举个小例子

    config.setProperty(ProducerConfig.LINGER_MS_CONFIG, "5000");

    java分布式流处理组件Producer怎么使用

    开始发送

    RecordAccumulator内存储的数据拉取出来之后,开始将其创建为一个个的Request请求。这里需要注意的是:

    • NetworkClient并非一股脑的将全部可发送数据进行传输请求

    正相反,为了能够保证不同分区所对应DQueue的数据进入到对应的Broker所在的分区内,Kafka将按照<BrokerId, Request>的形式对请求进行传输。如果传输到达Broker之后没有acks应答,那么当前节点下最多能够保存5个未响应的请求。

    ACKS

    这里简单聊一下它的应答方式。在ProducerConfig.ACKS_DOC下我们也可以看到相关的说明:

    • acks=0: 生产者不会等待Broker的应答,直接表示消息已经发送成功。而消息有没有真正达到Broker,不关心。

    当然了,这种方式在性能上来讲是最好的,适合一些数据不重要的场景

    • acks=1: 生产者将消息发送到Broker之后,由Leader在本地将消息进行存储之后,返回发送成功的应答。

    如果Follower还没有同步到消息,Leader就已经挂了。那么此时就会出现消息丢失的情况

    • acks=all:生产者将消息发送到Broker之后,由Leader在本地将消息进行存储,并且Follower同步完消息之后才会返回发送成功的应答。

    这种方式是最能保证数据安全的情况,但是性能也是最低的~

    最后:

    • 当Broker返回成功应答之后,RecordAccumulator中的数据将会被清理

    • 如果失败,可以尝试重试等操作

    感谢各位的阅读,以上就是“java分布式流处理组件Producer怎么使用”的内容了,经过本文的学习后,相信大家对java分布式流处理组件Producer怎么使用这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是编程网,小编将为大家推送更多相关知识点的文章,欢迎关注!

    --结束END--

    本文标题: java分布式流处理组件Producer怎么使用

    本文链接: https://lsjlt.com/news/350990.html(转载时请注明来源链接)

    有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

    猜你喜欢
    • java分布式流处理组件Producer怎么使用
      这篇文章主要讲解了“java分布式流处理组件Producer怎么使用”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“java分布式流处理组件Producer怎么使用”吧!基于Java的API首...
      99+
      2023-07-05
    • java分布式流式处理组件Producer分区理论
      目录前言为什么需要分区分区的作用分区策略DefaultPartitionerRoundRobinPartitioner自定义分区器代码说明前言 前面我们已经对Producer发送原...
      99+
      2023-03-07
      java分布式Producer分区 java Producer流式处理
    • java分布式流式处理组件Producer分区的作用是什么
      这篇文章主要讲解了“java分布式流式处理组件Producer分区的作用是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“java分布式流式处理组件Producer分区的作用是什么”吧!为...
      99+
      2023-07-05
    • Redis分布式限流组件设计与使用实例
      目录1.背景2.Redis计数器限流设计2.1Lua脚本2.2自定义注解2.3限流组件2.4限流切面实现3.测试一下3.1方法限流示例3.2动态入参限流示例4.其它扩展5.源码地址本...
      99+
      2024-04-02
    • 如何在Java中使用Numpy来处理分布式文件?
      近年来,随着数据量的增加,分布式文件处理变得越来越重要。而对于Java开发者来说,使用Numpy来处理分布式文件可能是一个不错的选择。本文将介绍如何在Java中使用Numpy来处理分布式文件,并附带一些演示代码。 一、Numpy简介 Num...
      99+
      2023-07-28
      文件 分布式 numpy
    • redisson分布式限流RRateLimiter怎么使用
      今天小编给大家分享一下redisson分布式限流RRateLimiter怎么使用的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧...
      99+
      2023-07-04
    • Java Swing组件布局管理器之FlowLayout(流式布局)入门教程
      本文实例讲述了Java Swing组件布局管理器之FlowLayout(流式布局)。分享给大家供大家参考,具体如下:FlowLayout应该是Swing布局管理器学习中最简单、最基础的一个。所谓流式,就是内部控件像水流一样,从前到后按顺序水...
      99+
      2023-05-30
      java swing ava
    • Java中如何处理分布式文件系统?
      随着互联网的发展,越来越多的应用需要处理大量的文件数据,而传统的单机文件系统已经无法满足这种需求。因此,分布式文件系统应运而生。分布式文件系统是一种能够将数据存储在多台服务器上的文件系统,通过将数据分散到不同的服务器上,可以提高数据的可靠...
      99+
      2023-07-28
      文件 分布式 numpy
    • Java GUI流式布局管理器FlowLayout怎么用
      本文小编为大家详细介绍“Java GUI流式布局管理器FlowLayout怎么用”,内容详细,步骤清晰,细节处理妥当,希望这篇“Java GUI流式布局管理器FlowLayout怎么用”文章能帮助大家解决疑惑,下面跟着小...
      99+
      2023-06-30
    • java流式处理怎么实现
      在Java中,可以使用流式处理来处理数据。流式处理是一种连续的数据处理方式,数据可以从一个数据源流经一系列操作,最终得到处理后的结果...
      99+
      2023-10-10
      java
    • ClickHouse怎么处理分布式查询
      在ClickHouse中,分布式查询可以通过使用Distributed表来实现。Distributed表允许将查询分发到多个节点上并...
      99+
      2024-04-02
    • ZooKeeper中怎么处理分布式锁
      在ZooKeeper中处理分布式锁通常使用临时有序节点来实现。具体步骤如下: 在ZooKeeper的指定节点下创建一个顺序临时节点...
      99+
      2024-04-02
    • java怎么使用redis实现分布式锁
      在Java中使用Redis实现分布式锁可以通过以下步骤:1. 引入Redis相关的依赖,例如Jedis或Lettuce。2. 创建一...
      99+
      2023-10-09
      java redis
    • 如何使用 Python 轻松处理分布式日志文件?
      随着互联网技术的发展和应用的广泛,日志文件已经成为了企业和组织中重要的数据来源。在分布式系统中,日志文件的数量会变得非常庞大,如何高效地处理这些日志数据是一个非常值得关注的问题。本文将介绍如何使用 Python 轻松处理分布式日志文件。 一...
      99+
      2023-09-07
      文件 分布式 日志
    • Java 数组如何在分布式缓存中使用?
      随着互联网的快速发展,数据量的不断增加,分布式缓存成为了解决高并发、高性能、高可用等问题的一种重要方式。在分布式缓存中,Java 数组是非常常见的数据结构之一。本文将介绍 Java 数组在分布式缓存中的使用方法。 一、Java 数组简介 ...
      99+
      2023-06-14
      数组 分布式 缓存
    • MongoDB的分布式事务怎么处理
      MongoDB支持分布式事务处理的功能,通过使用分布式事务,可以确保多个操作在各个节点上的一致性。 在MongoDB中,分布式事务是...
      99+
      2024-05-07
      MongoDB
    • 分布式自然语言处理:Java 和 Numpy 的最佳组合?
      在当今信息爆炸的时代,自然语言处理(NLP)已成为热门话题。然而,处理大规模数据集和复杂的语言模型需要强大的计算能力和高效的分布式处理技术。Java 和 Numpy 是两个非常流行的编程语言,它们都有自己的优点和适用场景。本文将介绍如何使...
      99+
      2023-11-12
      分布式 numpy 自然语言处理
    • 怎么理解Java分布式与集群
      这篇文章主要介绍“怎么理解Java分布式与集群”,在日常操作中,相信很多人在怎么理解Java分布式与集群问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”怎么理解Java分布式与...
      99+
      2024-04-02
    • Java中怎么使用Redis实现分布式锁
      这篇“Java中怎么使用Redis实现分布式锁”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇...
      99+
      2023-05-25
      java redis
    • 分布式文件处理怎么做?Python 框架来帮忙!
      在当今互联网时代,数据量越来越大,处理数据的效率成为了一个非常重要的问题。分布式文件处理是其中一个解决方案,它可以将大量的数据分散到不同的节点上处理,从而提高处理效率。而 Python 框架也为我们提供了很多方便的工具来实现分布式文件处理...
      99+
      2023-10-14
      分布式 框架 文件
    软考高级职称资格查询
    编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
    • 官方手机版

    • 微信公众号

    • 商务合作