
  • 파티션 재할당(Reassignment) 로직 찍먹하기
    이번엔 KafkaController 클래스에서 onPartitionReassignment 메서드 주석을 통해 파티션 재할당하는 로직을 알아봅니다. 대상 버전은 현재 가장 최신 버전인 3.3.0 버전입니다. 

    본문 중 Reassignment은 파티션 재할당 작업과 동일한 의미입니다.

    파티션 재할당 로직 찍먹하기

    onPartitionReassignment 메서드가 실행되는 시점

    • AlterPartitionReassignments API 가 호출되었을 때,
    • Reassignment 정보를 저장하는 주키퍼 znode 경로에 znode가 생성되었을 때,
    • 진행 중이던 reassignment 작업이 종료되었을 때, (파티션의 ISR znode 변경으로 감지됨)
    • 진행 중이던 reassignment 작업의 대상 브로커가 새롭게 실행되었을 때,
    • 컨트롤러가 시작하거나 fail-over 되었을 때,

    파티션 재할당 로직에서 사용되는 단어

    파티션 재할당 작업 로직을 설명하는 과정에서 사용되는 축약어에 대한 정의입니다.

    • RS : 현재의 레플리카(replica) 집합(set)
    • ORS : 파티션의 원래(original) 레플리카 집합
    • TRS : 파티션 재할당 작업의 목표(target) 레플리카 집합
    • AR : 이번 파티션 재할당 작업에서 추가(add)되는 레플리카들
    • RR : 이번 파티션 재할당 작업에서 제거(remove)되는 레플리카들 

    파티션 재할당의 3단계

    파티션 재할당은 크게 3단계로 구성됩니다.

    레플리카 할당 정보 갱신 단계 (Phase U; Assignment Update)

    해당 메서드가 어떤 상황에서 실행되었든 상관없이, 파티션 재할당 작업의 첫 단계는 기존 레플리카 할당 상태를 업데이트하는 것입니다. 카프카 컨트롤러는 메모리에 할당 정보를 업데이트하기 전에 주키퍼에 할당 정보를 업데이트하므로, 이 상황은 컨트롤러의 fail-over 상황에서 재 실행될 수 있습니다. 

    1. 주키퍼 znode에 다음과 같이 업데이트합니다.
      1. RS = ORS + TRS
      2. AR = TRS - ORS
      3. RR = ORS - TRS
    2. 컨트롤러 메모리에 위 정보를 동일하게 업데이트합니다.
    3. 만약 진행 중이던 재할당 작업이 취소되거나 변경되면, 더 이상 AR이 아닌 브로커들에게 StopReplica 요청을 보냅니다.

    재할당 작업이 완료되기 위해선, 새로운 레플리카들이 ISR에 포함되어야 합니다. 그래서 ISR 상태에 의존하게 되는데, 아래 2가지 단계 중 하나가 실행됩니다.

    재할당 작업이 아직 끝나지 않았을 때, (Phase A; TRS != ISR)

    1. 리더의 기수(epoch)를 올리고, RS에 LeaderAndIsr 요청을 보냅니다.
    2. AR이 NewReplica 상태가 되고, 복제를 시작합니다.

    재할당 작업이 끝났을 때, (Phase B; TRS = ISR)

    1. AR의 모든 레플리카들이 OnlineReplica 상태가 됩니다.
    2. 메모리의 레플리카 집합 정보를 다음과 같이 변경합니다.
      • RS = TRS
      • AR = []
      • RR = []
    3. RS = TRS 정보를 기반으로 LeaderAndIsr 요청을 보냅니다.
      • 이 작업은 TRS - ORS에 해당하는 레플리카들이 다시 ISR에 포함되지 않도록 하기 위함입니다. 그리고 만약 현재 레플리카의 리더가 TRS에 포함되지 않거나 혹은 살아있지 않다면, TRS에서 새롭게 리더를 선출합니다. 
    4. RR의 모든 레플리카들이 OfflineReplica 상태가 됩니다.
      • OfflineReplica 상태로 변경됨에 따라, RR은 주키퍼의 ISR 정보에서 제거되고 또 리더 레플리카에게만 LeaderAndIsr 이 전달되어 ISR에서 제거되도록 합니다. 이후 RR의 레플리카들에게 StopReplica(delete=false) 요청이 전달됩니다.
    5. RR의 모든 레플리카들이 ReplicaDeletionStarted 상태가 되면, RR의 레플리카들에게 StopReplica(delete=true) 요청이 보내집니다. 이는 RR의 레플리카들이 디스크에서 물리적으로 삭제되도록 합니다.
    6. 주키퍼 znode에 메모리 저장되어있던 레플리카 집합 정보를 반영합니다.
      • 이 작업이 가장 마지막에 있는 이유는 주키퍼가 ORS를 영속적으로 저장하는 유일한 곳이기 때문입니다. 이 구성을 통해 컨트롤러가 재할당 중간에 실패하더라도 복구가 여전히 가능합니다.
    7. ISR 재할당 리스너를 제거하고, 재할당 관련 znode 경로에서 이 파티션을 제거하도록 합니다.
    8. 리더 선출 후, 레플리카와 ISR 정보를 변경하고, 모든 브로커에 메타데이터 업데이트를 요청합니다.

    예시 상황

    ORS = {1, 2, 3}, TRS = {4, 5, 6} 일 때, 각 단계 별 레플리카 집합, Leader, ISR 상태를 나타냅니다.

    순번 RS AR RR Leader ISR 단계
    1 {1, 2, 3} {} {} 1 {1, 2, 3} 초기 상태
    2 {4, 5, 6, 1, 2, 3} {4, 5, 6} {1, 2, 3} 1 {1, 2, 3} 복제 시작 (A 단계의 2번째)
    3 {4, 5, 6, 1, 2, 3} {4, 5, 6} {1, 2, 3} 1 {1, 2, 3, 4, 5, 6} 복제가 완료되어 재할당 작업이 끝났을 때 (B 단계)
    4 {4, 5, 6, 1, 2, 3} {4, 5, 6} {1, 2, 3} 4 {1, 2, 3, 4, 5, 6} TRS 정보를 기반으로 LeaderAndIsr 요청 (B 단계의 3번째)
    5 {4, 5, 6, 1, 2, 3} {4, 5, 6} {1, 2, 3} 4 {4, 5, 6} RR 레플리카들의 복제 중지 (B 단계의 4번째)
    6 {4, 5, 6} {} {} 4 {4, 5, 6} 레플리카 상태 정보 변경 (B 단계의 6번째)

    직접 실행하고 로그로 봐보기

    Successfully updated assignment of partition daehokimm-topic-0 to ReplicaAssignment(replicas=4,5,6,1,2,3, addingReplicas=4,5,6, removingReplicas=1,2,3) (kafka.controller.KafkaController)
    // 리더의 기수(epoch)를 올리고 RS에 LeaderAndIsr 요청을 보냅니다. (A 단계의 첫번째)
    Updated leader epoch for partition daehokimm-topic-0 to 6, zkVersion=8 (kafka.controller.KafkaController)
    Sending LeaderAndIsr request to broker 1 with 1 become-leader and 0 become-follower partitions (state.change.logger)
    Sending LeaderAndIsr request to broker 2 with 0 become-leader and 1 become-follower partitions (state.change.logger)
    Sending LeaderAndIsr request to broker 3 with 0 become-leader and 1 become-follower partitions (state.change.logger)
    Sending LeaderAndIsr request to broker 4 with 0 become-leader and 1 become-follower partitions (state.change.logger)
    Sending LeaderAndIsr request to broker 5 with 0 become-leader and 1 become-follower partitions (state.change.logger)
    Sending LeaderAndIsr request to broker 6 with 0 become-leader and 1 become-follower partitions (state.change.logger)
    Sending UpdateMetadata request to brokers HashSet(1, 2, 3, 4, 5, 6) for 1 partitions (state.change.logger)
    Sent LeaderAndIsr request (Leader:1,ISR:1,2,3,LeaderRecoveryState:RECOVERED,LeaderEpoch:6,ZkVersion:8,ControllerEpoch:1) with new replica assignment ReplicaAssignment(replicas=4,5,6,1,2,3, addingReplicas=4,5,6, removingReplicas=1,2,3) to leader 1 for partition being reassigned daehokimm-topic-0 (state.change.logger)
    Sending LeaderAndIsr request to broker 4 with 0 become-leader and 1 become-follower partitions (state.change.logger)
    Sending UpdateMetadata request to brokers HashSet(1, 2, 3, 4, 5, 6) for 1 partitions (state.change.logger)
    Sending LeaderAndIsr request to broker 5 with 0 become-leader and 1 become-follower partitions (state.change.logger)
    Sending UpdateMetadata request to brokers HashSet(1, 2, 3, 4, 5, 6) for 1 partitions (state.change.logger)
    Sending LeaderAndIsr request to broker 6 with 0 become-leader and 1 become-follower partitions (state.change.logger)
    Sending UpdateMetadata request to brokers HashSet(1, 2, 3, 4, 5, 6) for 1 partitions (state.change.logger)
    Sending UpdateMetadata request to brokers HashSet(1, 2, 3, 4, 5, 6) for 1 partitions (state.change.logger)
    Sending UpdateMetadata request to brokers HashSet(1, 2, 3, 4, 5, 6) for 1 partitions (state.change.logger)
    Sending UpdateMetadata request to brokers HashSet(1, 2, 3, 4, 5, 6) for 1 partitions (state.change.logger)
    // 4, 5, 6 레플리카들이 복제가 완료되어 OnlineReplica 상태로 변경됩니다. (B 단계 첫번째)
    Target replicas ArrayBuffer(4, 5, 6) have all caught up with the leader for reassigning partition daehokimm-topic-0 (kafka.controller.KafkaController)
    Sending UpdateMetadata request to brokers HashSet() for 0 partitions (state.change.logger)
    // TRS에 리더가 없으므로 TRS에서 리더를 선출합니다. (B 단계 3번째)
    Leader 1 for partition daehokimm-topic-0 being reassigned, is not in the new list of replicas 4,5,6. Re-electing leader (kafka.controller.KafkaController)
    // 리더가 변경되고 leader epoch 6에서 7로 증가합니다.
    Changed partition daehokimm-topic-0 from OnlinePartition to OnlinePartition with state LeaderAndIsr(leader=4, leaderEpoch=7, isr=List(5, 1, 6, 2, 3, 4), leaderRecoveryState=RECOVERED, zkVersion=12) (state.change.logger)
    // 재선출된 리더 정보를 바탕으로 LeaderAndIsr 요청을 보냅니다.
    Sending LeaderAndIsr request to broker 4 with 1 become-leader and 0 become-follower partitions (state.change.logger)
    Sending LeaderAndIsr request to broker 5 with 0 become-leader and 1 become-follower partitions (state.change.logger)
    Sending LeaderAndIsr request to broker 6 with 0 become-leader and 1 become-follower partitions (state.change.logger)
    Sending UpdateMetadata request to brokers HashSet(1, 2, 3, 4, 5, 6) for 1 partitions (state.change.logger)
    // RR의 레플리카들이 ISR에서 제외됩니다. 
    Partition daehokimm-topic-0 state changed to (Leader:4,ISR:5,6,2,3,4,LeaderRecoveryState:RECOVERED,LeaderEpoch:8,ZkVersion:13,ControllerEpoch:1) after removing replica 1 from the ISR as part of transition to OfflineReplica (state.change.logger)
    Partition daehokimm-topic-0 state changed to (Leader:4,ISR:5,6,3,4,LeaderRecoveryState:RECOVERED,LeaderEpoch:9,ZkVersion:14,ControllerEpoch:1) after removing replica 2 from the ISR as part of transition to OfflineReplica (state.change.logger)
    Partition daehokimm-topic-0 state changed to (Leader:4,ISR:5,6,4,LeaderRecoveryState:RECOVERED,LeaderEpoch:10,ZkVersion:15,ControllerEpoch:1) after removing replica 3 from the ISR as part of transition to OfflineReplica (state.change.logger)
    Sending LeaderAndIsr request to broker 4 with 1 become-leader and 0 become-follower partitions (state.change.logger)
    Sending LeaderAndIsr request to broker 5 with 0 become-leader and 1 become-follower partitions (state.change.logger)
    Sending LeaderAndIsr request to broker 6 with 0 become-leader and 1 become-follower partitions (state.change.logger)
    Sending UpdateMetadata request to brokers HashSet(1, 2, 3, 4, 5, 6) for 1 partitions (state.change.logger)
    // RR의 레플리카들이 OfflineReplica 상태가 되고, StopReplica(delete=false) 요청이 전달됩니다. (B 단계 4번째)
    Sending StopReplica request for 1 replicas to broker 1 (state.change.logger)
    Sending StopReplica request for 1 replicas to broker 2 (state.change.logger)
    Sending StopReplica request for 1 replicas to broker 3 (state.change.logger)
    Sending UpdateMetadata request to brokers HashSet() for 0 partitions (state.change.logger)
    // RR의 모든 레플리카들이 ReplicaDeletionStarted 상태가 되고, StopRelica(delete=true) 요청이 전달됩니다. (B 단계 5번째)
    Sending StopReplica request for 1 replicas to broker 1 (state.change.logger)
    Sending StopReplica request for 1 replicas to broker 2 (state.change.logger)
    Sending StopReplica request for 1 replicas to broker 3 (state.change.logger)
    Sending UpdateMetadata request to brokers HashSet() for 0 partitions (state.change.logger)
    Sending UpdateMetadata request to brokers HashSet() for 0 partitions (state.change.logger)
    // 컨트롤러 메모리의 레플리카 집합 정보에 완료된 정보가 반영됩니다.
    Successfully updated assignment of partition daehokimm-topic-0 to ReplicaAssignment(replicas=4,5,6, addingReplicas=, removingReplicas=) (kafka.controller.KafkaController)
    // 주키퍼에도 반영됩니다. (B 단계 6번째)
    Removing partitions Map(daehokimm-topic-0 -> Buffer(4, 5, 6)) from the list of reassigned partitions in zookeeper (kafka.controller.KafkaController)
    // 주키퍼에 파티션 재할당 관련 znode를 제거합니다. (B 단계 7번째)
    No more partitions need to be reassigned. Deleting zk path /admin/reassign_partitions (kafka.controller.KafkaController)
    Sending UpdateMetadata request to brokers HashSet(1, 2, 3, 4, 5, 6) for 1 partitions (state.change.logger)
    Processing automatic preferred replica leader election (kafka.controller.KafkaController)


     이번 글을 통해 파티션 재할당 과정의 대략적인 로직을 살펴봤습니다. 레플리카들이 트래픽을 받고 있는 과정에서 안정적으로 파티션을 조정하기 위해 많은 부분이 고려되었음을 알 수 있었습니다.




