ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Spring Kafka] 컨슈밍과 관련된 구성 요소 살펴보기
    개발자 라이프/카프카 2021. 8. 1. 23:49
    반응형

    들어가며

     스프링 카프카 프로젝트는 스프링 프레임워크 내에서 카프카를 더욱 손쉽게 사용할 수 있도록 지원하는 프로젝트입니다. 그래서 기존 카프카의 프로듀서, 컨슈머, 스트림 어플리케이션 구성을 스프링 프레임워크 내에서 간단하게 정의하고, 구현할 수 있도록 합니다. 이번 글은 스프링 카프카에서 컨슈밍과 관련된 구성 요소들을 간단히 살펴봅니다.

    컨슈밍과 관련된 구성 요소 살펴보기

    MessageListenerContainer

     Spring Kafka에선 MessageListenerContainer를 설정하여 컨슈머를 구성합니다. 그리고 실제 동작은 MessageListener 구현체를 MessageListenerContainer 설정에 제공하거나, 처리와 관련된 함수에 @KafkaListener 어노테이션을 붙여 컨슈밍할 수 있습니다. 참고로 이번 글에선 MessageListener에 관련된 내용만 다룹니다.

     Spring Kafka에서는 현재 2개의 MessageListenerContainer 구현체를 제공합니다.

    • KafkaMessageListenerContainer
      • 단일 스레드로 동작하는 컨슈머
    • ConcurrentMessageListenerContainer
      • 내부적으로 하나 이상의 KafkaMessageListenerContainer로 구성되는 멀티 스레드 방식의 컨슈머

    ConcurrentMessageListenerContainer 내부에는 concurrency 속성 값에 따라 KafkaMessageListenerContainer의 리스트가 구성된다.

    MessageListener

    MessageListenerContainer를 사용하기 위해선 데이터를 가져와서 처리하는 로직을 담은 Listener(리스너) 구현체를 제공해야 합니다. 현재 리스너에 대한 인터페이스들을 제공하고 있는데, 이는 아래와 같은 속성들이 조합되어 구성되는 형태입니다.

    • 컨슘한 레코드에 대한 Commit를 수동적으로 처리할 수 있다. (Acknowledgment)
    • 컨슘한 컨슈머의 정보에 직접 접근할 수 있다. (Consumer Aware)
    • Poll() 을 통해 가져온 레코드를 한꺼번에 처리할 수 있다. (Batch)

     이러한 속성의 조합에 따라 생성되는 인터페이스는 `MessageListener` 부터 `BatchAcknowledgingConsumerAwareMessageListener` 까지 총 8개의 인터페이스를 제공합니다.

    수동 커밋, 컨슈머 정보 접근 가능, 배치 형식의 처리를 정의하는 인터페이스. onMessage() 메서드 부분.

     여기서 주의해야 할 점은 리스너를 통해 접근하는 컨슈머는 Thread safe 하지 않으므로, 속성을 변경하지 말고 단순한 메소드 호출로만 사용해야 한다는 것입니다.

    MessageListenerContainer 설정해보기

     MessageListenerContainer 구현체를 각각 설정해보며, 관련된 요소들이 어떤 것들이 있는지 알아봅니다.

    (1) KafkaMessageListenerContainer

     KafkaMessageListenerContainer(카프카 메시지 리스너 컨테이너) 는 아래와 같은 생성자를 제공하고 있습니다.

    가장 간단한 생성자. ConsumerFactory 와 ConatinerProperties 속성을 받는다.

     카프카 메시지 리스너 컨테이너를 생성하기 위해선 ConsumerFactoryContainerProperties 타입의 인스턴스들을 아규먼트로 전달 받습니다. ConsumerFactory는 컨슈머가 사용하는 Key와 Value에 대한 역직렬화 타입, 컨슈머 그룹의 Id, 클라이언트 Id 에 대한 접두사 혹은 접미사를 받아 컨슈머(org.apache.kafka.clients.consumer.Consumer) 객체를 생성하는 내용을 정의한 인터페이스입니다.

    ConsumerFactory 인터페이스의 createConsumer 메소드

     반면에 ContainerProperties는 오프셋 커밋 방식, Exactly Once Sementic 모드 등 컨슈밍 하는 동작에 대한 속성들을 설정하는 클래스입니다.

    ContainerProperties 클래스 내의 속성들. 컨슈밍하는 동작 사이사이에 대한 설정들이 보인다.

     또한 ContainerProperties 는 생성자에 따라 토픽에 대한 할당 방식이 달라집니다.

    • public ContainerProperties(TopicPartitionOffset... topicPartitions)
      • 특정 토픽 파티션의 오프셋을 기준으로 컨슈밍 한다.
      • assign 방식
    • public ContainerProperties(String... topics)
      • 전달된 토픽들을 컨슈밍 한다.
      • subscribe 방식
    • public ContainerProperties(Pattern topicPattern)
      • 패턴에 맞는 모든 토픽을 컨슈밍한다.
      • subscribe 방식

    그리고 ContainerProperties 클래스의 setMessageListener 메소드를 이용하여 앞서 살펴본 메시지 리스너를 설정할 수 있습니다.

    MessageListener 인터페이스를 구현한 클래스를 아규먼트로 받는다.

     결국, 카프카 메시지 리스너 컨테이너는 어떠한 컨슈머가(ConsumerFactory) 어떻게 컨슈밍할 지(ContainerProperties)에 대한 정보를 가지고 생성됩니다.

    @Bean
    fun getKafkaMessageListenerContainer(): KafkaMessageListenerContainer<String, String>  {
    
        /**
         * ConsumerFactory 는 컨슈머의 속성과 관련있다.
         * - 어떤 카프카 클러스터와 연동할 것인지
         * - 어떤 이름을 가지는 컨슈머인지
         * - 어떤 타입의 Key/Value 메시지를 컨슘할 것인지
         */
        val consumerConfigs = mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
            ConsumerConfig.GROUP_ID_CONFIG to "spring.kafka",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java
        )
        val consumerFactory = DefaultKafkaConsumerFactory<String, String>(consumerConfigs)
    
        /**
         * ContainerProperties 는 컨슈머의 동작 방식과 관련있다.
         * - 어떤 토픽을 컨슘할 것인지
         * - 어떤 방식으로 컨슘할 것인지 (assign / subscribe)
         * - 어떤 리스너를 이용하여 컨슘할 것인지 (MessageListener)
         * ...
         */
        val containerProperties = ContainerProperties("test.topic.1")
        containerProperties.messageListener = MessageListener { data: ConsumerRecord<String, String> ->
            println(data) // 컨슘한 메시지를 콘솔에 출력한다.
        }
    
        return KafkaMessageListenerContainer(consumerFactory, containerProperties)
    }

    (2) ConcurrentMessageListenerContainer

    ConcurrentMessageListenerContainer(동시적 메시지 리스너 컨테이너)는 앞서 살펴본 카프카 메시지 리스너 컨테이너와 생성 방법이 매우 유사하다. 하지만 concurrency 속성을 추가로 설정할 수 있으며, 이 값에 따라 카프카 메시지 리스너 컨테이너가 개별적으로 생성되어 멀티 스레드 처리를 할 수 있도록 합니다.

    KafkaMessageListenerContainer 와 유사한 생성 방법. 다만 concurrency를 추가로 생성할 수 있다.

     

    마무리

     Spring Kafka에선 MessageListenerContainer를 구성하여 컨슈밍을 합니다. 이번 글을 통해 MessageListenerContainer는 무엇인지, 어떤 구현체가 있는지, 어떤 설정들이 필요한지 알아봤습니다. 

    참고

    반응형

    댓글

Designed by Tistory.