返回顶部
首页 > 资讯 > 精选 >怎样简谈Kafka中的NIO网络通信模型
  • 840
分享到

怎样简谈Kafka中的NIO网络通信模型

2023-06-02 21:06:24 840人浏览 八月长安
摘要

这篇文章将为大家详细讲解有关怎样简谈kafka中的NIO网络通信模型,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。摘要:很多人喜欢把RocketMQ与Kafka做对比,其实这两款消息队列的网

这篇文章将为大家详细讲解有关怎样简谈kafka中的NIO网络通信模型,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

摘要:很多人喜欢把RocketMQ与Kafka做对比,其实这两款消息队列的网络通信层还是比较相似的,小编就为大家简要地介绍下Kafka的Nio网络通信模型

下面主要通过对Kafka源码的分析来简述其Reactor的多线程网络通信模型和总体框架结构,同时简要介绍Kafka网络通信层的设计与具体实现。

一、Kafka网络通信模型的整体框架概述

Kafka的网络通信模型是基于NIO的Reactor多线程模型来设计的。这里先引用Kafka源码中注释的一段话:

An NIO Socket server. The threading model is 1 Acceptor thread that handles new connections. Acceptor has N Processor threads that each have their own selector and read requests from sockets. M Handler threads that handle requests and produce responses back to the processor threads for writing.

相信大家看了上面的这段引文注释后,大致可以了解到Kafka的网络通信层模型,主要采用了 1(1个Acceptor线程)+N(N个Processor线程)+M(M个业务处理线程) 。下面的表格简要的列举了下(这里先简单的看下后面还会详细说明):

线程数线程名线程具体说明1kafka-socket-acceptor_%xAcceptor线程,负责监听Client端发起的请求Nkafka-network-thread_%dProcessor线程,负责对Socket进行读写Mkafka-request-handler-_%dWorker线程,处理具体的业务逻辑并生成Response返回

Kafka网络通信层的完整框架图如下图所示:

怎样简谈Kafka中的NIO网络通信模型

Kafka消息队列的通信层模型—1+N+M模型.png

刚开始看到上面的这个框架图可能会有一些不太理解,并不要紧,这里可以先对Kafka的网络通信层框架结构有一个大致了解。本文后面会结合Kafka的部分重要源码来详细阐述上面的过程。这里可以简单总结一下其网络通信模型中的几个重要概念:

(1), Acceptor :1个接收线程,负责监听新的连接请求,同时注册OP_ACCEPT 事件,将新的连接按照 "round robin" 方式交给对应的 Processor 线程处理;

(2), Processor :N个处理器线程,其中每个 Processor 都有自己的 selector,它会向 Acceptor 分配的 SocketChannel 注册相应的 OP_READ 事件,N 的大小由 “num.networker.threads” 决定;

(3), KafkaRequestHandler :M个请求处理线程,包含在线程池—KafkaRequestHandlerPool内部,从RequestChannel的全局请求队列—requestQueue中获取请求数据并交给Kafkaapis处理,M的大小由 “num.io.threads” 决定;

(4), RequestChannel :其为Kafka服务端的请求通道,该数据结构中包含了一个全局的请求队列 requestQueue和多个与Processor处理器相对应的响应队列responseQueue,提供给Processor与请求处理线程KafkaRequestHandler和KafkaApis交换数据的地方。

(5), NetworkClient :其底层是对 Java NIO 进行相应的封装,位于Kafka的网络接口层。Kafka消息生产者对象—KafkaProducer的send方法主要调用NetworkClient完成消息发送;

(6), SocketServer :其是一个NIO的服务,它同时启动一个Acceptor接收线程和多个Processor处理器线程。提供了一种典型的Reactor多线程模式,将接收客户端请求和处理请求相分离;

(7), KafkaServer :代表了一个Kafka Broker的实例;其startup方法为实例启动的入口;

(8), KafkaApis :Kafka的业务逻辑处理Api,负责处理不同类型的请求;比如 “发送消息”、 “获取消息偏移量—offset” 和 “处理心跳请求” 等;

二、Kafka网络通信层的设计与具体实现

这一节将结合Kafka网络通信层的源码来分析其设计与实现,这里主要详细介绍网络通信层的几个重要元素—SocketServer、Acceptor、Processor、RequestChannel和KafkaRequestHandler。本文分析的源码部分均基于Kafka的0.11.0版本。

SocketServer

