ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka Connect] Confluent S3 Sink Connector는 왜 OOM에 취약한가?
    개발자 라이프/카프카 2021. 3. 21. 23:35
    반응형

    들어가며

     카프카 커넥터는 카프카와 데이터 소스 사이의 파이프라인을 몇 가지 설정이 포함된 HTTP 요청으로 만들어집니다(참고). 그 중 S3 Sink Connector(이하 S3 싱크 커넥터)는 카프카의 토픽에 적재된 메시지들을 AWS S3 버켓에 저장해주는 커넥터입니다. S3는 데이터 백업, 영구 저장 등으로 자주 사용되므로, S3 싱크 커넥터는 메시지를 영구 저장하지 않는 카프카 입장에선 쉽게 고려됩니다.

     하지만 S3 싱크 커넥터를 적용하는 과정에서 Out Of Memory Error가 종종 발생하는 것을 볼 수 있습니다. 이 글은 S3 싱크 커넥터가 메시지를 카프카로부터 가져와서 S3OutputStream을 통해 저장하기까지의 과정을 살펴보면서 S3 싱크 커넥터가 왜 OOM에 취약한 이유를 살펴봅니다. 만약 S3 싱크 커넥터를 고려하고 있다면, 이 글을 통해 부디 저처럼 삽질하지 마시길 바랍니다. ㅜ

    Background

    SinkTask의 Lifecycle

     SinkTask 클래스는 카프카로부터 메시지를 가져와 Sink 대상 시스템에 저장하는 과정을 정의한 추상 클래스입니다. 각 싱크 커넥터는 이 클래스를 바탕으로 싱크하는 작업을 구현하게 됩니다. SinkTask는 initialize(SinkTaskContext), put(Collection), flush(Map) 등의 API를 통해 아래와 같은 Lifecycle이 정의되고 실행됩니다. 

    1. 초기화(Initialization) : SinkTask 는 처음 `initialize(SinkTaskContext)` 를 이용해서 태스크의 컨텍스트를 준비하고, `start(Map)`으로 태스크 작업에 필요한 설정 값들을 적용합니다. 
    2. 파티션 할당(Partition Assignment) : 초기화 이후 커넥트는 `open(Collection)` 을 이용하여 태스크가 담당할 파티션들을 할당해줍니다. 이 파티션들은 `close(Collection)`이 호출되기 전까지 태스크에 소유됩니다. 
    3. 레코드 처리(Record Processing) : 파티션이 쓰기 작업을 위해 open 되면, 커넥트는 `put(Collection)`을 통해 카프카로부터 가져온 레코드(메시지)들을 태스크에 전달합니다. `put(Collection)`은 레코드들을 다운스트림 시스템에 저장하거나 혹은 그러한 작업을 위한 전처리를 합니다. 이후 배치된(batched) 레코드들을 다운스트림 시스템에 저장했음을 보장하기 위해 `flush(Map)`을 호출할 수 있습니다. 
    4. 파티션 리밸런싱(Partition Rebalancing) : 특정한 경우에 태스크 별로 할당된 파티션이 변경될 수 있습니다. 이 때, 커넥트는 `close(Collection)`으로 태스크에 할당된 모든 파티션들을 종료하고, 다시 신규 할당을 위해 `open(Collection)`을 호출합니다.
    5. 종료(Shutdown) : 태스크가 종료가 필요한 경우, 커넥트는 `stop()`를 통해 파티션들을 종료하고 태스크를 종료합니다.

    SinkTask의 Lifecycle. (출처 : https://www.slideshare.net/opencredo/kafka-summit-2018-a-journey-building-kafka-connectors-pegerto-fernandez)

    파티셔너(Partitioner)

     S3에 오브젝트를 저장할 때, 오브젝트를 어디에 저장할지 결정하게 됩니다. 이를 파티셔닝이라고 하는데, 그 파티셔닝을 처리하는 클래스가 파티셔너입니다. 파티셔너는 partitioner.class라는 설정을 통해 설정되는데 현재는 아래와 같은 파티셔너들을 바로 사용할 수 있고, 필요하다면 별도의 클래스를 직접 구현해서 사용할 수도 있습니다.

    • DefaultPartitioner : 토픽의 파티션 별로 파티셔닝
    • FieldPartitioner : 메시지의 필드 값을 이용해서 파티셔닝
    • TimeBasedPartitioner : 메시지의 timestamp 값을 이용하여 파티셔닝
    • DailyPartitioner, HourlyPartitioner : TimeBasedPartitioner를 확장한 파티셔너, 각각 날과 시간에 따라 파티셔닝

     이처럼 S3 싱크 커넥터는 지정된 파티셔너를 이용하여 카프카로부터 consume(컨슘)한 메시지를 어디에 저장할지 결정합니다.

    TimeBasedPartitioner 를 통해 S3 Sink Connector를 구성했을 때, S3 에 파티셔닝 되어 메시지가 저장되는 모습.

    OOM에 취약한 원인 분석

    S3 가 메시지를 임시 저장하는 방법

     현재, S3 싱크 커넥터는 모든 오브젝트를 메모리에서 처리하고 있습니다. 즉, 카프카로부터 메시지를 consume 하여 S3OutputStream을 통해 S3에 저장하기까지, 모든 메시지들을 커넥트의 메모리에 들고 있습니다. 따라서 Out Of Memory Error에 매우 취약합니다. 

     그리고 S3 싱크 커넥터에서 OOM 발생하는 이유는 대부분 파티셔닝과 연관되어 있습니다. S3 싱크 커넥터가 파티셔너를 이용해서 메시지들을 파티셔닝 하고, 그 파티셔닝 값에 따라 writer를 구성합니다. 또한 writer는 메시지를 버퍼링 하기 위해 buffer를 구성하게 됩니다. buffer에 대한 설정은 s3.part.size로, 기본 값은 26MB(26214400 byte)입니다. 입니다.

    태스크에 할당된 토픽 파티션 별, S3 파티션 별 버퍼가 구성된 모습(년/월/일/시). 기본 값일 경우, 26MB * 3 * 3 = 234MB의 메모리를 차지하게 된다.

    다양성에 취약하다.

     토픽 파티션에 적재된 메시지의 timestamp 값이 다양하다면 어떨까요? 만약 위 그림에서 커넥트 JVM Heap 메모리가 최대 1GB로 잡혀있다고 가정해봅시다. 이때, 어떠한 이유로 인해 토픽 파티션 별로 2021년 03월 21일 00시부터 15시의 타임스탬프 값을 가진 메시지들이 짧은 오프셋 간격으로 쌓였다면, 태스크가 이 메시지들을 한 번에 가져올 수 있습니다. 그렇게 되면 해당 태스크는  1248MB(26MB * 3 * 16)의 Heap 메모리가 필요하므로 OOM 이 발생하게 됩니다. 즉, 메모리가 넉넉하지 않은 이상, 유입되는 메시지의 다양성(Variety)에 따라 OOM이 발생할 수 있습니다.

    Heap dump 분석

    실제 OOM으로 인해 생성된 Heap dump 파일 분석 내용

     해당 커넥트 인스턴스는 총 6GB의 힙 메모리를 할당했으나 1번에서 보는 것과 같이 그 수치를 넘게 되어 OOM이 발생했습니다. 힙 메모리의 대부분은 토픽 파티션 별로 writer를 HashMap으로 저장하고 있는 topicPartitionWriter(2번)가 차지하고 있는 것을 확인할 수 있습니다. 그리고 3번, 4번에서 확인할 수 있는 것처럼, writer는 다시 S3 파티셔닝 된 문자열 별(`year=2021/month=03/day=16/hour=10`)로 버퍼(2623936 byte)를 구성하는 것을 볼 수 있습니다. 

    마무리

     S3 Sink Connector는 카프카 커넥트 중 가장 범용적으로 사용할 수 있는 커넥터입니다. 하지만 앞서 살펴본 것처럼 Confluent S3 Sink Connector는 메시지의 다양성에 취약합니다. 물론 buffer 사이즈를 줄여 취약점을 어느 정도 방어할 수 있지만, 작은 사이즈의 오브젝트가 다수로 저장되는 등 추가적인 부작용을 발생하게 됩니다. 그러므로 Confluent S3 Sink Connector를 적용함에 있어 토픽 파티션의 속성을 충분히 파악하는 것이 필요합니다.

    반응형

    댓글

Designed by Tistory.