ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka 101] 카프카 컨슈머 (Kafka Consumer)
    개발자 라이프/카프카 2020. 4. 12. 22:28
    반응형

    들어가며

     카프카의 메시지 파이프라인은 프로듀서에서 시작해서 브로커를 통해 컨슈머로 끝나게 됩니다. 이번 글은 카프카 메시지 파이프라인의 마지막 부분인 컨슈머에 관해 전반적으로 설명합니다.

    1. 카프카 컨슈머

     컨슈머는 컨슈머 API와 그것으로 구성된 애플리케이션을 말합니다. 일반적으로 컨슈머가 토픽을 구독(Subscribe) 혹은 읽는다(Read)고 하는데, 이는 컨슈머가 토픽 파티션에 저장된 메시지들을 가져오는 것을 말합니다. 카프카 컨슈머의 다음 3가지 특징을 통해 더욱 효율적이고 유연한 메시지 구독 기능을 제공합니다. 

    • Polling 구조
    • 단일 토픽의 멀티 컨슈밍
    • 컨슈머 그룹

    Polling 구조

     일반적으로 다른 메시징 큐는 메시지 큐에서 메시지를 Push 합니다. 카프카 구성 요소로 예를 들면 브로커가 컨슈머로 메시지를 보내는 방식입니다. 하지만 이런 Push 방식의 가장 큰 단점은 메시지 큐가 컨슈머 측의 처리 성능을 염두해야 합니다. 즉, 메시지 큐가 컨슈머로 메시지를 Push 할 때, "컨슈머가 이 정도는 처리할 수 있겠지!" 하고 컨슈머 환경을 고려해야 합니다.

     이와 반대로 카프카는 컨슈머가 브로커로부터 메시지를 요청하는 Polling 구조로 설계되었습니다. 즉, 컨슈머는 자신이 원하는 만큼의 브로커로 메시지를 요청합니다. 이러한 구조의 가장 큰 장점은 각 컨슈머가 자신의 환경에 메시지 구독 성능을 최적화할 수 있다는 것입니다. 추가로 브로커는 컨슈머가 요청하는 것만큼 메시지를 전달해주면 되기 때문에 더 이상 컨슈머의 환경을 고려할 필요가 없습니다. 객체지향 SOLID 원칙 중 단일 책임 원칙 맥락과 유사하죠?

    단일 토픽의 멀티 컨슈밍

    동일한 토픽을 서로 다른 컨슈머 애플리케이션이 구독할 수 있습니다. (출처 : https://www.linkedin.com/pulse/kafka-consumer-overview-sylvester-daniel)

     카프카 컨슈밍의 또 다른 특징 중 하나는 하나의 토픽에 서로 다른 컨슈머 애플리케이션이 동시에 구독할 수 있다는 것입니다. 위 그림에서처럼 하나의 토픽(Topic A)을 컨슈머 App 1과 App 2이 동시에 구독할 수 있습니다. 이렇게 단일 토픽에 대한 멀티 컨슈밍이 가능한 이유는 컨슈머가 메시지를 읽을 때 브로커의 메시지가 삭제되는 것이 아니기 때문입니다. 대신에 각 컨슈머가 어느 토픽 파티션의 어느 오프셋까지 읽어갔는 지를 컨슈머 오프셋(`__consumer_offset`)이라는 토픽에 저장합니다. 

     컨슈머 오프셋으로 얻어지는 장점은 멀티 컨슈밍만이 아닙니다. 컨슈머 애플리케이션이 메시지 구독 중 중단되었다가 다시 구동되는 경우, 컨슈머 오프셋에 저장된 정보를 통해 자신이 어디서부터 메시지를 읽어야하는 지 알 수 있습니다. 즉, 컨슈머 상태와 관계 없이 안정적인 메시지 구독이 가능해집니다.

    컨슈머 그룹

     브로커는 성능을 위해 하나의 토픽을 여러 파티션으로 병렬 구성하여 처리합니다. 하지만 둘 이상의 파티션을 하나의 컨슈머로만 처리한다면 성능 상의 문제가 발생할 수 있습니다. 그래서 카프카 컨슈머는 하나 이상의 컨슈머가 컨슈머 그룹(Consumer Group)을 구성하여 하나의 토픽을 구독할 수 있습니다.

    2개의 컨슈머가 그룹을 형성하여 토픽 A를 구독하는 모습 (출처 : https://www.linkedin.com/pulse/kafka-consumer-overview-sylvester-daniel)

     컨슈머 그룹 내의 컨슈머는 토픽 파티션의 소유권(혹은 구독권)을 나눠 갖습니다. 예를 들면, 위 그림은 파티션 3개로 이루어진 토픽 A를 2개의 컨슈머가 컨슈머 그룹을 구성하여 구독하는 모습입니다. 컨슈머 0은 파티션 0의 소유권을 가지고 구독합니다. 마찬가지로 컨슈머 1은 파티션 1과 2의 소유권을 가지고 구독합니다. 이처럼 같은 컨슈머 그룹의 컨슈머들은 소유권을 가진 파티션만 구독합니다.

     그렇다면 컨슈머 그룹에 컨슈머가 추가되거나, 이탈하게 되면 어떻게 될까요? 이럴 때는 컨슈머 그룹 내부에서 파티션의 소유권이 재조정됩니다. 이러한 파티션 소유권 재조정을 리밸런싱(Rebalancing)이라고 합니다. 리밸런싱은 컨슈머 그룹 내에 특정 컨슈머의 상태가 변경되더라도 다른 컨슈머들이 유연하고 안정적으로 토픽을 구독할 수 있도록 도와주는 기능입니다. 리밸런싱 과정은 다소 깊이가 있는 내용이기 때문에 별도의 글에서 다루도록 하겠습니다.

    2. 컨슈머 예제 코드

     컨슈머 API를 활용한 예제 코드는 다음과 같습니다.(자바 8, 카프카 v2.4.1 기준)

    public class SampleConsumer {
    	public static void main(String[] args) {
    		// 프로퍼티 설정 부분
    		Properties props = new Properties();
    		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    		props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-group");
    		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
    		// 컨슈머 초기화 부분
    		Consumer<String, String> consumer = new KafkaConsumer<>(props);
    		consumer.subscribe(Collections.singleton("sample-topic"));
    
    		// 컨슈밍 부분
    		while (true) {
    			ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2_000));
    			records.forEach(record -> System.out.println(record.value()));
    		}
    	}
    }

     컨슈머 생성에 필수 설정은 다음 3가지입니다.

    • bootstrap.servers : 메타데이터 초기화를 위한 설정
    • group.id : 컨슈머의 그룹 아이디
    • key.deserializer.class : 메시지의 키를 역직렬화하기 위한 설정
    • value.deserializer.class : 메시지의 값을 역직렬화하기 위한 설정

     예제 코드의 컨슈머는 로컬(localhost:9092) 카프카 브로커를 통해 메타데이터를 초기화하고, sample-group이라는 컨슈머 그룹에 속합니다. 또한 구독하는 메시지의 키와 값은 StringDeserializer를 통해 역직렬화합니다(프로퍼티 설정 부분).

     설정 값을 바탕으로 생성된 프로퍼티 객체를 이용하여 컨슈머 객체를 생성하고, 컨슈머 객체의 subscribe 메소드를 이용하여 sample-topic 토픽을 구독하도록 합니다(컨슈머 초기화 부분). 그다음 반복문과 poll 메소드를 이용하여 메시지를 무한히 구독하고, 컨슈머로 읽어드린 메시지의 오프셋과 값을 표준 출력합니다(컨슈밍 부분). 정상적으로 메시지를 구독하면 다음과 같이 출력합니다.

    콘솔 프로듀서를 이용한 메시지 발행

    9 : hello
    10 : kafka
    11 : consumer
    12 : world!

     

    3. 컨슈머 주요 설정

     컨슈머는 앞서 살펴본 4가지 설정 외에도 다양한 설정이 있습니다. 이런 설정들은 요구되는 컨슈밍 환경이나 성능 등에 따라 다양하게 설정될 수 있습니다. 그 중 가장 자주 고려되는 몇 가지 설정을 살펴봅니다.

    heartbeat.interval.ms & session.timeout.ms

     위 2개의 설정은 컨슈머의 상태를 나타내는 것과 관련된 설정입니다. 하트비트 설정(heartbeat.interval.ms)는 컨슈머가 자신이 정상 구동 중임을 특정 브로커(컨슈머 코디네이터)에 알려주는 주기이고, 세션 타임아웃 설정(session.timeout.ms) 설정은 브로커가 컨슈머의 하트비트 신호를 기다리는 주기입니다. 만약 설정된 세션 타임아웃 시간 중에 특정 컨슈머의 하트비트 신호를 받지 못하면, 브로커는 해당 컨슈머가 정상 구동이 아님으로 판단하고 리밸런싱을 시작합니다. 이처럼 두 가지 설정은 서로 연관되어 있기 때문에 하트비트 설정과 세션 타임아웃 설정을 약 1 : 3 비율로 지정합니다. 예를 들어, 하트비트 설정이 1초로 되어있다면 세션 타임아웃 설정을 3초로 설정합니다.

    하트비트는 별도의 스레드를 통해 전송됩니다. (출처 : https://chrzaszcz.dev/2019/06/kafka-heartbeat-thread/)

    max.partition.fetch.bytes

     이 설정은 파티션 별로 컨슈머가 한번에 가져오는 메시지의 최대 크기를 지정하는 설정입니다. 따라서 컨슈머는 {구독하는 파티션 수 * max.partition.fetch.bytes}의 값만큼 메모리를 가지고 있어야 합니다. 그리고 이 설정은 컨슈머가 poll 메소드를 통해 생성하는 ConsumerRecords의 최대 크기와 관련 있습니다.

     나아가 이 설정은 브로커의 max.message.bytes 설정과 매우 깊은 관련이 있습니다. 만약 브로커의 max.message.bytes 설정의 값이 max.partition.fetch.bytes 보다 크고, 그 설정 값의 크기를 가지는 메시지가 브로커로 발행되어 있다면 컨슈머는 그 메시지를 구독할 수 없게 됩니다.

    auto.offset.reset

     컨슈머는 파티션의 오프셋을 기준으로 메시지를 구독합니다. 하지만 컨슈머가 처음 컨슈머 그룹을 구성하여 구동되거나, 컨슈머가 다소 오랜 기간동안 중단되었다가 구동되어 오프셋에 대응되는 메시지가 브로커에서 삭제된 경우 즉, 컨슈머가 오프셋 정보를 가지고 있지 않은 경우에는 해당 정보를 초기화해야 합니다. 이 때, 초기화는 auto.offst.reset 설정에 따라 다음 3가지 방법으로 할 수 있습니다.

    • 파티션에 저장된 메시지의 가장 처음부터 : earliest
    • 구독 후 파티션에 처음 들어오는 메시지부터 : latest
    • 초기화하지 않고 에러 발생 : none

     

     

     

    enable.auto.commit

     컨슈머는 메시지를 브로커로부터 가져와 처리한 뒤, 처리된 완료한 메시지의 오프셋 정보를 컨슈머 오프셋(__consumer_offset)으로 커밋(commit)합니다. 이를 오프셋 커밋이라 말하는데, 이 오프셋 커밋을 자동으로 할 지할지 혹은 수동으로 할 지에 관한 설정이 enable.auto.commit 설정입니다. 자동 커밋(=true)인 경우, auto.commit.interval.ms 설정 주기에 따라 자동으로 오프셋을 커밋합니다. 반대로 수동 커밋(=false) 일 경우, 컨슈머 객체의 commitAsync() 혹은 commitSync()를 통해 직접 커밋을 해야 합니다. 대부분은 수동 커밋을 사용하여 메시지 처리가 완료되었음을 직접 관리합니다.

    커밋은 메시지 중복 혹은 유실과 매우 관련이 깊으므로 직접 관리하는 것이 중요합니다. (출처 : https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch04.html)

    마무리

     이번 글을 통해 카프카 메시지 파이프라인의 마지막 요소인 컨슈머에 대해 살펴봤습니다. 혹여나 잘못된 부분이나 궁금한 것이 있다면 언제든지 댓글로 남겨주시길 바랍니다.

    ps. 이번 글이 마음에 들었다면 공감 버튼❤️을 눌러주세요 :)

    반응형

    댓글

Designed by Tistory.