SocketServer是接收客户端Socket请求连接、处理请求并返回处理结果的核心类,Acceptor及Processor的初始化、处理逻辑都是在这里实现的。在KafkaServer实例启动时会调用其startup的初始化方法,会初始化1个 Acceptor和N个Processor线程(每个EndPoint都会初始化,一般来说一个Server只会设置一个端口),其实现如下:

 
def startup() { this.synchronized { connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides) val sendBufferSize = config.socketSendBufferBytes val recvBufferSize = config.socketReceiveBufferBytes val brokerId = config.brokerId var processorBeginIndex = 0 // 一个broker一般只设置一个端口 config.listeners.foreach { endpoint => val listenerName = endpoint.listenerName val securityProtocol = endpoint.securityProtocol val processorEndIndex = processorBeginIndex + numProcessorThreads //N 个 processor for (i <- processorBeginIndex until processorEndIndex) processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol, memoryPool) //1个 Acceptor val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas) acceptors.put(endpoint, acceptor) KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start() acceptor.awaitStartup() processorBeginIndex = processorEndIndex } }

Acceptor

Acceptor是一个继承自抽象类AbstractServerThread的线程类。Acceptor的主要任务是监听并且接收客户端的请求,同时建立数据传输通道—SocketChannel,然后以轮询的方式交给一个后端的Processor线程处理(具体的方式是添加socketChannel至并发队列并唤醒Processor线程处理)。

在该线程类中主要可以关注以下两个重要的变量:

(1), niOSelector :通过NSelector.open()方法创建的变量,封装了JAVA NIO Selector的相关操作;

(2), serverChannel :用于监听端口的服务端Socket套接字对象;

下面来看下Acceptor主要的run方法的源码:

 
def run() { //首先注册OP_ACCEPT事件 serverChannel.reGISter(nioSelector, SelectionKey.OP_ACCEPT) startupComplete() try { var currentProcessor = 0 //以轮询方式查询并等待关注的事件发生 while (isRunning) { try { val ready = nioSelector.select(500) if (ready > 0) { val keys = nioSelector.selectedKeys() val iter = keys.iterator() while (iter.hasNext && isRunning) { try { val key = iter.next iter.remove() if (key.isAcceptable) //如果事件发生则调用accept方法对OP_ACCEPT事件处理 accept(key, processors(currentProcessor)) else throw new IllegalStateException("Unrecognized key state for acceptor thread.") //轮询算法 // round robin to the next processor thread currentProcessor = (currentProcessor + 1) % processors.length } catch { case e: Throwable => error("Error while accepting connection", e) } } } } //代码省略 } def accept(key: SelectionKey, processor: Processor) { val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel] val socketChannel = serverSocketChannel.accept() try { connectionQuotas.inc(socketChannel.socket().getInetAddress) socketChannel.configureBlocking(false) socketChannel.socket().settcpnodelay(true) socketChannel.socket().seTKEepAlive(true) if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) socketChannel.socket().setSendBufferSize(sendBufferSize) processor.accept(socketChannel) } catch { //省略部分代码 } } def accept(socketChannel: SocketChannel) { newConnections.add(socketChannel) wakeup() }

在上面源码中可以看到,Acceptor线程启动后,首先会向用于监听端口的服务端套接字对象—ServerSocketChannel上注册OP_ACCEPT 事件。然后以轮询的方式等待所关注的事件发生。如果该事件发生,则调用accept()方法对OP_ACCEPT事件进行处理。这里,Processor是通过 round robin 方法选择的,这样可以保证后面多个Processor线程的负载基本均匀。

Acceptor的accept()方法的作用主要如下:

(1)通过SelectionKey取得与之对应的serverSocketChannel实例,并调用它的accept()方法与客户端建立连接;

(2)调用connectionQuotas.inc()方法增加连接统计计数;并同时设置第(1)步中创建返回的socketChannel属性(如sendBufferSize、KeepAlive、TcpNoDelay、configureBlocking等)

(3)将socketChannel交给processor.accept()方法进行处理。这里主要是将socketChannel加入Processor处理器的并发队列newConnections队列中,然后唤醒Processor线程从队列中获取socketChannel并处理。其中,newConnections会被Acceptor线程和Processor线程并发访问操作,所以newConnections是ConcurrentLinkedQueue队列(一个基于链接节点的无界线程安全队列)

Processor

Processor同Acceptor一样,也是一个线程类,继承了抽象类AbstractServerThread。其主要是从客户端的请求中读取数据和将KafkaRequestHandler处理完响应结果返回给客户端。在该线程类中主要关注以下几个重要的变量:

(1), newConnections :在上面的 Acceptor 一节中已经提到过,它是一种ConcurrentLinkedQueue[SocketChannel]类型的队列,用于保存新连接交由Processor处理的socketChannel;

(2), inflightResponses :是一个Map[String, RequestChannel.Response]类型的集合,用于记录尚未发送的响应;

(3), selector :是一个类型为KSelector变量,用于管理网络连接;

下面先给出Processor处理器线程run方法执行的流程图:

怎样简谈Kafka中的NIO网络通信模型

Kafk_Processor线程的处理流程图.png

从上面的流程图中能够可以看出Processor处理器线程在其主流程中主要完成了这样子几步操作:

(1), 处理newConnections队列中的socketChannel 。遍历取出队列中的每个socketChannel并将其在selector上注册OP_READ事件;

(2), 处理RequestChannel中与当前Processor对应响应队列中的Response 。在这一步中会根据responseAction的类型(NoOpAction/SendAction/CloseConnectionAction)进行判断,若为“NoOpAction”,表示该连接对应的请求无需响应;若为“SendAction”,表示该Response需要发送给客户端,则会通过“selector.send”注册OP_WRITE事件,并且将该Response从responseQueue响应队列中移至inflightResponses集合中;“CloseConnectionAction”,表示该连接是要关闭的;

(3), 调用selector.poll()方法进行处理 。该方法底层即为调用nioSelector.select()方法进行处理。

(4), 处理已接受完成的数据包队列—completedReceives 。在processCompletedReceives方法中调用“requestChannel.sendRequest”方法将请求Request添加至requestChannel的全局请求队列—requestQueue中,等待KafkaRequestHandler来处理。同时,调用“selector.mute”方法取消与该请求对应的连接通道上的OP_READ事件;

(5), 处理已发送完的队列—completedSends 。当已经完成将response发送给客户端,则将其从inflightResponses移除,同时通过调用“selector.unmute”方法为对应的连接通道重新注册OP_READ事件;

(6), 处理断开连接的队列 。将该response从inflightResponses集合中移除,同时将connectionQuotas统计计数减1;

RequestChannel

在Kafka的网络通信层中,RequestChannel为Processor处理器线程与KafkaRequestHandler线程之间的数据交换提供了一个数据缓冲区,是通信过程中Request和Response缓存的地方。因此,其作用就是在通信中起到了一个数据缓冲队列的作用。Processor线程将读取到的请求添加至RequestChannel的全局请求队列—requestQueue中;KafkaRequestHandler线程从请求队列中获取并处理,处理完以后将Response添加至RequestChannel的响应队列—responseQueue中,并通过responseListeners唤醒对应的Processor线程,最后Processor线程从响应队列中取出后发送至客户端。

KafkaRequestHandler

KafkaRequestHandler也是一种线程类,在KafkaServer实例启动时候会实例化一个线程池—KafkaRequestHandlerPool对象(包含了若干个KafkaRequestHandler线程),这些线程以守护线程的方式在后台运行。在KafkaRequestHandler的run方法中会循环地从RequestChannel中阻塞式读取request,读取后再交由KafkaApis来具体处理。

KafkaApis

KafkaApis是用于处理对通信网络传输过来的业务消息请求的中心转发组件。该组件反映出Kafka Broker Server可以提供哪些服务。

关于怎样简谈Kafka中的NIO网络通信模型就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

--结束END--

本文标题: 怎样简谈Kafka中的NIO网络通信模型

本文链接: https://lsjlt.com/news/231377.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

猜你喜欢
  • 怎样简谈Kafka中的NIO网络通信模型
    这篇文章将为大家详细讲解有关怎样简谈Kafka中的NIO网络通信模型,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。摘要:很多人喜欢把RocketMQ与Kafka做对比,其实这两款消息队列的网...
    99+
    2023-06-02
  • Python制作简易聊天器,搭建UDP网络通信模型
    目录1.导入模块2.创建一个套接字对象3.发送数据到ubuntu系统中4.发送任意数据给网络条数助手5.循环发送数据6.循环接受数据循环接收将相关的功能抽离出来做成一个函数7.启动最...
    99+
    2024-04-02
  • Python如何制作简易聊天器以及搭建UDP网络通信模型
    这篇文章将为大家详细讲解有关Python如何制作简易聊天器以及搭建UDP网络通信模型,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。前言:互联网的本质是什么?其实就是信息的交换。就比如我们常用...
    99+
    2023-06-22
  • Linux网络I/O+Reactor模型是怎么样的
    本篇文章给大家分享的是有关Linux网络I/O+Reactor模型是怎么样的,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。前言网络I/O,可以理解为网络上的数据流。通常我们会基...
    99+
    2023-06-15
  • Keras中怎么定义一个简单的神经网络模型
    在Keras中,你可以通过Sequential模型来定义一个简单的神经网络模型。以下是一个简单的例子: from keras.mod...
    99+
    2024-04-02
  • Python怎样实现LeNet网络模型的训练及预测
    本篇文章给大家分享的是有关Python怎样实现LeNet网络模型的训练及预测,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。1.LeNet模型训练脚本整体的训练代码如下,下面我会...
    99+
    2023-06-21
  • 如何理解docker中的网络模式和跨主机通信
    今天就跟大家聊聊有关如何理解docker中的网络模式和跨主机通信,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。Docker的四种网络模式Bridge模式当Docker进程启动时,会在...
    99+
    2023-06-05
  • 如何在TensorFlow中训练一个简单的神经网络模型
    在TensorFlow中训练一个简单的神经网络模型通常需要以下步骤: 数据准备:准备好训练数据和测试数据,并对数据进行预处理和标...
    99+
    2024-03-01
    TensorFlow
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作