ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka 101] 카프카 프로듀서 (Kafka Producer)
    개발자 라이프/카프카 2020. 3. 15. 15:45
    반응형

    들어가며

     카프카는 메시지를 생산, 발송하는 프로듀서(Producer)와 메시지를 소비, 수신하는 컨슈머(Consumer), 그리고 프로듀서와 컨슈머 사이에서 메시지를 중개하는 브로커(Broker)로 구성됩니다. 이번 글은 메시지 흐름의 시작인 프로듀서에 대해 전반적으로 설명합니다.

    카프카 프로듀서

     프로듀서는 보통 카프카 프로듀서 API와 그것으로 구성된 애플리케이션을 말합니다. 그리고 프로듀서는 브로커에 특정 토픽(혹은 파티션 영역까지)을 지정하여 메시지를 전달하는 역할을 담당합니다. 프로듀서를 통해 전달되는 메시지의 구조는 다음과 같습니다. 

    • 토픽 (Topic)
    • 토픽 중 특정 파티션 위치 (Partition)
    • 메시지 생성 시간 (Timestamp)
    • 메시지 키 (Key)
    • 메시지 값 (Value)

    프로듀서 구조와 메시지 전달 과정

     프로듀서는 다음 4가지 과정을 통해 메시지를 브로커로 전달합니다. 이 과정은 브로커에 메시지를 전송할 수 있도록 변환하거나, 필요한 값을 지정해주는 과정입니다.

    1. 직렬화 (Serializer)
    2. 파티셔닝 (Partitioner)
    3. 메시지 배치 (Record Accumulator)
    4. 압축 (Compression)
    5. 전달 (Sender)

    메시지가 브로커로 전달되는 과정 (출처 : https://www.linkedin.com/pulse/kafka-producer-overview-sylvester-daniel)

     프로듀서는 먼저, 전달 요청받은 메시지를 직렬화합니다. 직렬화(Serialization)는 Serializer가 지정된 설정을 통해 처리하며, 메시지의 키와 값은 바이트 뭉치 형태로 변환됩니다. 직렬화 과정을 마친 메시지는 Partitioner를 통해 토픽의 어떤 파티션에 저장될지 결정됩니다. 이 과정을 파티셔닝(Partitioning)이라 말합니다. Partitioner는 정의된 로직에 따라 파티셔닝을 진행하는데, 별도의 Partitioner 설정을 하지 않으면 Round Robbin 형태로 파티셔닝을 합니다. 즉, 파티션들에게 골고루 전달할 수 있도록 파티셔닝을 합니다. 다만, 이 과정은 메시지 전달 요청에 파티션이 지정되지 않았을 경우에만 진행됩니다. 따라서 메시지 전달 요청에 특정 파티션이 지정되었을 경우에는 별도의 파티셔닝 없이 해당 파티션으로 전달되도록 지정됩니다.

     만약 메시지 압축이 설정되었다면, 설정된 포맷에 맞춰 메시지를 압축합니다. 압축된 메시지는 브로커로 빠르게 전달할 수 있을뿐더러, 브로커 내부에서 빠른 복제가 가능하도록 합니다. 그렇게 때문에 메시지 압축 설정은 많은 경우에 고려될 수 있습니다. 카프카에서 지원하는 주요 압축 포맷과 효율은 아래와 같습니다.

     파티셔닝과 압축을 마친 후, 프로듀서는 메시지를 TCP 프로토콜을 통해 브로커 리더 파티션으로 전송합니다. 하지만 메시지마다 매번 네트워크를 통해 전달하는 것은 비효율적입니다. 네트워크 전송은 매우 무거운 처리이기 때문입니다. 그래서 프로듀서는 지정된 만큼 메시지를 저장했다가 한 번에 브로커로 전달합니다. 이 과정은 프로듀서 내부의 Record Accumulator(RA)가 담당하여 처리합니다. RA는 각 토픽 파티션에 대응하는 배치 큐(Batch Queue)를 구성하고 메시지들을 레코드 배치(Record Batch) 형태로 묶어 큐에 저장합니다.

     각 배치 큐에 저장된 레코드 배치들은 때가 되면 각각 브로커에 전달됩니다. 이 과정은 Sender가 처리합니다. Sender는 스레드 형태로 구성되며, 관리자가 설정한 특정 조건에 만족한 레코드 배치를 브로커로 전송합니다. 이때, Sender 스레드는 네트워크 비용을 줄이기 위해 piggyback 방식으로 조건을 만족하지 않은 다른 레코드 배치를 조건을 만족한 것과 함께 브로커로 전송합니다. 

    같은 브로커로 보내야하는 토픽 파티션이 있으면 함께 전송합니다.

     Piggyback이란 '등 뒤에 업다'라는 뜻입니다. 위 그림을 예로 들면, 토픽 B의 파티션 1(B_1)의 큐에 레코드 배치가 전송할 조건을 만족했다고 가정하면, Sender는 해당 레코드 배치를 가져와 3번 브로커로 전송할 준비를 합니다. 이때, 토픽 A의 파티션 2(A_2)가 전송 조건을 만족하지 않았더라도 같은 3번 브로커에 전송돼야 하므로, Sender는 A_2 레코드 배치를 업어 한번에 3번 브로커로 전송합니다. 이로 인해 자연스럽게 네트워크 비용을 줄일 수 있습니다.

    Sender 스레드의 메시지 전송 요청과 응답 (출처 : https://www.linkedin.com/pulse/kafka-producer-overview-sylvester-daniel)

     브로커에 네트워크 전송 요청을 보낸 Sender는 설정 값에 따라 브로커의 응답을 기다리거나 혹은 기다리지 않습니다. 만약 응답을 기다리지 않는 설정인 경우, 메시지 전송에 대한 과정이 마쳐집니다. 하지만 응답을 기다리는 경우, 메시지 전송 성공 여부를 응답으로 받습니다. 이때, 브로커에서 메시지 전송이 실패한 경우에는 설정 값에 따라 재시도를 시도합니다. 재시도 횟수를 초과한 경우에는 예외를 뱉어냅니다. 반대로 성공한 경우에는 메시지가 저장된 정보(메타데이터)를 반환합니다. 메타데이터는 메시지가 저장된 토픽, 파티션, 오프셋, 타임스탬프 정보를 가지고 있습니다.

    메시지 전송 사이클 (출처 : https://dzone.com/articles/take-a-deep-dive-into-kafka-producer-api)

    프로듀서 예제 코드

     프로듀서 예제 코드는 다음과 같습니다. 해당 예제는 로컬 환경에 설치된 카프카 클러스터에 요청을 보내는 것이며, 만약 해당 코드를 테스트하시려면 example-topic이라는 이름의 토픽을 생성하고 하시기 바랍니다. 

    package me.daehokimm;
    
    import org.apache.kafka.clients.producer.*;
    
    import java.util.HashMap;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    public class ProducerApp
    {
        public static void main( String[] args )
        {
            // 설정 부분. 필수적인 설정만 했습니다.
            HashMap<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    
            // 프로듀서 객체 생성 및 메시지 전송 요청
            Producer<String, String> producer = new KafkaProducer<>(props);
            ProducerRecord<String, String> record = new ProducerRecord<>("example-topic", null, "Hello World"); // (topic, key, value) 값이 들어갑니다.
            Future<RecordMetadata> future = producer.send(record); // 전송 요청
    
            // 동기 형태로 전송 응답을 기다립니다.
            try {
                RecordMetadata metadata = future.get();	
                describeMetadata(metadata);	// 응답 결과를 콘솔에 출력합니다.
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    
        private static void describeMetadata(RecordMetadata metadata) {
            System.out.println("=== Metadata...");
            System.out.println("topic     : " + metadata.topic());
            System.out.println("partition : " + metadata.partition());
            System.out.println("offset    : " + metadata.offset());
            System.out.println("timestamp : " + metadata.timestamp());
        }
    }

     예제 코드는 크게 3가지 부분으로 구분할 수 있습니다. 

    • 설정 부분
    • 프로듀서 객체 생성 및 메시지 전송 부분
    • 응답 대기 및 출력 부분

     먼저 설정 부분은 프로듀서 설정 중 가장 필수적인 부분 3가지만 설정했습니다. 해당 설정들은 반드시 들어가야 하며, 설정하지 않을 경우 런타임 에러가 발생합니다. 예제에서는 메시지의 키와 값이 모두 String 타입을 사용할 것이므로, 키와 값에 대한 Serializer를 StringSerializer로 설정했습니다. 설정 값으로 구성된 Map 객체(props)는 프로듀서 객체 생성 부분에서 인자로 사용됩니다.

    HashMap<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    프로듀서 또한 메시지 key/value 타입에 맞게 타입을 지정하여 생성합니다. 생성된 프로듀서를 통해 전송할 메시지 또한 타입에 맞게 생성합니다. 레코드 생성에 사용된 인자는 각각 topic, key, value 로써, 예제에서는 example-topic에 메시지의 key 값 없이(null) 생성했으며 파티션 또한 지정하지 않았습니다. 메시지의 내용은 "Hello World"입니다. 

    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    ProducerRecord<String, String> record = new ProducerRecord<>("example-topic", null, "Hello World");
    Future<RecordMetadata> future = producer.send(record);

     메시지 전송 비동기로 이뤄집니다. 이에 따라 Future 타입의 객체를 반환합니다. 하지만 해당 객체에서 .get() 메서드를 호출하여 응답 값을 동기적으로 반환받을 수 있습니다.

    try {
            RecordMetadata metadata = future.get();
            describeMetadata(metadata);
    } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
    }

    아래는 예제 코드를 실행했을 때 콘솔에 출력되는 내용입니다. 

    === Metadata...
    topic     : example-topic
    partition : 0
    offset    : 1
    timestamp : 1584249668005

     

    프로듀서의 주요 설정

     앞서 메시지 전송 과정에서 여러 설정 값에 대한 언급이 있었습니다. 프로듀서는 예제 코드에서 사용한 필수적인 설정 값 3개 외에도 다양한 설정 값이 존재하며, 이러한 설정들은 메시지 전달 로직, 전송 성능, 전송 실패 처리 등 여러 상황에 맞게 조절될 수 있습니다.

    • bootstrap.servers : 카프카 클러스터와 연결 시 사용되는 설정. 브로커의 호스트 정보를 넣습니다.
    • key.serializer : 메시지의 키를 직렬화 할 때 사용할 클래스 설정. 직렬화 클래스의 경로를 넣습니다.
    • value.serializer : 메시지의 값을 직렬화 할 때 사용할 클래스 설정. 마찬가지로 직렬화 클래스의 경로를 넣습니다.
    • partitioner.class : 파티셔너로 사용할 클래스 설정. 해당 클래스 경로를 넣으며, 기본 값은 카프카에서 제공하는 DefaultPartitioner 클래스에 대한 경로입니다.
    • compression.type : 압축 타입에 관한 설정.
    • buffer.memory : Record Accumulator에서 사용할 메모리의 총 양에 관한 설정. 해당 설정을 넘기면 프로듀서가 정상 동작하지 않습니다.
    • batch.size : 레코드 배치의 크기에 관한 설정(bytes). 레코드 배치가 해당 설정 값에 다다르면 메시지를 전송합니다. 
    • linger.ms : 레코드 배치의 최대 전송 대기 시간 설정(ms). batch.size에 만족하지 않더라도 레코드 배치는 이 설정에 따라 전송됩니다.
    • acks : 프로듀서가 전송 후 브로커의 응답을 기다리는 설정. '0' 일 경우 응답을 기다리지 않고, '1' 일 경우 리더 파티션의 응답만 기다립니다. 'all' 혹은 '-1' 일 경우 리더 파티션의 복제(replication)까지 기다립니다. 
    • retires : 재전송 가능 횟수에 관한 설정. 
    • max.in.flight.requests.per.connection : 프로듀서가 응답 여부에 상관없이 한 번에 보낼 수 있는 요청 횟수에 관한 설정. 2 이상이면 한 번의 연결에서 2개 이상의 메시지 전송 요청을 보낼 수 있다. 

    보다 자세한 내용은 공식 문서를 참고하시길 바랍니다. :) 

    프로듀서 설정 유의점

     카프카 프로듀서는 기본적으로 메시지를 한 번에 많이 보내려고 합니다. 하지만 이러한 설정은 요구되는 메시지 전송 환경에 따라 적합하지 않을 수 있습니다. 따라서 처리량과 지연율 사이에서 혹은 지연율과 전송 안정성 사이에서 트레이드오프(Trade off)를 고려해야 합니다. 아래는 주요한 설정 유의점에 관한 예시입니다. 

    • batch.size, linger.ms 설정 값이 높을수록 처리량은 증가하지만, 그만큼 저장되었다가 전송되므로 지연율이 높아집니다(안 좋아집니다).
    • acks 설정이 0 -> 1 -> all 로 설정될수록 메시지 전송 안정성은 증가하지만, 그만큼 응답을 대기해야 하므로 지연율이 높아집니다. 
    • acks 설정이 all 이더라도 브로커의 min.insync.replicas 설정이 1인 경우에는 acks 설정이 1인 경우와 동일한 메시지 안정성을 제공합니다. [참고]
    • retries 설정이 1 이상인 경우 메시지 전송 안정성이 증가합니다. 하지만 이때, max.in.flight.request.per.connection 설정이 2 이상일 경우, 부분 실패에 따른 재전송 과정에서 메시지 순서가 바뀔 수 있습니다.

    마무리

    이번 글을 통해 카프카 프로듀서의 구조, 메시지 전송 과정, 주요 설정에 관하여 살펴봤습니다. 혹여나 잘못된 부분이나 궁금하신 것이 있다면 언제든지 댓글로 남겨주시길 바랍니다. 

    ps. 이번 글이 마음에 들었다면 공감 버튼❤️을 눌러주세요 :) 

    반응형

    댓글

Designed by Tistory.