-
[Kafka 101] KSQL (KSqlDB)개발자 라이프/카프카 2020. 6. 21. 21:07반응형
들어가며
카프카는 브로커를 중심으로 다양한 데이터 파이프라인이 구성될 수 있습니다. 그중 이전 글에서 다루었던 카프카 스트림즈는 토픽 간 Sub/Pub을 통해 브로커 내부 파이프라인을 구축할 수 있도록 합니다. 이번 KSQL 또한 카프카 스트림즈처럼 브로커 내부 파이프라인을 구축하는 데에 특화된 Confluent 솔루션입니다. 이번 글에서는 KSQL 이 무엇인지 살펴보고, 구성과 활용에 대해 간단히 소개하겠습니다.
KSQL
카프카는 내부 토픽을 Sub하여 분석하거나, Sub 한 토픽을 정제하여 다시 토픽으로 Pub 할 수 있습니다. 그렇다면 이러한 파이프라인을 구축할 때, 어떤 방법을 고려할 수 있을까요? 카프카에서는 다음 3가지 방법을 고려할 수 있습니다.
- Kafka Client를 이용한 Consumer/Producer 직접 구현 배포
- Kafka Streams API 라이브러리를 이용한 어플리케이션 구현 배포
- KSQL 문으로 로직 구현하고 KSQL 서버에 배포
이전 글까지 읽어보셨던 분들이라면 1, 2번에 대해서는 익숙하실겁니다. 1번 방식은 추상화가 가장 낮은 단계이고, 1번 방식을 라이브러리 형태로 추상화한 것이 2번 스트림즈 방식입니다. 3번 방식인 KSQL은 2번 스트림즈 방식을 더욱 추상화한 형태입니다. 즉, KSQL은 1번, 2번 방식보다 사용자들이 쉽게 사용할 수 있다는 것인데, 그 이유는 다음 특징을 가지기 때문입니다.
- 유사 SQL(KSQL)문을 이용한 로직 구현
- 구현된 KSQL문을 KSQL 엔진에 배포
일단, 가장 큰 특징으로 KSQL은 유사 SQL 문을 이용하여 로직을 구현합니다. 그렇기 때문에 API 등 프로그래밍 지식이 많이 요구되지 않습니다. 그리고 구현된 KSQL 문을 기존에 배포되어 구동되고 있는 KSQL 엔진(서버)에 배포하면, 엔진 내에서 로직에 맞춰 구동됩니다. 그렇기 때문에 사용자가 직접 서버 배포까지 고려할 필요가 없게 됩니다.
이처럼 KSQL은 기존 카프카 스트림즈에서 발전하여 사용자가 더욱 손쉽게 데이터 파이프라인을 구축할 수 있도록 도와줍니다. 이러한 특징 때문에 카프카 토픽에 대한 데이터 분석에 자주 고려됩니다.
KSQL 구성
KSQL은 크게 2가지로 구성됩니다. 첫번째로 SQL로 요청된 로직을 풀어 구동시키는 KSQL 엔진(서버) 부분, 두 번째로 KSQL 엔진에 요청을 전달하는 KSQL 클라이언트 부분이 있습니다. 이는 우리가 흔히 접할 수 있는 RDBMS 구성과 유사합니다.
KSQL 클라이언트를 조금 더 살펴보면, KSQL 클라이언트는 사용자의 요청을 KSQL 엔진에 전달할 수 있게 CLI 방식과 REST API 방식을 제공합니다. (물론 위 그림에서 UI를 이용한 접근을 알려주고 있지만, 해당 Confluent Control Center는 Confluent의 상용 제품으로 제외했습니다.) CLI 인터페이스를 이용하면 더욱 인터렉티브한 조작이 가능하기 때문에, 되도록이면 CLI 환경에서 KSQL 작업하시는 것을 추천드립니다.
KSQL 활용
우리는 KSQL은 SQL 형식으로 로직을 구현하고, 또 CLI 환경 혹은 REST API를 이용하여 구현된 로직을 전달할 수 있다는 것을 알았습니다. 그러면 이 KSQL을 이용하여 어떤 파이프라인을 구성할 수 있을까요? 그전에 우리는 2가지 개념을 살펴봐야 합니다. 바로 스트림(Stream)과 테이블(Table)입니다.
스트림과 테이블
파이프라인을 통해 데이터는 지속적으로 발생하고 흐릅니다. 이를 스트림이라고 하고, 이 스트림의 상태 정보(State)를 모아놓은 것이 테이블입니다. 따라서 스트림과 테이블의 상관관계를 정리하면 다음과 같습니다.
- 테이블은 스트림의 상태 정보의 축적
- 스트림은 테이블의 변경 로그(Changelog)
여기서 중요한 것은 토픽으로 스트림되는 데이터들을 일정 기준을 통해 상태 정보를 생성하고 관리할 수 있다는 것입니다.
KSQL 예시 - 5분 간격으로 상태 정보 조회하기
KSQL을 활용하는 방법은 다양합니다. SQL 문을 이용하여 토픽 간 조인을 하거나, 시간 값을 기준으로 나누어(타임 윈도잉, Time Windowing) 상태 정보를 지속적으로 조회할 수 있습니다. 간단한 예로 5분 간격으로 특정 필드를 기준으로 그룹핑하여 상태 정보를 생성하고, 조회해보도록 하겠습니다. 일단 다음과 같은 토픽(mcdonald-orders)이 있다고 가정해보겠습니다.
- menu : 메뉴 이름
- price : 가격
- quantity : 수량
위 토픽을 이용하여 먼저 스트림을 생성합니다.
CREATE STREAM mcdonald-orders (menu VARCHAR, price INTEGER, quantity INTEGER) WITH (kafka_topic='mcdonald-orders', value_format='avro');
생성된 스트림에 대하여 5분 간격으로 메뉴 별 총 주문 수량을 집계하는 테이블을 생성합니다.
CREATE TABLE MENU_COUNTS AS SELECT MENU, SUM(QUANTITY) AS ORDER_COUNT FROM MCDONALD_ORDERS WINDOW TUMBLING (SIZE 5 MINUTES) GROUP BY MENU EMIT CHANGES;
위와 같이 테이블을 생성하면 다음과 같은 테이블 업데이트가 지속적으로 이뤄집니다.
특정 구간에 대한 Bulgogi Burger 의 총 주문량을 조회하면 다음과 같습니다.
마무리
KSQL은 SQL 문을 이용하여 내부 토픽에 대한 조회, 집계 등의 연산을 쉽고 간편하게 도와주는 유용한 솔루션입니다. Confluent 사에서 제공하지만 Community license로 사용에는 별도의 제약 조건이 없습니다. 만약 전사적으로 카프카를 이용하고 있고, 내부적인 데이터 분석이 필요하다면 KSQL을 충분히 고려해 볼 수 있습니다.
이로써 이번 글을 통해 KSQL에 대해 간단히 알아보고, 구성과 활용법에 대해 살펴봤습니다. 참고로 KSQL이 Confluent 5.5 버전부터 KsqlDB 라는 이름으로 변경되어 개발되고 있습니다. 기존 카프카가 한계적으로 가지고 있던 저장소(DB)로써의 부분을 더욱 효과적으로 보완하고자 하는 의지가 보이는 부분입니다.
ps 1. 혹시 잘못되거나 부족한 부분은 댓글로 남겨주시길 바랍니다.
ps 2. 내용이 마음에 드셨다면 공감 버튼❤️을 눌러주세요!반응형