ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka 개발] 큰 사이즈의 메시지를 발행-소비하는 방법에 관한 예제 (How to Pub-Sub the large size message like a image)
    개발자 라이프/카프카 2020. 2. 28. 19:57
    반응형

    들어가며

     카프카는 메시지 발행-소비에 있어 한번에 요청하는 용량을 정할 수 있습니다. 프로듀서의 경우 `max.request.size`로 단일 send 요청의 크기를 정하고, 컨슈머의 경우 `fetch.max.bytes`로 단일 fetch 요청의 최대 크기를 정할 수 있습니다. 만약 단일 메시지의 크기가 이 설정들보다 클 경우 메시지를 Pub-Sub할 수 없습니다. 또한, 설정 값을 증가시켜 발행-소비를 가능하게 하더라도, 메시지 크기가 어느 정도에 이르면 성능이 하락하게 됩니다. 이때 우리는 메시지를 쪼개서 보내는 방법을 생각할 수 있습니다. 이번 글은 단일 요청에 대한 설정보다 메시지 크기가 큰 경우, 메시지 내용을 쪼개서 보내는 방법에 관하여 작성된 예제 코드를 설명합니다. 예제 코드는 깃헙에서 확인할 수 있습니다.

    예제 시나리오

     예제 코드는 기본 설정을 기반으로 합니다. 그렇기 때문에 한번에 발행할 수 있는 최대 메시지 크기는 1048576byte(1MB)이고, 소비(fetch) 가능한 메시지의 크기에 관한 기본 값은 52428800byte(약 50메가) 입니다. 이번 예제에서는 `chucked-image` 토픽을 기준으로 발행 과정에서 1MB를 넘는 이미지 파일을 쪼개서 보내고, 이를 컨슈머에서 순서에 맞춰 합쳐(merge) 원본 이미지를 생성하는 프로듀서, 컨슈머에 대해 작성되었습니다.

    예제 환경 구성

     예제 코드 실행에 앞서 발행-소비를 위한 토픽이 필요합니다. 따라서, `chuncked-image`를 생성합니다. `partitions`과 `replication-factor`에 관련된 설정은 임의로 하셔도 상관 없습니다. 저는 로컬 도커 환경을 기반으로 했기 때문에 `localhost:9092` 으로 연결 가능한 카프카 클러스터 위에서 진행했습니다. 

    $ kafka-topics \
    --bootstrap-server localhost:9092 \
    --create \
    --topic chucked-image \
    --partitions 1 \
    --replication-factor 1

     토픽 생성 후, 프로듀서와 컨슈머가 카프카 클러스터를 통해 발행-소비할 수 있어야 하므로 각 코드의 `bootstrap-server` 관련 설정 부분을 자신의 환경에 맞게 변경합니다. 

    // ImageProducer#main
    ...
        // broker configure
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // <- 변경 필요! 
    // ImageConsumer#main
    ...
        // broker configure
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // <- 변경 필요! 

    예제 실행

     카프카 클러스터에 관한 설정이 완료되면 각 코드를 실행합니다. 실행 순서는 컨슈머를 먼저 실행하고, 프로듀서를 실행합니다. 작성된 컨슈머는 무한 루프를 통해 지속적으로 메시지를 소비하지만, 프로듀서는 한번만 이미지 메시지를 보내고 종료되기 때문입니다. 정상적으로 메시지가 발행-소비되면 다음 로그와 함께, image-consumer/images 디렉토리에 다음 이미지가 생성되는 것을 알 수 있습니다. 

    # ImageProducer console
    == total image size : 1884281
    == send result
    * partition  : 0
    * offset     : 0
    * timestamp  : 1582638573354
    * value size : 500041
    ...
    # ImageConsumer console
    == image [over_max_size.jpg] is wrote
    * size : 1884281

    발행-소비되는 이미지 (저와 함께 사는 고양이 울이 입니다 :))

    예제 특징

    1. 멀티 프로듀서, 컨슈머 환경을 고려

     잘라진 메시지는 컨슈머에서 합쳐져야 합니다. 컨슈머는 지정된 파티션에서만 메시지를 가져오기 때문에, 프로듀서가 같은 이미지에서 쪼개진 청크(chunk)를 서로 다른 파티션에 발행할 경우 컨슈머는 쪼개진 이미지를 결합할 수 없습니다. 따라서, 동일 이미지로 생성된 청크들의 메시지는 동일한 key 값을 지정하여 같은 파티션으로 전달될 수 있도록 했습니다.  다만, 멀티 프로듀서 환경에서 메시지에 부여하는 key 값이 유일성이 보장되지 않으면 언제든지 key의 중복이 일어날 수 있고, 이렇게 중복이 발생한 이미지들은 정상적으로 저장되지 않을 수 있습니다. 따라서 key 값을 유일성이 보장되는 UUID 형식으로 사용합니다.

    // initialize producer & send records
    Producer<UUID, ImageChunk> producer = new KafkaProducer<>(props);
    UUID uuid = UUID.randomUUID();
    for (ImageChunk imageChunk : imageChunks) {
        ProducerRecord<UUID, ImageChunk> record = new ProducerRecord<>(TOPIC_NAME, uuid, imageChunk);
        RecordMetadata recordMetadata = producer.send(record).get();
        printResult(recordMetadata);
    }

    2. Custom Serializer/Deserializer 적용

     이미지 청크와 별도의 메타 데이터를 함께 ImageChunk 객체로 생성합니다. 이에 따라 해당 객체를 프로듀서와 컨슈머에서 발행-소비할 수 있도록 ImageChunkSerializer/ImageChunkDeserializer 를 생성하여 적용했습니다.

      // 프로듀서 측 설정
      Map<String, Object> props = new HashMap<>();
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      props.put(ConsumerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.UUIDSerializer");
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "me.daehokimm.ImageChunkSerializer");		// custom serializer`
      // 컨슈머 측 설정
      Map<String, Object> props = new HashMap<>();
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.UUIDDeserializer");
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "me.daehokimm.ImageChunkDeserializer");		// custom deserializer
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "chucked-image");

    마무리

     이번 예제는 단일 메시지가 설정 값 혹은 성능에 따라 쪼개져야 할 경우, 이를 발행-소비 할 수 있도록하는 예제였습니다. 개선 사항은 깃헙 이슈를 통해 언제든지 이슈나 PR 부탁드립니다! (코드 리뷰도 항상 환영입니다!)

    * 예제가 도움되셨다면 깃헙의 Star 버튼⭐️을 눌러주세요!
    * 이 글이 도움되셨다면 공감 버튼❤️을 눌러주세요!

    반응형

    댓글

Designed by Tistory.