ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] kafka api - LeaderAndIsr 에 관하여
    개발자 라이프/카프카 2021. 4. 23. 17:22
    반응형

    들어가며

     카프카 클러스터의 브로커들 중에는 파티션 레플리카들의 리더와 팔로워 상태를 관리하는 컨트롤러 브로커가 있습니다. 컨트롤러 브로커는 브로커들의 partition reassign 등으로 리더-팔로워 상태가 변경된 경우, 변경된 상태를 클러스터의 브로커들에게 전달하여 적용할 수 있도록 합니다. 이번 글은 컨트롤러 브로커가 다른 브로커에 레플리카 정보를 전달하는 과정에 대해 알아봅니다.

    LeaderAndIsr

     카프카는 브로커 간 통신에 관한 protocol과 api 스펙을 정의해서 사용합니다. 그중 파티션의 리더-팔로워 역할 정보와 ISR(In Sync Replicas) 정보를 통신하기 위해 LeaderAndIsr 스펙(api key)를 사용합니다.

    LeaderAndIsr Request

    LeaderAndIsr Request (Version: 0) => controller_id controller_epoch [ungrouped_partition_states] [live_leaders] 
      controller_id => INT32
      controller_epoch => INT32
      ungrouped_partition_states => topic_name partition_index controller_epoch leader leader_epoch [isr] zk_version [replicas] 
        topic_name => STRING
        partition_index => INT32
        controller_epoch => INT32
        leader => INT32
        leader_epoch => INT32
        isr => INT32
        zk_version => INT32
        replicas => INT32
      live_leaders => broker_id host_name port 
        broker_id => INT32
        host_name => STRING
        port => INT32

     컨트롤러가 브로커에 전달하는 LeaderAndIsr 요청에는 위와 같은 메시지가 구성됩니다. 메시지 내용에서 확인할 수 있듯이, 컨트롤러 브로커는 토픽 파티션에 대하여 변경이 발생한 리더 브로커에 대한 정보(leader, leader_epoch)와 ISR 정보를 브로커에 전달합니다. 위 스펙은 초기 버전으로, 이후 요청받은 브로커가 변경된 상태 정보를 빠르게 반영할 수 있도록 최적화하여 버전 업 되었습니다. 아래는 작성일 기준 가장 최근 버전의 LeaderAndIsr Request 메시지 스펙입니다(* 표시된 필드가 추가된 필드 입니다).

    LeaderAndIsr Request (Version: 5) => controller_id controller_epoch broker_epoch type [topic_states] [live_leaders] TAG_BUFFER 
       controller_id => INT32
       controller_epoch => INT32
    *  broker_epoch => INT64
    *  type => INT8
    *  topic_states => topic_name topic_id [partition_states] TAG_BUFFER 
    *   topic_name => COMPACT_STRING
    *   topic_id => UUID
        partition_states => partition_index controller_epoch leader leader_epoch [isr] zk_version [replicas] [adding_replicas] [removing_replicas] is_new TAG_BUFFER 
           partition_index => INT32
           controller_epoch => INT32
           leader => INT32
           leader_epoch => INT32
           isr => INT32
           zk_version => INT32
           replicas => INT32
    *      adding_replicas => INT32
    *      removing_replicas => INT32
    *      is_new => BOOLEAN
       live_leaders => broker_id host_name port TAG_BUFFER 
         broker_id => INT32
         host_name => COMPACT_STRING
         port => INT32

    LeaderAndIsr Response 

    LeaderAndIsr Response (Version: 0) => error_code [partition_errors] 
      error_code => INT16
      partition_errors => topic_name partition_index error_code 
        topic_name => STRING
        partition_index => INT32
        error_code => INT16

     컨트롤러의 레플리카 변경 정보를 받은 브로커는 일련의 작업 이후 위와 같은 응답을 컨트롤러 브로커에 전달합니다. 메시지 내용에서 확인할 수 있듯이, 변경 과정에서 발생한 에러 코드와 에러가 발생한 파티션 정보를 전달합니다. 에러 코드에 관한 정보는 링크를 통해서 확인할 수 있습니다.

     Request 스펙과 마찬가지로 Response 스펙도 버전 업되고 있으며, 작성일 기준 아래와 같은 스펙을 가지고 있습니다. 

    LeaderAndIsr Response (Version: 5) => error_code [topics] TAG_BUFFER 
       error_code => INT16
       topics => topic_id [partition_errors] TAG_BUFFER 
    *    topic_id => UUID
         partition_errors => partition_index error_code TAG_BUFFER 
           partition_index => INT32
           error_code => INT16

    LeaderAndIsr api 요청-응답 과정

     먼저, 컨트롤러가 변경된 파티션 정보(리더-팔로워, ISR)를 ControllerChannelManager의 sendRequestsToBrokers 메서드를 이용하여 브로커에 요청합니다. 관련된 코드는 아래와 같습니다. 

    // ControllerChannelManager.scala
    def sendRequestsToBrokers(controllerEpoch: Int) {
      try {
        val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerEpoch)
    // API 버전 설정
        val leaderAndIsrRequestVersion: Short =
          if (controller.config.interBrokerProtocolVersion >= KAFKA_1_0_IV0) 1
          else 0
    // state-change.log에 요청 정보 로깅
        leaderAndIsrRequestMap.foreach { case (broker, leaderAndIsrPartitionStates) =>
          leaderAndIsrPartitionStates.foreach { case (topicPartition, state) =>
            val typeOfRequest =
              if (broker == state.basePartitionState.leader) "become-leader"
              else "become-follower"
            stateChangeLog.trace(s"Sending $typeOfRequest LeaderAndIsr request $state to broker $broker for partition $topicPartition")
          }
    // leader 정보 추출      
          val leaderIds = leaderAndIsrPartitionStates.map(_._2.basePartitionState.leader).toSet  
          val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
            _.node(controller.config.interBrokerListenerName)
          }
    // ** LeadAndIsrReqeust 생성 및 브로커에 LEADER_AND_ISR 요청 전달 **
          val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(leaderAndIsrRequestVersion, controllerId,
            controllerEpoch, leaderAndIsrPartitionStates.asJava, leaders.asJava)
          controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, leaderAndIsrRequestBuilder,
            (r: AbstractResponse) => controller.eventManager.put(controller.LeaderAndIsrResponseReceived(r, broker)))
        }
        leaderAndIsrRequestMap.clear()
    
    // .. 이 후 UPDATE_METADATA api 호출
    // .. 이 후 STOP_REPLICA api 호출
    
      } catch {
    // LeaderAndIsr 요청에 대한 예외 처리
        case e: Throwable =>
          if (leaderAndIsrRequestMap.nonEmpty) {
            error("Haven't been able to send leader and isr requests, current state of " +
                s"the map is $leaderAndIsrRequestMap. Exception message: $e")
          }
    
    // .. 이 후 UPDATE_METADATA api 예외 처리
    // .. 이 후 STOP_REPLICA api 예외 처리
    
          throw new IllegalStateException(e)
      }
    }

     브로커에 대한 모든 요청은 kafkaApis에서 api key 별로 처리합니다. LeaderAndIsr은 아래와 같이 handleLeaderAndIsrRequest 메서드로 처리합니다.

    // KafkaApis.scala
      def handle(request: RequestChannel.Request) {
        try {
          trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
            s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
          request.header.apiKey match {
    // ...
            case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
    // ...
        } catch {
          case e: FatalExitError => throw e
          case e: Throwable => handleError(request, e)
        } finally {
          request.apiLocalCompleteTimeNanos = time.nanoseconds
        }
      }

     그리고 handleLeaderAndIsrReqeust에서 ReplicaManager의 becomeLeaderOrFollower 메서드를 호출하여 파티션 별 리더-팔로워 상태 변경을 진행합니다.

    // ReplicaManager.scala
      def becomeLeaderOrFollower(correlationId: Int,
                                 leaderAndIsrRequest: LeaderAndIsrRequest,
                                 onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
    // ...
    // 리더와 팔로워 분리 후
            val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) =>
              stateInfo.basePartitionState.leader == localBrokerId
            }
            val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys
    // 리더는 리더로 변경
            val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
              makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
            else
              Set.empty[Partition]
    // 팔로워는 팔로워로 변경
            val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
              makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap)
            else
              Set.empty[Partition]
    // ...
      }

    그리고 실제 상태는 makeLeaders와 makeFollowers 메소드 내에서 개별적인 Partition 객체 별로 Partition#makeLeader와 Partition#makeFollower를 호출하는 것으로 변경됩니다.

    // Partition.scala
      /**
       * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset
       * from the time when this broker was the leader last time) and setting the new leader and ISR.
       * If the leader replica id does not change, return false to indicate the replica manager.
       */
      def makeLeader(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
      }
    
      /**
       *  Make the local replica the follower by setting the new leader and ISR to empty
       *  If the leader replica id does not change and the new epoch is equal or one
       *  greater (that is, no updates have been missed), return false to indicate to the
        * replica manager that state is already correct and the become-follower steps can be skipped
       */
      def makeFollower(controllerId: Int, partitionStateInfo: LeaderAndIsrRequest.PartitionState, correlationId: Int): Boolean = {
      }
    

     파티션의 상태 변경이 완료되면 토픽 파티션 별로 응답을 바탕으로 LeaderAndIsrResponse 객체를 생성하여 컨트롤러에 응답을 전달합니다.

    // ReplicaManager.scala
      def becomeLeaderOrFollower(correlationId: Int,
                                 leaderAndIsrRequest: LeaderAndIsrRequest,
                                 onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): LeaderAndIsrResponse = {
    // ...
            val responseMap = new mutable.HashMap[TopicPartition, Errors]
    // ...
            leaderAndIsrRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
              val partition = getOrCreatePartition(topicPartition)
              val partitionLeaderEpoch = partition.getLeaderEpoch
              if (partition eq ReplicaManager.OfflinePartition) {
                stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from " +
                  s"controller $controllerId with correlation id $correlationId " +
                  s"epoch $controllerEpoch for partition $topicPartition as the local replica for the " +
                  "partition is in an offline log directory")
    // 파티션 별로 에러 응답을 저장 - 파티션이 오프라인인 경우
                responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
              } else if (partitionLeaderEpoch < stateInfo.basePartitionState.leaderEpoch) {
    // ...
              }
            }
    // .. makeLeaders & makeFollowers
            new LeaderAndIsrResponse(Errors.NONE, responseMap.asJava)
          }
        }
      }
    // KafkaApis.scala
      def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
        if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
          val response = replicaManager.becomeLeaderOrFollower(correlationId, LeaderAndIsrRequest, onLeadershipChange)
    // 응답을 전달한다.
          sendResponseExemptThrottle(request, response)
        } else {
          sendResponseMaybeThrottle(request, throttleTimeMs => leaderAndIsrRequest.getErrorResponse(throttleTimeMs,
            Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
        }
      }

    LeaderAndIsr api와 관련되어 발생할 수 있는 이슈

    Partition Leader와 관련된 Split brain 현상

     컨트롤러에서 전달한 LeaderAndIsr을 바탕으로 브로커들은 개별적인 파티션 별로 리더-팔로워 역할을 변경하게 됩니다. 그러나 특정 브로커에서 네트워크 지연 등으로 LeaderAndIsr 요청에 관한 처리가 늦어지면 해당 파티션에 관한 상태가 서로 다른 시점의 상태로 유지하게 됩니다.

    각 브로커가 가지고 있는 리더에 관한 상태가 서로 달라 순간 리더가 2개가 된다.

     위 그림은 AAA 토픽의 0번 파티션의 리더가 1번 브로커에서 2번 브로커로 reassign 되었을 때 split brain 현상이 발생한 상황입니다. 0, 1번 브로커는 reassign 된 파티션 정보를 정상적으로 처리했지만, 2번 브로커는 정상 처리되지 않고 이전 상태 값을 가집니다. 이때, AAA 토픽의 0번 파티션의 리더가 이전 상태 값인 1번 브로커와 최근 상태 값인 2번 브로커로, 비정상적인 상태가 됩니다. 그렇기 때문에 클라이언트는 2번 브로커가 정상적으로 처리될 때까지 AAA 토픽의 0번 파티션으로 pub/sub 할 수 없게 됩니다.

    마무리

     이번 글을 통해 LeaderAndIsr api에 대해 살펴봤습니다. 

    반응형

    댓글

Designed by Tistory.