-
[Kafka 개발] 큰 사이즈의 메시지를 발행-소비하는 방법에 관한 예제 (How to Pub-Sub the large size message like a image)개발자 라이프/카프카 2020. 2. 28. 19:57반응형
들어가며
카프카는 메시지 발행-소비에 있어 한번에 요청하는 용량을 정할 수 있습니다. 프로듀서의 경우 `max.request.size`로 단일 send 요청의 크기를 정하고, 컨슈머의 경우 `fetch.max.bytes`로 단일 fetch 요청의 최대 크기를 정할 수 있습니다. 만약 단일 메시지의 크기가 이 설정들보다 클 경우 메시지를 Pub-Sub할 수 없습니다. 또한, 설정 값을 증가시켜 발행-소비를 가능하게 하더라도, 메시지 크기가 어느 정도에 이르면 성능이 하락하게 됩니다. 이때 우리는 메시지를 쪼개서 보내는 방법을 생각할 수 있습니다. 이번 글은 단일 요청에 대한 설정보다 메시지 크기가 큰 경우, 메시지 내용을 쪼개서 보내는 방법에 관하여 작성된 예제 코드를 설명합니다. 예제 코드는 깃헙에서 확인할 수 있습니다.
예제 시나리오
예제 코드는 기본 설정을 기반으로 합니다. 그렇기 때문에 한번에 발행할 수 있는 최대 메시지 크기는 1048576byte(1MB)이고, 소비(fetch) 가능한 메시지의 크기에 관한 기본 값은 52428800byte(약 50메가) 입니다. 이번 예제에서는 `chucked-image` 토픽을 기준으로 발행 과정에서 1MB를 넘는 이미지 파일을 쪼개서 보내고, 이를 컨슈머에서 순서에 맞춰 합쳐(merge) 원본 이미지를 생성하는 프로듀서, 컨슈머에 대해 작성되었습니다.
예제 환경 구성
예제 코드 실행에 앞서 발행-소비를 위한 토픽이 필요합니다. 따라서, `chuncked-image`를 생성합니다. `partitions`과 `replication-factor`에 관련된 설정은 임의로 하셔도 상관 없습니다. 저는 로컬 도커 환경을 기반으로 했기 때문에 `localhost:9092` 으로 연결 가능한 카프카 클러스터 위에서 진행했습니다.
$ kafka-topics \ --bootstrap-server localhost:9092 \ --create \ --topic chucked-image \ --partitions 1 \ --replication-factor 1
토픽 생성 후, 프로듀서와 컨슈머가 카프카 클러스터를 통해 발행-소비할 수 있어야 하므로 각 코드의 `bootstrap-server` 관련 설정 부분을 자신의 환경에 맞게 변경합니다.
// ImageProducer#main ... // broker configure Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // <- 변경 필요!
// ImageConsumer#main ... // broker configure Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // <- 변경 필요!
예제 실행
카프카 클러스터에 관한 설정이 완료되면 각 코드를 실행합니다. 실행 순서는 컨슈머를 먼저 실행하고, 프로듀서를 실행합니다. 작성된 컨슈머는 무한 루프를 통해 지속적으로 메시지를 소비하지만, 프로듀서는 한번만 이미지 메시지를 보내고 종료되기 때문입니다. 정상적으로 메시지가 발행-소비되면 다음 로그와 함께, image-consumer/images 디렉토리에 다음 이미지가 생성되는 것을 알 수 있습니다.
# ImageProducer console == total image size : 1884281 == send result * partition : 0 * offset : 0 * timestamp : 1582638573354 * value size : 500041 ...
# ImageConsumer console == image [over_max_size.jpg] is wrote * size : 1884281
예제 특징
1. 멀티 프로듀서, 컨슈머 환경을 고려
잘라진 메시지는 컨슈머에서 합쳐져야 합니다. 컨슈머는 지정된 파티션에서만 메시지를 가져오기 때문에, 프로듀서가 같은 이미지에서 쪼개진 청크(chunk)를 서로 다른 파티션에 발행할 경우 컨슈머는 쪼개진 이미지를 결합할 수 없습니다. 따라서, 동일 이미지로 생성된 청크들의 메시지는 동일한 key 값을 지정하여 같은 파티션으로 전달될 수 있도록 했습니다. 다만, 멀티 프로듀서 환경에서 메시지에 부여하는 key 값이 유일성이 보장되지 않으면 언제든지 key의 중복이 일어날 수 있고, 이렇게 중복이 발생한 이미지들은 정상적으로 저장되지 않을 수 있습니다. 따라서 key 값을 유일성이 보장되는 UUID 형식으로 사용합니다.
// initialize producer & send records Producer<UUID, ImageChunk> producer = new KafkaProducer<>(props); UUID uuid = UUID.randomUUID(); for (ImageChunk imageChunk : imageChunks) { ProducerRecord<UUID, ImageChunk> record = new ProducerRecord<>(TOPIC_NAME, uuid, imageChunk); RecordMetadata recordMetadata = producer.send(record).get(); printResult(recordMetadata); }
2. Custom Serializer/Deserializer 적용
이미지 청크와 별도의 메타 데이터를 함께 ImageChunk 객체로 생성합니다. 이에 따라 해당 객체를 프로듀서와 컨슈머에서 발행-소비할 수 있도록 ImageChunkSerializer/ImageChunkDeserializer 를 생성하여 적용했습니다.
// 프로듀서 측 설정 Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.UUIDSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "me.daehokimm.ImageChunkSerializer"); // custom serializer`
// 컨슈머 측 설정 Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.UUIDDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "me.daehokimm.ImageChunkDeserializer"); // custom deserializer props.put(ConsumerConfig.GROUP_ID_CONFIG, "chucked-image");
마무리
이번 예제는 단일 메시지가 설정 값 혹은 성능에 따라 쪼개져야 할 경우, 이를 발행-소비 할 수 있도록하는 예제였습니다. 개선 사항은 깃헙 이슈를 통해 언제든지 이슈나 PR 부탁드립니다! (코드 리뷰도 항상 환영입니다!)
* 예제가 도움되셨다면 깃헙의 Star 버튼⭐️을 눌러주세요!
* 이 글이 도움되셨다면 공감 버튼❤️을 눌러주세요!반응형