ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka Connect] Confluent S3 Sink Connector 의 설정과 Heap memory
    개발자 라이프/카프카 2021. 4. 5. 18:24
    반응형

    들어가며

     앞선 Confluent S3 Sink Connector는 왜 OOM에 취약한가?를 통해 Confluent S3 Sink Connector(이하 s3 커넥터)가 OOM에 취약한 이유에 대해 알아봤습니다. 그렇다면 s3 커넥터를 배포 함에 있어 OOM이 안나기 위해 어떤 설정들을 고려해야 하는지, 또 그 설정들에 따라 어떤 케이스가 발생할 수 있는지 이번 글을 통해 알아봅니다.

    S3 Sink Connector 가 메모리를 점유하는 흐름

     S3 커넥터가 어떤 설정을 통해 Heap memory가 결정되는지 알아보기 전에, 먼저 S3 커넥터가 주로 Heap memory를 점유하는 흐름을 알아봅니다.

    하나의 Task 가 Topic에서 레코드를 컨슘한 뒤 S3 적재하는 과정. 여러 버퍼가 존재한다.

     위 그림은 하나의 S3SinkTask가 특정 Topic에서 레코드를 컨슘 한 뒤 S3로 적재하는 과정을 나타냅니다. 그림을 통해서 다음과 같이 파악할 수 있습니다.

    • 하나의 Task에 대해서 여러 개의 Topic Partition을 컨슘 한다.
    • 컨슘 한 레코드는 TopicPartition 별로 구성된 TopicPartitionWriter의 버퍼(Queue <Record>)에 적재된다. (TopicPartitionWriter의 buffer)
    • 버퍼에 적재된 레코드는 Partitioner.class 설정에 따라 파티셔닝 된다.
    • 파티셔닝 결과로 반환된 encodePartition 별로 s3.part.size 설정 값 크기를 가지는 버퍼(byte [])가 구성된다. (RecordWriter의 buffer)
    • 그리고 특정 조건이 되면 buffer에서 s3로 flush 합니다.

     여기서 s3.part.size의 기본 값은 26214400(25Mb), 최소 값은 5242880(5Mb)입니다. 따라서 S3 커넥터가 주로 Heap Memory를 사용하는 위치는 encodePartition에 대응하는 RecordWriter의 버퍼입니다. 그러므로 OOM이 발생되지 않기 위해선 (1) encodePartition의 수를 줄이거나 (2)RecordWriter의 버퍼 크기를 줄여야 합니다.

    TopicPartitionWriter의 필드 중 encodePartition에 대응하는 RecordWriter를 저장하는 객체 writers

    S3 Sink Connector에서 Heap memory와 유관한 설정들

     앞서 살펴본 흐름과 관련하여 S3 커넥터에선 아래와 같은 설정을 제공합니다.

    • partitioner.class : S3 파티셔닝을 위한 클래스. 대표적으로 FieldPartitioner와 TimeBasedPartitioner가 있다.
    • s3.part.size : S3 오브젝트의 최대 크기. encodePartition에 따라 구성되는 버퍼의 크기를 결정한다.
    • flush.size : S3 오브젝트가 가지는 최대 메시지 개수. TopicPartitionWriter에서 flush 하는 조건이기도 하다.
    • rotate.(schedule.)interval.ms : S3SinkTask가 flush 하는 최대 시간 간격. schedule 설정은 정적인 시간 값을 이용한다.

    그렇다면 각 설정 별로 어떤 방식으로 Heap memory에 영향을 미칠까요. 그림으로 살펴봅니다.

    partitioner.class

     partitioner.class는 그 로직에 따라 encodePartition의 수를 결정하게 됩니다. partitioner(파티셔너)는 대표적으로 FieldPartitioner와 TimeBasedPartitioner 가 있습니다. 그렇다면 각각의 파티셔너는 어떤 상황에 encodePartition의 수가 증가하게 될까요.

    • FieldPartitioner의 경우
      • 레코드들의 필드 값이 다양해서 encodePartition 개수가 증가한다.
      • (현재는 아니지만) 여러 개의 필드가 설정될 수 있는 경우, encodePartition 개수가 증가할 수 있다.
    • TimeBasedPartitioner의 경우
      • timestamp.extractor로 추출한 값이 지속적으로 증가하는 시간 값일 때, partition format으로 지정한 시간에 대한 경계 값인 경우 encodePartition 개수가 순간 2배가 될 수 있다.
      • timestamp.extractor로 추출한 값이 일정하지 않은 시간 값일 때(대체로 RecordField인 경우), 시간 값의 분포에 따라 encodePartition 수가 증가할 수 있다.

     위 내용처럼 partitioner.class에 따라 그리고 그 파티셔너의 설정과 메시지 유입 환경에 따라 encodePartition의 수가 다양하게 증감될 수 있습니다.

    s3.part.size

     s3.part.size는 RecordWriter의 버퍼 크기를 결정합니다. 따라서 이 값이 커질수록 더 큰 버퍼 크기를 점유하게 되고, 따라서 encodePartition 수가 급증하게 되면 OOM이 더 쉽게 발생할 수 있습니다. partitioner.class와 메시지 유입 환경에 따라 평균적으로 s3에 저장된 오브젝트 크기를 가늠하여 최적화해야 불필요한 메모리 낭비를 줄일 수 있습니다. (메모리 낭비를 더 줄일 수 있도록 이슈에 따봉 부탁드립니다.)

    flush.size

     flush.size는 TopicPartitionWriter에서 flush를 하는 하나의 조건으로써, TopicPartitionWriter가 flush.size 수만큼의 레코드를 처리했다면 flush 하게 됩니다.

    TopicPartitionWriter가 flush.size만큼 레코드를 처리하면 버퍼에 쌓인 모든 레코드들을 s3에 적재(flush)한다.

    따라서 flush.size가 클수록 TopicPartitionWriter는 더 많은 레코드를 처리하게 되고, 그에 따라 더 많은 encodePartition이 생겨 Heap memory를 더 많이 점유할 수 있습니다.

    rotate.(schedule.)interval.ms

     rotate.(schedule.)interval.ms는 flush 하는 시간 조건입니다. 즉, TopicPartitionWriter가 특정 시간 단위로 메시지를 flush 하게 하는 설정입니다.

    TopicPartitionWriter에서 rotate.(schedule.)interval.ms 값에 따라 주기적으로 버퍼에 쌓인 모든 레코드들을 s3에 적재(flush)한다.

    따라서 rotate.(schedule.)interval.ms 설정 값이 클수록 TopicPartitionWriter는 더 오랜 시간 레코드를 처리하게 되고, 그에 따라 더 많은 encodePartition이 생겨 Heap memory를 더 많이 점유할 수 있습니다.

    그 외의 환경 조건

     위 설정들 외에도 Task가 할당받는 Topic Partition의 수에 따라 encodePartition의 수가 증가할 수 있습니다. TopicPartitionWriter는 Task에 할당된 TopicPartition에 따라 구성되므로, Topic Partition 수 대비 적은 Task 수를 구성하면 Task 내부에 많은 버퍼가 구성되어 Heap memory를 더 많이 필요로 하게 됩니다. 

    설정에 따른 Heap memory 산정식 : worst case

     위 설정 값에 따라 대략적으로 task 별 요구되는 heap memory를 worst case로 아래와 같이 산정할 수 있습니다.

    (Topic Partition 수 / Task 수) * flush.size * s3.part.size ~= Heap memory per Task

    위 산정식은 두 조건을 바탕으로 합니다. 첫째로 rotate.interval.ms에 대한 flush를 고려하지 않습니다. 그리고 둘째로 put()를 통해 컨슘 한 레코드 뭉치에 모든 레코드가 서로 다른 encodePartition으로 파티셔닝 되었을 경우입니다. 

    환경 별 Heap memory 최적화 방법과 부작용

    1. 다양하게 파티셔닝 될 수 있는 메시지가 들어오는 경우

    다양하게 파티셔닝 될 수 있는 메시지가 들어오는 경우, 버퍼 사이즈 조절과 함께 더 잦은 flush를 발생시킴으로써 flush 간격 내에 encodePartition 수(=버퍼 수)가 일정하게 유지될 수 있도록 합니다.

    • flush.size 축소
    • rotate.(schedule.)interval.ms 축소
    • s3.part.size 축소

    다만, 이렇게 설정했을 경우 s3에 작은 크기를 가지는 많은 수의 오브젝트가 생길 수 있습니다. 이는 s3 api 호출 빈도를 늘리기 때문에 요금적인 부분도 이슈가 될 수 있습니다.

    마무리

     이번 글을 통해 Confluent S3 Sink Connector의 설정 중 Heap memory에 영향을 주는 설정들을 살펴보고, 또 각 설정들이 어떻게 영향을 주는지 살펴봤습니다. 마지막으로 최적화 방법과 그에 따른 부작용도 간단히 살펴봤습니다. 환경 별 최적화 부분은 추후 발견되는 데로 추가하도록 하겠습니다. 

     혹시 잘못된 부분이나, S3 Sink Connector를 운영하면서 얻으신 팁이 있다면 댓글로 공유 부탁드립니다. 감사합니다.

     

    반응형

    댓글

Designed by Tistory.