-
파티션 재할당(Reassignment) 로직 찍먹하기개발자 라이프/카프카 2022. 10. 24. 23:38반응형
들어가며
이번엔 KafkaController 클래스에서 onPartitionReassignment 메서드 주석을 통해 파티션 재할당하는 로직을 알아봅니다. 대상 버전은 현재 가장 최신 버전인 3.3.0 버전입니다.
본문 중 Reassignment은 파티션 재할당 작업과 동일한 의미입니다.
파티션 재할당 로직 찍먹하기
onPartitionReassignment 메서드가 실행되는 시점
- AlterPartitionReassignments API 가 호출되었을 때,
- Reassignment 정보를 저장하는 주키퍼 znode 경로에 znode가 생성되었을 때,
- 진행 중이던 reassignment 작업이 종료되었을 때, (파티션의 ISR znode 변경으로 감지됨)
- 진행 중이던 reassignment 작업의 대상 브로커가 새롭게 실행되었을 때,
- 컨트롤러가 시작하거나 fail-over 되었을 때,
파티션 재할당 로직에서 사용되는 단어
파티션 재할당 작업 로직을 설명하는 과정에서 사용되는 축약어에 대한 정의입니다.
- RS : 현재의 레플리카(replica) 집합(set)
- ORS : 파티션의 원래(original) 레플리카 집합
- TRS : 파티션 재할당 작업의 목표(target) 레플리카 집합
- AR : 이번 파티션 재할당 작업에서 추가(add)되는 레플리카들
- RR : 이번 파티션 재할당 작업에서 제거(remove)되는 레플리카들
파티션 재할당의 3단계
파티션 재할당은 크게 3단계로 구성됩니다.
레플리카 할당 정보 갱신 단계 (Phase U; Assignment Update)
해당 메서드가 어떤 상황에서 실행되었든 상관없이, 파티션 재할당 작업의 첫 단계는 기존 레플리카 할당 상태를 업데이트하는 것입니다. 카프카 컨트롤러는 메모리에 할당 정보를 업데이트하기 전에 주키퍼에 할당 정보를 업데이트하므로, 이 상황은 컨트롤러의 fail-over 상황에서 재 실행될 수 있습니다.
- 주키퍼 znode에 다음과 같이 업데이트합니다.
- RS = ORS + TRS
- AR = TRS - ORS
- RR = ORS - TRS
- 컨트롤러 메모리에 위 정보를 동일하게 업데이트합니다.
- 만약 진행 중이던 재할당 작업이 취소되거나 변경되면, 더 이상 AR이 아닌 브로커들에게 StopReplica 요청을 보냅니다.
재할당 작업이 완료되기 위해선, 새로운 레플리카들이 ISR에 포함되어야 합니다. 그래서 ISR 상태에 의존하게 되는데, 아래 2가지 단계 중 하나가 실행됩니다.
재할당 작업이 아직 끝나지 않았을 때, (Phase A; TRS != ISR)
- 리더의 기수(epoch)를 올리고, RS에 LeaderAndIsr 요청을 보냅니다.
- AR이 NewReplica 상태가 되고, 복제를 시작합니다.
재할당 작업이 끝났을 때, (Phase B; TRS = ISR)
- AR의 모든 레플리카들이 OnlineReplica 상태가 됩니다.
- 메모리의 레플리카 집합 정보를 다음과 같이 변경합니다.
- RS = TRS
- AR = []
- RR = []
- RS = TRS 정보를 기반으로 LeaderAndIsr 요청을 보냅니다.
- 이 작업은 TRS - ORS에 해당하는 레플리카들이 다시 ISR에 포함되지 않도록 하기 위함입니다. 그리고 만약 현재 레플리카의 리더가 TRS에 포함되지 않거나 혹은 살아있지 않다면, TRS에서 새롭게 리더를 선출합니다.
- RR의 모든 레플리카들이 OfflineReplica 상태가 됩니다.
- OfflineReplica 상태로 변경됨에 따라, RR은 주키퍼의 ISR 정보에서 제거되고 또 리더 레플리카에게만 LeaderAndIsr 이 전달되어 ISR에서 제거되도록 합니다. 이후 RR의 레플리카들에게 StopReplica(delete=false) 요청이 전달됩니다.
- RR의 모든 레플리카들이 ReplicaDeletionStarted 상태가 되면, RR의 레플리카들에게 StopReplica(delete=true) 요청이 보내집니다. 이는 RR의 레플리카들이 디스크에서 물리적으로 삭제되도록 합니다.
- 주키퍼 znode에 메모리 저장되어있던 레플리카 집합 정보를 반영합니다.
- 이 작업이 가장 마지막에 있는 이유는 주키퍼가 ORS를 영속적으로 저장하는 유일한 곳이기 때문입니다. 이 구성을 통해 컨트롤러가 재할당 중간에 실패하더라도 복구가 여전히 가능합니다.
- ISR 재할당 리스너를 제거하고, 재할당 관련 znode 경로에서 이 파티션을 제거하도록 합니다.
- 리더 선출 후, 레플리카와 ISR 정보를 변경하고, 모든 브로커에 메타데이터 업데이트를 요청합니다.
예시 상황
ORS = {1, 2, 3}, TRS = {4, 5, 6} 일 때, 각 단계 별 레플리카 집합, Leader, ISR 상태를 나타냅니다.
순번 RS AR RR Leader ISR 단계 1 {1, 2, 3} {} {} 1 {1, 2, 3} 초기 상태 2 {4, 5, 6, 1, 2, 3} {4, 5, 6} {1, 2, 3} 1 {1, 2, 3} 복제 시작 (A 단계의 2번째) 3 {4, 5, 6, 1, 2, 3} {4, 5, 6} {1, 2, 3} 1 {1, 2, 3, 4, 5, 6} 복제가 완료되어 재할당 작업이 끝났을 때 (B 단계) 4 {4, 5, 6, 1, 2, 3} {4, 5, 6} {1, 2, 3} 4 {1, 2, 3, 4, 5, 6} TRS 정보를 기반으로 LeaderAndIsr 요청 (B 단계의 3번째) 5 {4, 5, 6, 1, 2, 3} {4, 5, 6} {1, 2, 3} 4 {4, 5, 6} RR 레플리카들의 복제 중지 (B 단계의 4번째) 6 {4, 5, 6} {} {} 4 {4, 5, 6} 레플리카 상태 정보 변경 (B 단계의 6번째) 직접 실행하고 로그로 봐보기
더보기Successfully updated assignment of partition daehokimm-topic-0 to ReplicaAssignment(replicas=4,5,6,1,2,3, addingReplicas=4,5,6, removingReplicas=1,2,3) (kafka.controller.KafkaController) // 리더의 기수(epoch)를 올리고 RS에 LeaderAndIsr 요청을 보냅니다. (A 단계의 첫번째) Updated leader epoch for partition daehokimm-topic-0 to 6, zkVersion=8 (kafka.controller.KafkaController) Sending LeaderAndIsr request to broker 1 with 1 become-leader and 0 become-follower partitions (state.change.logger) Sending LeaderAndIsr request to broker 2 with 0 become-leader and 1 become-follower partitions (state.change.logger) Sending LeaderAndIsr request to broker 3 with 0 become-leader and 1 become-follower partitions (state.change.logger) Sending LeaderAndIsr request to broker 4 with 0 become-leader and 1 become-follower partitions (state.change.logger) Sending LeaderAndIsr request to broker 5 with 0 become-leader and 1 become-follower partitions (state.change.logger) Sending LeaderAndIsr request to broker 6 with 0 become-leader and 1 become-follower partitions (state.change.logger) Sending UpdateMetadata request to brokers HashSet(1, 2, 3, 4, 5, 6) for 1 partitions (state.change.logger) Sent LeaderAndIsr request (Leader:1,ISR:1,2,3,LeaderRecoveryState:RECOVERED,LeaderEpoch:6,ZkVersion:8,ControllerEpoch:1) with new replica assignment ReplicaAssignment(replicas=4,5,6,1,2,3, addingReplicas=4,5,6, removingReplicas=1,2,3) to leader 1 for partition being reassigned daehokimm-topic-0 (state.change.logger) Sending LeaderAndIsr request to broker 4 with 0 become-leader and 1 become-follower partitions (state.change.logger) Sending UpdateMetadata request to brokers HashSet(1, 2, 3, 4, 5, 6) for 1 partitions (state.change.logger) Sending LeaderAndIsr request to broker 5 with 0 become-leader and 1 become-follower partitions (state.change.logger) Sending UpdateMetadata request to brokers HashSet(1, 2, 3, 4, 5, 6) for 1 partitions (state.change.logger) Sending LeaderAndIsr request to broker 6 with 0 become-leader and 1 become-follower partitions (state.change.logger) Sending UpdateMetadata request to brokers HashSet(1, 2, 3, 4, 5, 6) for 1 partitions (state.change.logger) Sending UpdateMetadata request to brokers HashSet(1, 2, 3, 4, 5, 6) for 1 partitions (state.change.logger) Sending UpdateMetadata request to brokers HashSet(1, 2, 3, 4, 5, 6) for 1 partitions (state.change.logger) Sending UpdateMetadata request to brokers HashSet(1, 2, 3, 4, 5, 6) for 1 partitions (state.change.logger) // 4, 5, 6 레플리카들이 복제가 완료되어 OnlineReplica 상태로 변경됩니다. (B 단계 첫번째) Target replicas ArrayBuffer(4, 5, 6) have all caught up with the leader for reassigning partition daehokimm-topic-0 (kafka.controller.KafkaController) Sending UpdateMetadata request to brokers HashSet() for 0 partitions (state.change.logger) // TRS에 리더가 없으므로 TRS에서 리더를 선출합니다. (B 단계 3번째) Leader 1 for partition daehokimm-topic-0 being reassigned, is not in the new list of replicas 4,5,6. Re-electing leader (kafka.controller.KafkaController) // 리더가 변경되고 leader epoch 6에서 7로 증가합니다. Changed partition daehokimm-topic-0 from OnlinePartition to OnlinePartition with state LeaderAndIsr(leader=4, leaderEpoch=7, isr=List(5, 1, 6, 2, 3, 4), leaderRecoveryState=RECOVERED, zkVersion=12) (state.change.logger) // 재선출된 리더 정보를 바탕으로 LeaderAndIsr 요청을 보냅니다. Sending LeaderAndIsr request to broker 4 with 1 become-leader and 0 become-follower partitions (state.change.logger) Sending LeaderAndIsr request to broker 5 with 0 become-leader and 1 become-follower partitions (state.change.logger) Sending LeaderAndIsr request to broker 6 with 0 become-leader and 1 become-follower partitions (state.change.logger) Sending UpdateMetadata request to brokers HashSet(1, 2, 3, 4, 5, 6) for 1 partitions (state.change.logger) // RR의 레플리카들이 ISR에서 제외됩니다. Partition daehokimm-topic-0 state changed to (Leader:4,ISR:5,6,2,3,4,LeaderRecoveryState:RECOVERED,LeaderEpoch:8,ZkVersion:13,ControllerEpoch:1) after removing replica 1 from the ISR as part of transition to OfflineReplica (state.change.logger) Partition daehokimm-topic-0 state changed to (Leader:4,ISR:5,6,3,4,LeaderRecoveryState:RECOVERED,LeaderEpoch:9,ZkVersion:14,ControllerEpoch:1) after removing replica 2 from the ISR as part of transition to OfflineReplica (state.change.logger) Partition daehokimm-topic-0 state changed to (Leader:4,ISR:5,6,4,LeaderRecoveryState:RECOVERED,LeaderEpoch:10,ZkVersion:15,ControllerEpoch:1) after removing replica 3 from the ISR as part of transition to OfflineReplica (state.change.logger) Sending LeaderAndIsr request to broker 4 with 1 become-leader and 0 become-follower partitions (state.change.logger) Sending LeaderAndIsr request to broker 5 with 0 become-leader and 1 become-follower partitions (state.change.logger) Sending LeaderAndIsr request to broker 6 with 0 become-leader and 1 become-follower partitions (state.change.logger) Sending UpdateMetadata request to brokers HashSet(1, 2, 3, 4, 5, 6) for 1 partitions (state.change.logger) // RR의 레플리카들이 OfflineReplica 상태가 되고, StopReplica(delete=false) 요청이 전달됩니다. (B 단계 4번째) Sending StopReplica request for 1 replicas to broker 1 (state.change.logger) Sending StopReplica request for 1 replicas to broker 2 (state.change.logger) Sending StopReplica request for 1 replicas to broker 3 (state.change.logger) Sending UpdateMetadata request to brokers HashSet() for 0 partitions (state.change.logger) // RR의 모든 레플리카들이 ReplicaDeletionStarted 상태가 되고, StopRelica(delete=true) 요청이 전달됩니다. (B 단계 5번째) Sending StopReplica request for 1 replicas to broker 1 (state.change.logger) Sending StopReplica request for 1 replicas to broker 2 (state.change.logger) Sending StopReplica request for 1 replicas to broker 3 (state.change.logger) Sending UpdateMetadata request to brokers HashSet() for 0 partitions (state.change.logger) Sending UpdateMetadata request to brokers HashSet() for 0 partitions (state.change.logger) // 컨트롤러 메모리의 레플리카 집합 정보에 완료된 정보가 반영됩니다. Successfully updated assignment of partition daehokimm-topic-0 to ReplicaAssignment(replicas=4,5,6, addingReplicas=, removingReplicas=) (kafka.controller.KafkaController) // 주키퍼에도 반영됩니다. (B 단계 6번째) Removing partitions Map(daehokimm-topic-0 -> Buffer(4, 5, 6)) from the list of reassigned partitions in zookeeper (kafka.controller.KafkaController) // 주키퍼에 파티션 재할당 관련 znode를 제거합니다. (B 단계 7번째) No more partitions need to be reassigned. Deleting zk path /admin/reassign_partitions (kafka.controller.KafkaController) Sending UpdateMetadata request to brokers HashSet(1, 2, 3, 4, 5, 6) for 1 partitions (state.change.logger) Processing automatic preferred replica leader election (kafka.controller.KafkaController)결론
이번 글을 통해 파티션 재할당 과정의 대략적인 로직을 살펴봤습니다. 레플리카들이 트래픽을 받고 있는 과정에서 안정적으로 파티션을 조정하기 위해 많은 부분이 고려되었음을 알 수 있었습니다.
참고
GitHub - apache/kafka: Mirror of Apache Kafka
Mirror of Apache Kafka. Contribute to apache/kafka development by creating an account on GitHub.
github.com
반응형