-
[Kafka] kafka api - LeaderAndIsr 에 관하여개발자 라이프/카프카 2021. 4. 23. 17:22반응형
들어가며
카프카 클러스터의 브로커들 중에는 파티션 레플리카들의 리더와 팔로워 상태를 관리하는 컨트롤러 브로커가 있습니다. 컨트롤러 브로커는 브로커들의 partition reassign 등으로 리더-팔로워 상태가 변경된 경우, 변경된 상태를 클러스터의 브로커들에게 전달하여 적용할 수 있도록 합니다. 이번 글은 컨트롤러 브로커가 다른 브로커에 레플리카 정보를 전달하는 과정에 대해 알아봅니다.
LeaderAndIsr
카프카는 브로커 간 통신에 관한 protocol과 api 스펙을 정의해서 사용합니다. 그중 파티션의 리더-팔로워 역할 정보와 ISR(In Sync Replicas) 정보를 통신하기 위해 LeaderAndIsr 스펙(api key)를 사용합니다.
LeaderAndIsr Request
LeaderAndIsr Request (Version: 0) => controller_id controller_epoch [ungrouped_partition_states] [live_leaders] controller_id => INT32 controller_epoch => INT32 ungrouped_partition_states => topic_name partition_index controller_epoch leader leader_epoch [isr] zk_version [replicas] topic_name => STRING partition_index => INT32 controller_epoch => INT32 leader => INT32 leader_epoch => INT32 isr => INT32 zk_version => INT32 replicas => INT32 live_leaders => broker_id host_name port broker_id => INT32 host_name => STRING port => INT32
컨트롤러가 브로커에 전달하는 LeaderAndIsr 요청에는 위와 같은 메시지가 구성됩니다. 메시지 내용에서 확인할 수 있듯이, 컨트롤러 브로커는 토픽 파티션에 대하여 변경이 발생한 리더 브로커에 대한 정보(leader, leader_epoch)와 ISR 정보를 브로커에 전달합니다. 위 스펙은 초기 버전으로, 이후 요청받은 브로커가 변경된 상태 정보를 빠르게 반영할 수 있도록 최적화하여 버전 업 되었습니다. 아래는 작성일 기준 가장 최근 버전의 LeaderAndIsr Request 메시지 스펙입니다(* 표시된 필드가 추가된 필드 입니다).
LeaderAndIsr Request (Version: 5) => controller_id controller_epoch broker_epoch type [topic_states] [live_leaders] TAG_BUFFER controller_id => INT32 controller_epoch => INT32 * broker_epoch => INT64 * type => INT8 * topic_states => topic_name topic_id [partition_states] TAG_BUFFER * topic_name => COMPACT_STRING * topic_id => UUID partition_states => partition_index controller_epoch leader leader_epoch [isr] zk_version [replicas] [adding_replicas] [removing_replicas] is_new TAG_BUFFER partition_index => INT32 controller_epoch => INT32 leader => INT32 leader_epoch => INT32 isr => INT32 zk_version => INT32 replicas => INT32 * adding_replicas => INT32 * removing_replicas => INT32 * is_new => BOOLEAN live_leaders => broker_id host_name port TAG_BUFFER broker_id => INT32 host_name => COMPACT_STRING port => INT32
LeaderAndIsr Response
LeaderAndIsr Response (Version: 0) => error_code [partition_errors] error_code => INT16 partition_errors => topic_name partition_index error_code topic_name => STRING partition_index => INT32 error_code => INT16
컨트롤러의 레플리카 변경 정보를 받은 브로커는 일련의 작업 이후 위와 같은 응답을 컨트롤러 브로커에 전달합니다. 메시지 내용에서 확인할 수 있듯이, 변경 과정에서 발생한 에러 코드와 에러가 발생한 파티션 정보를 전달합니다. 에러 코드에 관한 정보는 링크를 통해서 확인할 수 있습니다.
Request 스펙과 마찬가지로 Response 스펙도 버전 업되고 있으며, 작성일 기준 아래와 같은 스펙을 가지고 있습니다.
LeaderAndIsr Response (Version: 5) => error_code [topics] TAG_BUFFER error_code => INT16 topics => topic_id [partition_errors] TAG_BUFFER * topic_id => UUID partition_errors => partition_index error_code TAG_BUFFER partition_index => INT32 error_code => INT16
LeaderAndIsr api 요청-응답 과정
먼저, 컨트롤러가 변경된 파티션 정보(리더-팔로워, ISR)를 ControllerChannelManager의 sendRequestsToBrokers 메서드를 이용하여 브로커에 요청합니다. 관련된 코드는 아래와 같습니다.
// ControllerChannelManager.scala def sendRequestsToBrokers(controllerEpoch: Int) { try { val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerEpoch) // API 버전 설정 val leaderAndIsrRequestVersion: Short = if (controller.config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 1 else 0 // state-change.log에 요청 정보 로깅 leaderAndIsrRequestMap.foreach { case (broker, leaderAndIsrPartitionStates) => leaderAndIsrPartitionStates.foreach { case (topicPartition, state) => val typeOfRequest = if (broker == state.basePartitionState.leader) "become-leader" else "become-follower" stateChangeLog.trace(s"Sending $typeOfRequest LeaderAndIsr request $state to broker $broker for partition $topicPartition") } // leader 정보 추출 val leaderIds = leaderAndIsrPartitionStates.map(_._2.basePartitionState.leader).toSet val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map { _.node(controller.config.interBrokerListenerName) } // ** LeadAndIsrReqeust 생성 및 브로커에 LEADER_AND_ISR 요청 전달 ** val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(leaderAndIsrRequestVersion, controllerId, controllerEpoch, leaderAndIsrPartitionStates.asJava, leaders.asJava) controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, leaderAndIsrRequestBuilder, (r: AbstractResponse) => controller.eventManager.put(controller.LeaderAndIsrResponseReceived(r, broker))) } leaderAndIsrRequestMap.clear() // .. 이 후 UPDATE_METADATA api 호출 // .. 이 후 STOP_REPLICA api 호출 } catch { // LeaderAndIsr 요청에 대한 예외 처리 case e: Throwable => if (leaderAndIsrRequestMap.nonEmpty) { error("Haven't been able to send leader and isr requests, current state of " + s"the map is $leaderAndIsrRequestMap. Exception message: $e") } // .. 이 후 UPDATE_METADATA api 예외 처리 // .. 이 후 STOP_REPLICA api 예외 처리 throw new IllegalStateException(e) } }
브로커에 대한 모든 요청은 kafkaApis에서 api key 별로 처리합니다. LeaderAndIsr은 아래와 같이 handleLeaderAndIsrRequest 메서드로 처리합니다.
// KafkaApis.scala def handle(request: RequestChannel.Request) { try { trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" + s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}") request.header.apiKey match { // ... case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request) // ... } catch { case e: FatalExitError => throw e case e: Throwable => handleError(request, e) } finally { request.apiLocalCompleteTimeNanos = time.nanoseconds } }
그리고 handleLeaderAndIsrReqeust에서 ReplicaManager의 becomeLeaderOrFollower 메서드를 호출하여 파티션 별 리더-팔로워 상태 변경을 진행합니다.
// ReplicaManager.scala def becomeLeaderOrFollower(correlationId: Int, leaderAndIsrRequest: LeaderAndIsrRequest, onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = { // ... // 리더와 팔로워 분리 후 val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) => stateInfo.basePartitionState.leader == localBrokerId } val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys // 리더는 리더로 변경 val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty) makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap) else Set.empty[Partition] // 팔로워는 팔로워로 변경 val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty) makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap) else Set.empty[Partition] // ... }
그리고 실제 상태는 makeLeaders와 makeFollowers 메소드 내에서 개별적인 Partition 객체 별로 Partition#makeLeader와 Partition#makeFollower를 호출하는 것으로 변경됩니다.
// Partition.scala /** * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset * from the time when this broker was the leader last time) and setting the new leader and ISR. * If the leader replica id does not change, return false to indicate the replica manager. */ def makeLeader(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = { } /** * Make the local replica the follower by setting the new leader and ISR to empty * If the leader replica id does not change and the new epoch is equal or one * greater (that is, no updates have been missed), return false to indicate to the * replica manager that state is already correct and the become-follower steps can be skipped */ def makeFollower(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = { }
파티션의 상태 변경이 완료되면 토픽 파티션 별로 응답을 바탕으로 LeaderAndIsrResponse 객체를 생성하여 컨트롤러에 응답을 전달합니다.
// ReplicaManager.scala def becomeLeaderOrFollower(correlationId: Int, leaderAndIsrRequest: LeaderAndIsrRequest, onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = { // ... val responseMap = new mutable.HashMap[TopicPartition, Errors] // ... leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) => val partition = getOrCreatePartition(topicPartition) val partitionLeaderEpoch = partition.getLeaderEpoch if (partition eq ReplicaManager.OfflinePartition) { stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " + s"controller $controllerId with correlation id $correlationId " + s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " + "partition is in an offline log directory") // 파티션 별로 에러 응답을 저장 - 파티션이 오프라인인 경우 responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR) } else if (partitionLeaderEpoch < stateInfo.basePartitionState.leaderEpoch) { // ... } } // .. makeLeaders & makeFollowers new LeaderAndIsrResponse(Errors.NONE, responseMap.asJava) } } }
// KafkaApis.scala def handleLeaderAndIsrRequest(request: RequestChannel.Request) { if (authorize(request.session, ClusterAction, Resource.ClusterResource)) { val response = replicaManager.becomeLeaderOrFollower(correlationId, LeaderAndIsrRequest, onLeadershipChange) // 응답을 전달한다. sendResponseExemptThrottle(request, response) } else { sendResponseMaybeThrottle(request, throttleTimeMs => leaderAndIsrRequest.getErrorResponse(throttleTimeMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) } }
LeaderAndIsr api와 관련되어 발생할 수 있는 이슈
Partition Leader와 관련된 Split brain 현상
컨트롤러에서 전달한 LeaderAndIsr을 바탕으로 브로커들은 개별적인 파티션 별로 리더-팔로워 역할을 변경하게 됩니다. 그러나 특정 브로커에서 네트워크 지연 등으로 LeaderAndIsr 요청에 관한 처리가 늦어지면 해당 파티션에 관한 상태가 서로 다른 시점의 상태로 유지하게 됩니다.
위 그림은 AAA 토픽의 0번 파티션의 리더가 1번 브로커에서 2번 브로커로 reassign 되었을 때 split brain 현상이 발생한 상황입니다. 0, 1번 브로커는 reassign 된 파티션 정보를 정상적으로 처리했지만, 2번 브로커는 정상 처리되지 않고 이전 상태 값을 가집니다. 이때, AAA 토픽의 0번 파티션의 리더가 이전 상태 값인 1번 브로커와 최근 상태 값인 2번 브로커로, 비정상적인 상태가 됩니다. 그렇기 때문에 클라이언트는 2번 브로커가 정상적으로 처리될 때까지 AAA 토픽의 0번 파티션으로 pub/sub 할 수 없게 됩니다.
마무리
이번 글을 통해 LeaderAndIsr api에 대해 살펴봤습니다.
반응형