
Kafka Request Processing Module (Network)


Kafka Network Module

1. Preface

Kafka's high performance and high throughput are inseparable from the excellent design of the broker-side network module. Overall, Kafka uses Java's native NIO to handle requests from producers, consumers and other brokers. Starting from Kafka's main startup function, this article dissects the architecture of the Kafka network module.

2. Overall Architecture

At a high level, the Kafka network module is structured as follows.

The two most important parts of the Kafka network module are the SocketServer and the KafkaRequestHandler pool.

SocketServer

The SocketServer implements a reactor threading model. A single Acceptor thread continuously polls the ServerSocketChannel for ready connection events (OP_ACCEPT), and the Acceptor owns a pool of Processor threads. Each ready connection event polled out by the Acceptor is handed, according to a simple rule, to one Processor from the pool: the Acceptor accepts the connection with the client, and the resulting client socket channel is passed to that Processor, which reads requests from it and feeds them into the Request Queue.

In other words, the SocketServer does not do the real business processing of a request. Its job is only to poll for events and establish connections, and finally, acting as the producer, to push what it reads from those connections onto the Request Queue.

KafkaRequestHandlerPool

In contrast to the SocketServer, the KafkaRequestHandlerPool is the module that actually performs the business processing of requests. The pool contains multiple KafkaRequestHandler threads; each one is an I/O thread that consumes requests from the Request Queue and processes them.
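The queue in the middle is what decouples the two sides. As a rough illustration of this producer/consumer layout (a sketch with invented names, not Kafka's classes), network threads push requests into a bounded blocking queue while handler threads drain it:

import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

object RequestQueueSketch {
  // Stand-in for RequestChannel.Request: just the connection id and the raw bytes.
  final case class Request(connectionId: String, payload: Array[Byte])

  // Bounded queue shared by all network threads (producers) and handler threads (consumers),
  // playing the role of the Request Queue in the architecture described above.
  val requestQueue = new ArrayBlockingQueue[Request](500)

  def networkThread(id: Int): Thread = new Thread(new Runnable {
    // A real Processor would read these bytes off a socket; here we fabricate one request.
    override def run(): Unit = requestQueue.put(Request(s"conn-$id", Array[Byte](1, 2, 3)))
  })

  def handlerThread(id: Int): Thread = new Thread(new Runnable {
    // Mirrors the consumer side: poll with a timeout, then "handle" the request.
    override def run(): Unit = {
      val req = requestQueue.poll(300, TimeUnit.MILLISECONDS)
      if (req != null) println(s"handler-$id processed request from ${req.connectionId}")
    }
  })

  def main(args: Array[String]): Unit = {
    (1 to 3).foreach(i => networkThread(i).start())
    (1 to 2).foreach(i => handlerThread(i).start())
  }
}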

3. Components

3.1 Acceptor

The Acceptor is a standalone thread. Its job is to use an NIO selector to continuously poll the server-side channel for ready connection (OP_ACCEPT) events and to assign each ready connection event to a Processor for handling.

The core of the Acceptor lives in its run method:

def run(): Unit = {
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  startupComplete()
  try {
    var currentProcessorIndex = 0
    while (isRunning) {
      try {
        val ready = nioSelector.select(500)
        if (ready > 0) {
          val keys = nioSelector.selectedKeys()
          val iter = keys.iterator()
          while (iter.hasNext && isRunning) {
            try {
              val key = iter.next
              iter.remove()
              if (key.isAcceptable) {
                accept(key).foreach { socketChannel =>
                  var retriesLeft = synchronized(processors.length)
                  var processor: Processor = null
                  do {
                    retriesLeft -= 1
                    processor = synchronized {
                      currentProcessorIndex = currentProcessorIndex % processors.length
                      processors(currentProcessorIndex)
                    }
                    currentProcessorIndex += 1
                  } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
                }
              }
            } catch {
              // ...
            }
          }
        }
      }
    }
  }
}

Understanding this code requires some familiarity with NIO.

First, the opening lines:

serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
startupComplete()

serverChannel is a server-side channel created by the Acceptor, wrapping a server socket. nioSelector is an NIO event selector: it repeatedly polls the channels registered with it for ready events of the requested types. So this line registers the server-side channel with nioSelector with interest type OP_ACCEPT, meaning it only cares about connection events.

startupComplete simply marks startup as finished by updating the two CountDownLatches used for startup and shutdown.
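To make the two lines above concrete, here is a minimal, self-contained sketch (not Kafka's code; the port and object name are made up) of the same java.nio calls: open a server channel, switch it to non-blocking mode, and register it with a Selector for OP_ACCEPT only.

import java.net.InetSocketAddress
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel}
import java.util.concurrent.CountDownLatch

object AcceptorBootstrapSketch {
  def main(args: Array[String]): Unit = {
    val nioSelector   = Selector.open()
    val serverChannel = ServerSocketChannel.open()
    serverChannel.bind(new InetSocketAddress(9092))              // hypothetical listener port
    serverChannel.configureBlocking(false)                       // required before registering with a Selector
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)  // only connection events are of interest

    val startupLatch = new CountDownLatch(1)
    startupLatch.countDown()                                     // the moral equivalent of startupComplete()
  }
}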

Now for the important part: the while loop below is the heart of the Acceptor's polling.

The isRunning variable flags whether the broker is still running; unless the broker goes down or is shut down, it stays true.

Then:

val ready = nioSelector.select(500)

This is a blocking NIO select: nioSelector waits up to 500 ms for events to become ready (returning as soon as any do) and returns the number of ready keys.

If that count is greater than zero, the set of ready SelectionKeys is fetched and traversed with a while loop. Each SelectionKey is handled in two steps:

Establish the connection

val socketChannel = serverSocketChannel.accept()

Assign a Processor

var processor: Processor = null
do {
  retriesLeft -= 1
  processor = synchronized {
    currentProcessorIndex = currentProcessorIndex % processors.length
    processors(currentProcessorIndex)
  }
  currentProcessorIndex += 1
} while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))

The index is taken modulo the length of the processors array to pick a Processor, and assignNewConnection then tries to hand the new connection to it. The loop keeps moving to the next Processor until one accepts the connection, blocking on the last retry if necessary. The client channel obtained from the SelectionKey is passed in as the argument. Internally, assignNewConnection delegates to the chosen Processor's accept method, which does this:

def accept(socketChannel: SocketChannel,
           mayBlock: Boolean,
           acceptorIdlePercentMeter: com.yammer.metrics.core.Meter): Boolean = {
  val accepted = {
    if (newConnections.offer(socketChannel))
      true
    else if (mayBlock) {
      val startNs = time.nanoseconds
      newConnections.put(socketChannel)
      acceptorIdlePercentMeter.mark(time.nanoseconds() - startNs)
      true
    } else
      false
  }
  if (accepted)
    wakeup()
  accepted
}

It first tries a non-blocking offer of the socketChannel into the newConnections blocking queue; if the queue is full, mayBlock decides whether to fall back to a blocking put and wait until space frees up. In short:

assignNewConnection simply puts the client socket channel behind the ready SelectionKey into the newConnections blocking queue, where the rest of the pipeline picks it up.
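The offer/put distinction above is standard java.util.concurrent behavior; a minimal sketch (queue size and values are made up) of the two calls on an ArrayBlockingQueue:

import java.util.concurrent.ArrayBlockingQueue

object QueueSemanticsSketch {
  def main(args: Array[String]): Unit = {
    val queue = new ArrayBlockingQueue[String](1)   // capacity 1, so a second element will not fit

    val first  = queue.offer("conn-1")              // non-blocking: returns true, queue is now full
    val second = queue.offer("conn-2")              // non-blocking: returns false immediately
    println(s"offer #1 = $first, offer #2 = $second")

    queue.poll()                                    // free a slot so put() below does not hang
    queue.put("conn-2")                             // blocking: waits until space is available
  }
}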

What the Acceptor does is simple: it polls for ready connection events, picks a Processor thread for each one, and adds the accepted socket channel to that Processor's newConnections blocking queue.

3.2 Processor

The Processor is the thread that handles network traffic for a connection: it deals both with requests arriving on connections handed over by the Acceptor and with responses coming back after requests have been processed. A Processor has three important data structures:

private val newConnections = new ArrayBlockingQueue[SocketChannel](connectionQueueSize)
private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()

newConnections was covered above. responseQueue is a response queue private to each Processor: once a KafkaRequestHandler has finished processing a request, it places the result on this queue so it can be returned to the client. inflightResponses also stores responses; it exists because some responses still need a broker-side callback to run after they are sent back to the client, so those responses are parked in inflightResponses in the meantime.
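To illustrate the division of labor between the two response structures, here is a highly simplified sketch (all type and method names are made up; this is not Kafka's API) of the "send, park in-flight, run callback on completion" pattern described above:

import java.util.concurrent.LinkedBlockingDeque
import scala.collection.mutable

// Hypothetical stand-in for RequestChannel.Response: a payload plus an optional completion callback.
final case class Response(connectionId: String, payload: Array[Byte], onComplete: Option[() => Unit])

final class ResponseFlowSketch {
  private val responseQueue     = new LinkedBlockingDeque[Response]()   // filled by handler threads
  private val inflightResponses = mutable.Map[String, Response]()       // sends started but not yet finished

  // Handler side: publish a finished response for this processor to send.
  def enqueue(response: Response): Unit = responseQueue.put(response)

  // Processor side, step 1: start the network send and remember the response as in-flight.
  def processNewResponses(startSend: Response => Unit): Unit = {
    var response = responseQueue.poll()
    while (response != null) {
      startSend(response)
      inflightResponses += response.connectionId -> response
      response = responseQueue.poll()
    }
  }

  // Processor side, step 2: once the send has fully completed, drop it from the map and run the callback.
  def onSendCompleted(connectionId: String): Unit =
    inflightResponses.remove(connectionId).flatMap(_.onComplete).foreach(cb => cb())
}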

Likewise, here is the run method of the Processor thread:

override def run(): Unit = {
  startupComplete()
  try {
    while (isRunning) {
      try {
        configureNewConnections()
        processNewResponses()
        poll()
        processCompletedReceives()
        processCompletedSends()
        processDisconnected()
        closeExcessConnections()
      } catch {
        case e: Throwable => processException("Processor got uncaught exception.", e)
      }
    }
  }
}

The Processor's run method likewise runs an endless loop that only stops when the Kafka broker shuts down. The loop calls seven methods, each with a single, well-defined responsibility, so the code layering is very clear. Here we only discuss the methods on the request path, not the ones that handle responses.

configureNewConnections

while (connectionsProcessed < connectionQueueSize && !newConnections.isEmpty) {
  val channel = newConnections.poll()
  selector.register(connectionId(channel.socket), channel)
  connectionsProcessed += 1
}

These few lines are the core of configureNewConnections. As is plain to see, it takes the socket channels that the Acceptor put into the newConnections queue and registers each of them with a selector. Note that this selector is Kafka's own wrapper (org.apache.kafka.common.network.Selector) around the NIO selector, and its register method looks like this:

public void register(String id, SocketChannel socketChannel) throws IOException {
    ensureNotRegistered(id);
    registerChannel(id, socketChannel, SelectionKey.OP_READ);
    this.sensors.connectionCreated.record();
}

Going one level deeper, the registerChannel method:

protected SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) {
    SelectionKey key = socketChannel.register(nioSelector, interestedOps);
    KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key);
    this.channels.put(id, channel);
    if (idleExpiryManager != null)
        idleExpiryManager.update(channel.id(), time.nanoseconds());
    return key;
}

This method does some very important work. First, just as the Acceptor registered the server-side channel with its nioSelector, here the client socketChannel taken out of the newConnections queue is registered with this selector's nioSelector, with the interest type OP_READ.

In addition, buildAndAttachKafkaChannel wraps this socketChannel into a KafkaChannel and adds that KafkaChannel to channels.

private final Map<String, KafkaChannel> channels;

channels is a field of the Selector: a Map whose key is the connection id derived from the socketChannel and whose value is the wrapping KafkaChannel. When the Processor thread shuts down, it walks this channels map and closes every channel.
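A minimal sketch (types and names invented for illustration, not Kafka's code) of this "registry plus close-all-on-shutdown" idea:

import java.nio.channels.SocketChannel
import scala.collection.mutable

// Hypothetical, stripped-down stand-in for the Selector-owned channel registry.
final class ChannelRegistrySketch {
  private val channels = mutable.Map[String, SocketChannel]()

  // Called when a new connection is registered: remember the channel under its connection id.
  def add(connectionId: String, channel: SocketChannel): Unit =
    channels += connectionId -> channel

  // Called when the processor shuts down: close every tracked channel, ignoring individual failures.
  def closeAll(): Unit = {
    channels.values.foreach { ch =>
      try ch.close()
      catch { case _: java.io.IOException => () } // best effort: keep closing the rest
    }
    channels.clear()
  }
}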

poll

Many blog posts about the Kafka network layer introduce this poll method with little more than:

"poll is where the actual NIO polling happens."

But articles that actually peel poll apart layer by layer are rare, because the method is genuinely complex and hard to read. Here I will go through the parts of poll that I personally consider most important:

The selector polls out all ready events

Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

This is plain NIO: all ready events are exposed as SelectionKeys collected in a set.

processSelectionKeys

for (SelectionKey key : selectionKeys) {
    KafkaChannel channel = channel(key);
    addToStagedReceives(channel, receive);
}

Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;

A great deal of code is omitted here. The essence of this processing is that each key is resolved to its KafkaChannel, the data read from that channel becomes a NetworkReceive, and the receive is added to stagedReceives under its channel.

Finally, receives staged in stagedReceives are moved into completedReceives:

private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
    NetworkReceive networkReceive = stagedDeque.poll();
    this.completedReceives.add(networkReceive);
    this.sensors.recordBytesReceived(channel.id(), networkReceive.size());
}

As for why receives are not added to completedReceives directly, it is because there is intermediate work in between, such as authentication and per-connection handling, while the NetworkReceive object itself carries the concrete I/O payload of the request read off the socket.
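Conceptually, a NetworkReceive is one complete, size-delimited request frame read from the socket. A minimal sketch (not Kafka's implementation; names are made up) of reading such a frame, assuming Kafka's wire convention of a 4-byte length prefix followed by the payload:

import java.nio.ByteBuffer
import java.nio.channels.SocketChannel

object FrameReadSketch {
  // Reads one whole "size + payload" frame and returns the payload bytes.
  // Kafka does this incrementally across poll() calls; here it is simplified to a blocking loop.
  def readFrame(channel: SocketChannel): ByteBuffer = {
    val sizeBuffer = ByteBuffer.allocate(4)
    while (sizeBuffer.hasRemaining) channel.read(sizeBuffer)   // read the 4-byte length prefix
    sizeBuffer.flip()

    val payload = ByteBuffer.allocate(sizeBuffer.getInt())     // allocate exactly the advertised size
    while (payload.hasRemaining) channel.read(payload)         // read the request body
    payload.flip()
    payload                                                    // roughly what a NetworkReceive payload holds
  }
}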

processCompletedReceives

Step two wrapped everything read off the wire into NetworkReceive objects and added them to completedReceives. Step three hands this data on for processing, via the processCompletedReceives method, whose core is these few lines:

val connectionId = receive.source
val context = new RequestContext(header, connectionId, channel.socketAddress,
  channel.principal, listenerName, securityProtocol)
val req = new RequestChannel.Request(processor = id, context = context,
  startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics)
requestChannel.sendRequest(req)

As you can see, the receive object is wrapped into a Request object and added to the SocketServer's RequestChannel queue; this RequestChannel is shared by all Processor threads.

At this point the Processor's logic has been fully walked through. To summarize:

1. Handle the socket channels that the Acceptor added to the newConnections queue: register each with the nioSelector, wrap it into a KafkaChannel, and add it to the channels map.
2. Run poll, which polls out all ready events on those socket channels and, after considerable processing, wraps the data read into NetworkReceive objects added to completedReceives.
3. Wrap each element of completedReceives into a Request object and add it to the SocketServer's RequestChannel.

3.3 KafkaRequestHandler

KafkaRequestHandler is a separate component. Together with the SocketServer's Processor threads it forms a producer/consumer model, with the SocketServer's RequestChannel queue as the medium in between.

KafkaRequestHandler is the thread that actually performs the I/O-level business processing of a request. Because the Processor threads have already wrapped requests so cleanly, the KafkaRequestHandler's logic is very simple. Its run method:

def run(): Unit = {
  while (!stopped) {
    val startSelectTime = time.nanoseconds

    val req = requestChannel.receiveRequest(300)
    val endTime = time.nanoseconds
    val idleTime = endTime - startSelectTime
    aggregateIdleMeter.mark(idleTime / totalHandlerThreads.get)

    req match {
      case RequestChannel.ShutdownRequest =>
        shutdownComplete.countDown()
        return

      case request: RequestChannel.Request =>
        try {
          request.requestDequeueTimeNanos = endTime
          apis.handle(request)
        } catch {
          case e: Throwable => error("Exception when handling request", e)
        }
    }
  }
  shutdownComplete.countDown()
}

It first fetches a request from the RequestChannel, then calls apis.handle to process it. This apis.handle can fairly be called the single most important source-code entry point in Kafka:

def handle(request: RequestChannel.Request): Unit = {
  try {
    request.header.apiKey match {
      case ApiKeys.PRODUCE => handleProduceRequest(request)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
      case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
      case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
      case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
      case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
      case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
      case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
      case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
      case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
      case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
      case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
      case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
      case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
      case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
      case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
      case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
      case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
      case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
      case ApiKeys.END_TXN => handleEndTxnRequest(request)
      case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
      case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
      case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
      case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
      case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
      case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
      case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
      case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
      case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
      case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
      case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)
      case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
      case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
      case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
      case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
      case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
      case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request)
      case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request)
      case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignmentsRequest(request)
      case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignmentsRequest(request)
      case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request)
    }
  } catch {
    case e: FatalExitError => throw e
    case e: Throwable => handleError(request, e)
  } finally {
    request.apiLocalCompleteTimeNanos = time.nanoseconds
  }
}

The first time I traced the source down to this point, it felt like a treasure hunt: after fighting through layer upon layer, you finally stand before the gilded gate. Every Kafka request enters processing through this entry point; anyone who masters all of the methods it leads to can genuinely be called a Kafka master.

The concrete I/O business processing will not be expanded on here. All in all, Kafka's network module is quite complex, yet its logic flows are clear, and the producer/consumer pattern is used to the fullest. It also shows that, quite apart from its optimizations at the operating-system level, Kafka's architectural design is one of the important reasons it achieves such high throughput.

