ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka 운영] 컨슈머 그룹 정보는 언제 사라질까?
    개발자 라이프/카프카 2021. 3. 29. 01:21
    반응형

    들어가며

    카프카를 운영하다 보면 다양한 컨슈머가 붙었다(active) 떨어지곤(inactive) 합니다. 특히, 운영 과정에서 자주 사용하는 콘솔 컨슈머의 경우, 사용할 때마다 새로운 컨슈머 그룹 정보를 생성하게 됩니다. 그런데 운영하다 보면 어느샌가 이전에 사용했던 컨슈머 그룹 정보가 남아있지 않다는 것을 볼 수 있습니다. 그렇다면 카프카는 컨슈머 그룹 정보를 언제 청소(clean up)하게 될까요? 이번 글은 로그 정보를 이용하여 컨슈머 그룹 정보가 언제 어떤 설정과 연관되어 사라지는지 알아봅니다.

    __consumer_offsets를 찍어보자

    각 컨슈머 그룹은 컨슈밍 하는 토픽 파티션에 대하여 어디까지 컨슈밍 했는지를 __consumer_offsets 토픽에 저장합니다. 그렇다면 컨슈머 그룹은 이 토픽에 어떤 형태로 메시지를 저장할까요? 아래는 콘솔 컨슈머를 통해 __consumer_offsets 토픽을 컨슈밍 한 결과입니다.

    $ ./kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic __consumer_offsets \
    --from-beginning \
    --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
    
    ...
    [console-consumer-96201,test.consumer.offsets,0]::[OffsetMetadata[0,NO_METADATA],CommitTime 1616918718895,ExpirationTime 1617005118895]
    ...

    위에서 보다시피 __consumer_offsets은 OffsetsMessageFormatter라는 특별한 포맷터로 컨슈밍 합니다. 해당 코드를 보면 출력되는 정보는 아래와 같습니다.

    kafka 1.1 버전 기준 OffsetsMessageFormatter class 코드 일부

    • [컨슈머 그룹,토픽,파티션]
    • ::
    • [메시지 내용]

    여기서 우리는 __consumer_offsets 메시지 내용에 ExpirationTime에 관한 정보가 포함되어 있음을 확인할 수 있습니다. 해당 수치는 메시지가 인입된 CommitTime과 정확히 하루(86,400,000 ms) 차이 나는 것을 알 수 있습니다. 그렇다면 이 값을 지정해주는 어떤 설정이 있을까요?

    컨슈머 그룹의 Expire와 연관된 설정

    카프카 공식 문서를 통해 offset과 관련된 설정을 찾다 보면 다음 두 설정을 확인할 수 있습니다.

    • offsets.retention.check.interval.ms
    • offsets.retention.minutes

    이름을 통해 두 설정 모두 offset 보존(retention)과 관련된 설정임을 알 수 있습니다. 그중 offsets.retention.minutes을 살펴봅니다.

    2.0 버전부터 7일로 늘어났다. 이전에는 1일이 기본 값 (KIP-186: Increase offsets retention default to 7 days)

    설명에서 확인할 수 있듯이 offsets.retention.minutes 설정은 컨슈머 그룹이 모든 컨슈머를 잃었을 때 즉, 컨슈머 그룹이 Inactive 된 경우, 해당 컨슈머 그룹의 오프셋 정보를 유지하는 기간과 연관된 설정입니다. 그리고 실제로 이 값에 따라 __consumer_offsets 메시지의 ExpirationTime 값이 결정되는 것을 확인했습니다. 그렇다면 해당 설정을 아주 짧게 잡아 놓고 실제로 컨슈머 그룹이 없어지는지 확인해보겠습니다.

    직접 확인해보자

    카프카 1.1 버전을 기준으로 local 환경에서 아래 설정을 server.properties 파일에 포함여 standalone 카프카를 구동했습니다.

    • offsets.retention.minutes=1
    • offsets.retention.check.interval.ms=30000

    그리고 콘솔 컨슈머를 구동(console-consumer-17380)하여 임의의 토픽을 컨슈밍 하도록 했습니다. 그리고 시간 변화에 따라 카프카 로그를 확인해 봤을 때, 아래와 같은 결과를 확인할 수 있었습니다.

    # 콘솔 컨슈머로 컨슘 시작
    [2021-03-28 17:40:11,677] INFO [GroupCoordinator 0]: Preparing to rebalance group console-consumer-17380 with old generation 0 (__consumer_offsets-30) (kafka.coordinator.group.GroupCoordinator)
    [2021-03-28 17:40:11,683] INFO [GroupCoordinator 0]: Stabilized group console-consumer-17380 generation 1 (__consumer_offsets-30) (kafka.coordinator.group.GroupCoordinator)
    [2021-03-28 17:40:11,689] INFO [GroupCoordinator 0]: Assignment received from leader for group console-consumer-17380 for generation 1 (kafka.coordinator.group.GroupCoordinator)
    [2021-03-28 17:40:26,665] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 3 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
    [2021-03-28 17:40:56,663] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
    [2021-03-28 17:41:26,660] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
    [2021-03-28 17:41:56,661] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
    [2021-03-28 17:42:26,662] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
    # 콘솔 컨슈머 컨슘 중지
    [2021-03-28 17:42:38,190] INFO [GroupCoordinator 0]: Preparing to rebalance group console-consumer-17380 with old generation 1 (__consumer_offsets-30) (kafka.coordinator.group.GroupCoordinator)
    [2021-03-28 17:42:38,192] INFO [GroupCoordinator 0]: Group console-consumer-17380 with generation 2 is now empty (__consumer_offsets-30) (kafka.coordinator.group.GroupCoordinator)
    [2021-03-28 17:42:56,662] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
    [2021-03-28 17:43:26,663] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
    # offsets.retention.minutes=1 에 따라 empty 된 컨슈머 그룹의 expired 된 offset 삭제
    [2021-03-28 17:43:56,663] INFO [GroupMetadataManager brokerId=0] Group console-consumer-17380 transitioned to Dead in generation 2 (kafka.coordinator.group.GroupMetadataManager)
    [2021-03-28 17:43:56,668] INFO [GroupMetadataManager brokerId=0] Removed 1 expired offsets in 6 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

    위 로그에서 다음과 같은 것을 확인할 수 있었습니다.

    • offsets.retention.check.interval.ms(=30초) 설정에 따라 expired 된 offsets을 찾아 삭제한다.
    • 컨슈머 그룹에 컨슈머가 존재하면(active) offsets은 expired 되지 않는다. (Removed 0 expired offsets)
    • 컨슈머 그룹에 컨슈머가 존재하지 않으면 ... is now empty 로그가 발생한다.
    • expired 된 후 offsets.retention.minutes(=1분) 설정에 따라 offsets이 삭제된다. (Removed 1 expired offsets)

    실제로 컨슈머 그룹 목록을 조회해봤을 때도 삭제 로그와 함께 컨슈머 그룹(console-consumer-17380)이 삭제된 것을 확인할 수 있습니다.

    삭제 로그 발생(43분 56초) 이후 컨슈머 그룹이 삭제 된 것을 확인할 수 있다.

    그렇다면 해당 컨슈머 그룹 관련되어 __consumer_offsets 내에선 어떤 메시지가 적재되었을까요? 아래와 같이 확인할 수 있었습니다.

    $ kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic __consumer_offsets \
    --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"  \
    --from-beginning | grep 17380
    [console-consumer-17380,test.consumer.offsets,0]::[OffsetMetadata[0,NO_METADATA],CommitTime 1616920816717,ExpirationTime 1616920876717]
    [console-consumer-17380,test.consumer.offsets,0]::[OffsetMetadata[0,NO_METADATA],CommitTime 1616920821716,ExpirationTime 1616920881716]
    ...
    [console-consumer-17380,test.consumer.offsets,0]::[OffsetMetadata[0,NO_METADATA],CommitTime 1616920951807,ExpirationTime 1616921011807]
    [console-consumer-17380,test.consumer.offsets,0]::[OffsetMetadata[0,NO_METADATA],CommitTime 1616920956811,ExpirationTime 1616921016811]
    [console-consumer-17380,test.consumer.offsets,0]::[OffsetMetadata[0,NO_METADATA],CommitTime 1616920958187,ExpirationTime 1616921018187]
    # expired 된 offst은 null 메시지가 찍힌다 
    [console-consumer-17380,test.consumer.offsets,0]::NULL 

    직접 확인해본 결과, expired 된 offsets이 삭제된 이후 해당 consumer에 관한 tombstone 메시지가 __consumer_offsets 토픽에 적재되는 것을 확인할 수 있었습니다. 따라서, 아래와 같이 정리할 수 있습니다.

    • offsets.retention.minutes 설정에 따라 컨슈머 그룹에 대한 ExpirationTime 이 지정되어 __consumer_offsets 메시지에 저장된다.
    • offsets_retention.check.interval.ms 설정에 따라 inactive 된 컨슈머 그룹 되고, ExpirationTime이 지난 offsets을 찾는다.
    • 만약, 위의 조건이 만족한 컨슈머 그룹이 있다면 tombstone 메시지를 발행한다.
    • tombstone 메시지가 발행된 컨슈머 그룹은 컨슈머 그룹 목록 조회에서 제외된다.

    마무리

    이번 글은 카프카 설정과 로그를 통해서 컨슈머 그룹이 언제 사라지는지 확인해봤습니다. 혹시 추가할 내용이나 수정될 내용이 있다면 언제든지 댓글을 통해서 알려주시길 바랍니다. :)

    반응형

    댓글

Designed by Tistory.