ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka 개발] kafka-connect-datagen 커넥터로 테스트 데이터 생성하기
    개발자 라이프/카프카 2021. 1. 24. 23:09
    반응형

    0. 들어가며

     우리는 카프카 토픽을 컨슘(구독)하여 다양한 처리를 할 수 있습니다. 하지만 초기 개발 과정에는 토픽에 메시지가 적재되어 있지 않을 수도 있고, 또한 일시적으로 적재되어있다고 하더라도 메시지가 retention 기간에 따라 삭제/압축되기 때문에 지속적인 개발/테스트가 어려울 수 있습니다. 이번 글은 이러한 케이스에서 유용하게 사용할 수 있는 kafka-connect-dategen 커넥터에 대해 소개합니다.

    1. kafka-connect-datagen 커넥터

     kafka-connect-datagen(이하 datagen) 커넥터는 사용자가 설정한 명세에 따라 데이터를 생성하여 타겟 토픽에 데이터를 밀어 넣어 주는 커넥터입니다. 그렇기 때문에 토픽을 구독하여 처리하는 컨슈머, 스트림즈, 싱크 커넥터 등의 개발/테스트 과정에서 유용하게 활용할 수 있습니다. 참고로 datagen은 컨플런트 사가 중심으로 개발되고 있는 오픈소스(Apache 2.0)입니다. 

    2. datagen 사용해보기 

     datagen을 빌드하고 이를 커넥트 클러스터에 추가하여 사용해봅니다. 예제는 도커 환경을 사용하며, 자세한 환경 구성은 링크를 통해 확인하시길 바랍니다. 

    2.1. datagen 빌드

     datagen 은 컨플런트 사가 Confluent Platform을 통해 배포하고 있습니다. 하지만 Confluent Platform 은 상용 제품이고 보편적으로 사용하는 것에 한계가 있습니다. 그렇기 때문에 이 글은 datagen 소스를 가져와 빌드하고, 이를 사용합니다. 만약 Confluent Platform을 이용하는 방법을 알고 싶다면 링크를 통해 확인하시길 바랍니다.

     먼저 datagen 소스를 로컬로 다운로드 받습니다. 그 후 pom.xml 에 fat jar 생성을 위한 아래의 스크립트를 추가해줍니다. 

    <project ...>
      ...
      <build>
        ...
        <plugins>
          ...
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.1.0</version>
    
            <configuration>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
              <finalName>${project.build.finalName}</finalName>
              <appendAssemblyId>false</appendAssemblyId>
            </configuration>
            <executions>
              <execution>
                <phase>package</phase>
                <goals>
                  <goal>single</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </project>  

    생성된 jar를 커넥트 컨테이너의 클래스 패스에 추가해야 합니다. 그러기 위해 커넥트 볼륨과 마운트 되어 있는 호스트 위치에 jar를 복사하고, 커넥트 볼륨 위치를 plugin.path 설정에 추가해줍니다(예제 코드에는 추가되어 있습니다.). datagen jar를 추가한 뒤 컨테이너를 재시작해야 커넥트 클래스 패스에 추가되며, 정상적으로 추가되었는지는 다음과 같이 `curl {connect_host}/connector-plugins` 요청을 통해 확인할 수 있습니다.

    2.2. 스키마 명세 작성하기

     datagen 을 이용하여 데이터를 생성하기 위해선 생성할 데이터에 대한 스키마 명세를 먼저 생성해야 합니다. 스키마 명세는 datagen에서 실제로 데이터를 생성하는 Avro random generator(이하 arg) 가 인식할 수 있는 명세여야 합니다. 

     arg 는 Json 포맷으로 작성된 스키마 명세를 읽어 데이터를 생성합니다. 지원하는 스키마는 primitive 타입부터 complex 타입까지 다양한 타입을 지원하며, 정규식에 맞는 임의의 문자열 생성, 범위에 따른 임의의 숫자 생성, 몇 가지 옵션 중 임의의 값 선택 등 다양한 방식으로 임의의 데이터를 생성할 수 있습니다.

    예제에선 아래와 같은 "맥도날드 버거 주문 메시지" 상황을 가정하여 스키마 명세를 작성합니다.

    • 메뉴 이름(menu_name)은 문자열 타입으로, 미리 지정된 5가지 메뉴 중 하나가 선택된다.
    • 수량(quantity)은 정수형 타입으로, 1개 이상 10개 미만으로 주문된다.
    • 주문 시 요청사항(order_requests)은 50자 내의 문자열 타입이다.

     위에 대하여 작성한 스키마 명세는 아래와 같습니다.

    {
        "name": "mcdonald_order",
        "type": "record",
        "fields":
        [
            {
                "name": "menu_name",
                "type": {
                    "type": "string",
                    "arg.properties": {
                        "options": [
                            "Big Mac",
                            "McSpicy Shanghai Burger",
                            "Bulgogi Burger",
                            "1955 Burger",
                            "Bacon Tomato Deluxe"
                        ]
                    }
                }
            },
            {
                "name": "quantity",
                "type": {
                    "type": "int",
                    "arg.properties": {
                        "range": {
                            "min": 1,
                            "max": 10
                        }
                    }
                }
            },
            {
                "name": "order_requests",
                "type": {
                    "type": "string",
                    "arg.properties": {
                        "regex": "[a-zA-Z]*",
                        "length": 50
                    }
                }
            }
        ]
    }

    2.3. datagen 커넥터 배포하기

     작성된 스키마 명세를 이용하여 datagen 커넥터는 배포합니다. datagen의 주요 설정은 아래와 같습니다.

    Parameter Description Default
    kafka.topic 생성한 데이터를 적재할 토픽 이름  
    max.interval 데이터 생성 간 최대 사이 시간(ms) 500
    iterations 각 task에서 생성할 최대 메시지 건 수
    만약 음수라면 무한 생성
     
    schema.string Json 형태의 Avro 스키마 문자열
    schema.filename과 quickstart 설정과 함께 사용할 수 없습니다.
     
    schema.filename Avro 스키마 문자열을 저장한 파일 이름
    schema.string과 quickstart 설정과 함께 사용할 수 없습니다.
     
    quickstart 사용할 미리 정의된 스키마 이름
    schema.string과 schema.filename 설정과 함께 사용할 수 없습니다.
     
    schema.keyfield 정의된 스키마 명세에서 메시지의 키(key)로 사용할 필드 이름  

     예제는 스키마 명세를 직접 설정에 포함하여 배포하기 위해 schema.string 설정을 사용합니다. 따라서, Rest API에 Json 문자열을 포함하기 위해 스키마 명세를 escape 처리해줍니다. 링크를 통해서 처리하면 간단합니다. 그리고 escape 처리된 값을 이용하여 아래와 같이 http 요청을 보냅니다.

    POST {{endpoint}}/connectors HTTP/1.1
    Content-Type: application/json
    
    {
      "name": "{{connector-name}}",
      "config": {
        "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
        "kafka.topic": "{{sink-topic}}",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "schema.string": "{\r\n    \"name\": \"mcdonald_order\",\r\n    \"type\": \"record\",\r\n    \"fields\":\r\n    [\r\n        {\r\n            \"name\": \"menu_name\",\r\n            \"type\": {\r\n                \"type\": \"string\",\r\n                \"arg.properties\": {\r\n                    \"options\": [\r\n                        \"Big Mac\",\r\n                        \"McSpicy Shanghai Burger\",\r\n                        \"Bulgogi Burger\",\r\n                        \"1955 Burger\",\r\n                        \"Bacon Tomato Deluxe\"\r\n                    ]\r\n                }\r\n            }\r\n        },\r\n        {\r\n            \"name\": \"quantity\",\r\n            \"type\": {\r\n                \"type\": \"int\",\r\n                \"arg.properties\": {\r\n                    \"range\": {\r\n                        \"min\": 1,\r\n                        \"max\": 10\r\n                    }\r\n                }\r\n            }\r\n        },\r\n        {\r\n            \"name\": \"order_requests\",\r\n            \"type\": {\r\n                \"type\": \"string\",\r\n                \"arg.properties\": {\r\n                    \"regex\": \"[a-zA-Z]*\",\r\n                    \"length\": 50\r\n                }\r\n            }\r\n        }\r\n    ]\r\n}",
        "schema.keyfield": "menu_name",
        "max.interval": 1000,
        "iterations": 10000000,
        "tasks.max": "1"
      }
    }

    2.4. 메시지 확인하기

     배포된 datagen이 정상적으로 메시지를 생성하는지 콘솔 컨슈머를 이용하여 확인합니다.

    정상적으로 메시지가 생성되고 있다.

    3. 마무리

     이번 글을 통해 datagen 을 알아보고 직접 사용해봤습니다. datagen 은 글에서 확인했었듯이 다양하고 복잡한 임의 데이터를 생성할 수 있습니다. 이는 카프카 토픽을 이용한 개발/테스트를 더욱 독립적이고 민첩하게 할 수 있게 할 것입니다. 

    반응형

    댓글

Designed by Tistory.