catsridingCATSRIDING|OCEANWAVES
Dev

Apache Kafka 기반 이벤트 스트리밍 로또 번호 생성 서비스 개발하기

jynn@catsriding.com
Dec 24, 2024
Published byJynn
999
Apache Kafka 기반 이벤트 스트리밍 로또 번호 생성 서비스 개발하기

Building Event Streaming Lottery Service with Apache Kafka

Apache Kafka(아파치 카프카)는 분산형 이벤트 스트리밍 플랫폼으로, 대규모 데이터를 처리하고 실시간 스트리밍 데이터 파이프라인을 구성하는 데 적합한 도구입니다. 이벤트 기반 아키텍처는 시스템 간 데이터를 비동기적으로 전달하고 처리하여 확장성과 유연성을 극대화합니다.

이 프로젝트는 이벤트 기반 아키텍처의 원리와 비동기 이벤트 발행 및 처리 과정을 이해하기 위한 목적으로, Apache Kafka를 핵심 메시지 브로커로 활용하여 로또 번호 생성 서비스를 구현했습니다. 이 글은 서비스를 구현하는 전반적인 과정을 담고 있습니다.

Prerequisites

이 프로젝트는 데이터베이스와 Redis를 필수 구성 요소로 활용합니다. 아래와 같은 기본 구성을 바탕으로 Spring Boot 프로젝트를 초기화하며, 이후 단계에서 Apache Kafka를 포함한 메시징 시스템을 추가로 통합할 계획입니다.

다음은 프로젝트를 설정하기 위해 추가한 초기 주요 의존성 목록입니다:

build.gradle
dependencies {
    // Querydsl
    implementation 'com.querydsl:querydsl-jpa:5.0.0:jakarta'
    annotationProcessor "com.querydsl:querydsl-apt:${dependencyManagement.importedProperties['querydsl.version']}:jakarta"
    annotationProcessor "jakarta.annotation:jakarta.annotation-api"
    annotationProcessor "jakarta.persistence:jakarta.persistence-api"
    
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation 'org.springframework.boot:spring-boot-starter-data-redis'
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    compileOnly 'org.projectlombok:lombok'
    developmentOnly 'org.springframework.boot:spring-boot-devtools'
    runtimeOnly 'org.postgresql:postgresql'
    annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}

이 프로젝트에서 사용된 주요 기술 스택과 도구는 다음과 같습니다:

  •  Java 21
  •  Spring Boot 3.3.2
  •  Gradle
  •  Postgresql
  •  Redis
  •  Docker
  • 󱀏 Apache Kafka

Structuring Process

우선, 프로젝트의 구조를 명확히 하고 로또 복권 번호 발급 서비스의 프로세스를 설계해야 합니다. 이를 단계별로 차근차근 진행해 보겠습니다.

가장 먼저, 클라이언트가 서비스 페이지에 접속하여 복권 발급하기 버튼을 클릭하면 요청이 시작됩니다.

building-event-streaming-lottery-service-with-apache-kafka_02.png

티켓 발급 API가 요청되면, 서버에서 다음 작업들이 수행됩니다:

building-event-streaming-lottery-service-with-apache-kafka_01.png

  • 유니크 ID 생성: 특정 알고리즘을 사용하여 고유한 키(Key)를 생성합니다. 이 키는 이벤트, Redis, 데이터베이스에서 복권을 식별하는 데 활용됩니다.
  • Kafka Producer 이벤트 발행: 생성된 키를 기반으로 Kafka Producer(카프카 프로듀서)가 지정된 토픽(Topic)으로 이벤트를 발행합니다.
  • Redis 데이터 추가: Redis에 해당 키를 기반으로 데이터를 저장합니다. 이는 클라이언트의 폴링 요청에 빠르게 응답하기 위해 데이터베이스를 직접 조회하는 대신 Redis에서 처리 결과를 관리합니다. 데이터 정합성이 깨질 수 있는 위험이 있지만, 지금 단계에서 예외 상황까지 고려하지는 않을 생각입니다. 데이터는 최종적으로 클라이언트가 수신할 복권 데이터 형식으로 저장되며, 상태값을 포함해 처리 결과를 효율적으로 추적할 수 있도록 합니다.
  • 요청 수락 결과 반환: 위 과정이 모두 성공적으로 처리되면, 클라이언트에 상태코드 202 Accepted를 반환합니다.

이후 클라이언트는 상태코드 202을 수신한 뒤, 주기적으로 티켓 발급 결과 조회 API를 폴링 방식으로 호출합니다. 한편, 해당 토픽을 구독하고 있던 Kafka Consumer(카프카 컨슈머)로 인해 다음과 같은 작업이 처리됩니다:

building-event-streaming-lottery-service-with-apache-kafka_03.png

  • Kafka Consumer 이벤트 처리: 해당 토픽을 구독하여 발급 요청 이벤트를 수신합니다.
  • 서비스 로직 호출: 이벤트에 따라 로또 번호를 생성하는 비즈니스 로직을 실행합니다.
  • 복권 데이터 생성 및 저장: 생성된 복권 데이터를 데이터베이스에 저장합니다.
  • Redis 데이터 갱신: Redis에 저장된 데이터의 상태를 업데이트합니다.

Redis에 저장된 데이터는 처리 결과 상태를 포함하며, 클라이언트는 완료 상태로 변경될 때까지 폴링을 지속합니다. 상태가 변경되면 폴링을 중단하고 결과 데이터를 처리합니다.

building-event-streaming-lottery-service-with-apache-kafka_04.png

  • 클라이언트 데이터 페치: 주기적으로 API를 호출하여 발급 결과를 확인합니다.
  • 결과 처리: 발급이 완료되면 클라이언트는 폴링을 중단하고 결과 화면을 렌더링합니다.

정상적으로 티켓이 발급된 경우, 클라이언트 화면에는 발급된 복권 번호와 관련된 정보가 표시됩니다:

building-event-streaming-lottery-service-with-apache-kafka_05.png

프로세스를 구성하는 과정에서 시스템의 복잡함을 느꼈지만, 단계를 세분화하고 차근차근 구현하니 서버 측은 비교적 수월하게 완성할 수 있었습니다. 다음은 데이터베이스를 준비해보도록 하겠습니다.

Designing the Database

복잡성을 줄이고 초기 구현을 간소화하기 위해 이 서비스에서는 한 번에 한 게임만 발행할 수 있도록 제한하였습니다. 이러한 요구사항을 기반으로 다음과 같이 테이블을 설계하였습니다:

building-event-streaming-lottery-service-with-apache-kafka_07.png

  • 회차 관리: lottery_draws
    • 복권 회차 정보와 상태를 관리하기 위해 설계되었습니다.
    • 각 당첨 번호가 1~45 범위를 벗어나지 않도록 설정했습니다.
    • 데이터 정합성을 유지하며, 각 회차의 고유성을 보장하기 위해 회차 컬럼은 유니크로 설정했습니다.
  • 발급 관리: lottery_tickets
    • 사용자별 복권 발급 정보를 저장하기 위한 테이블로, 이메일, 발급 시점, 참여한 회차 정보를 포함합니다.
    • 회차 테이블과 외래키로 연결되어 각 발급이 특정 회차에 속하도록 보장했습니다.
  • 번호 관리: lottery_numbers
    • 각 복권 티켓에 선택된 번호를 저장하며, 1~45 범위로 제한되는 제약 조건을 적용했습니다.
    • 발급 테이블과의 관계를 통해 각 티켓의 번호 구성을 관리합니다.
  • 당첨 관리: lottery_winnings
    • 각 티켓의 당첨 결과를 저장하기 위해 설계되었습니다.
    • 맞춘 번호 개수와 등수를 기록하며, 각각의 값에 대한 유효성을 제약 조건으로 보장했습니다.
    • 티켓 테이블 및 회차 테이블과 외래키로 연결되어 데이터 추적성을 높였습니다.

아래 DDL 스키마는 이러한 설계 과정을 반영합니다:

create table lottery_draws
(
    id                    bigint primary key,                                 -- 회차 ID
    draw_number           int unique  not null,                               -- 복권 회차
    draw_date             date        not null,                               -- 복권 추첨일
    state                 varchar(20) not null default 'PENDING',             -- 복권 발급 가능 상태
    winning_number_first  int check (winning_number_first between 1 and 45),  -- 당첨 번호 1
    winning_number_second int check (winning_number_second between 1 and 45), -- 당첨 번호 2
    winning_number_third  int check (winning_number_third between 1 and 45),  -- 당첨 번호 3
    winning_number_fourth int check (winning_number_fourth between 1 and 45), -- 당첨 번호 4
    winning_number_fifth  int check (winning_number_fifth between 1 and 45),  -- 당첨 번호 5
    winning_number_sixth  int check (winning_number_sixth between 1 and 45),  -- 당첨 번호 6
    winning_number_bonus  int check (winning_number_bonus between 1 and 45)   -- 보너스 번호
);

create table lottery_tickets
(
    id          bigint primary key,                              -- 발급 ID
    draw_id     bigint       not null,                           -- 회차 ID (Foreign Key)
    email       varchar(100) not null,                           -- 발급자 이메일
    ip_address  varchar(50)  not null,                           -- 발급자 IP 주소
    issued_date timestamp    not null default current_timestamp, -- 발급 날짜
    foreign key (draw_id) references lottery_draws (id) on delete cascade
);

create table lottery_numbers
(
    id        bigint primary key,                              -- 번호 ID
    ticket_id bigint not null,                                 -- 발급 ID (Foreign Key)
    number    int    not null check (number between 1 and 45), -- 로또 번호 (1~45)
    foreign key (ticket_id) references lottery_tickets (id) on delete cascade
);

create table lottery_winnings
(
    id            bigint primary key,                                    -- 결과 ID
    ticket_id     bigint not null,                                       -- 티켓 ID (Foreign Key)
    draw_id       bigint not null,                                       -- 회차 ID (Foreign Key)
    matched_count int    not null check (matched_count between 0 and 7), -- 맞춘 번호 개수
    rank          int    not null check (rank between 1 and 6),          -- 당첨 등수 (1~6)
    foreign key (ticket_id) references lottery_tickets (id) on delete cascade,
    foreign key (draw_id) references lottery_draws (id) on delete cascade
);

Setting Up Apache Kafka on macOS

이번 섹션에서는 Kafka 기반의 이벤트 스트리밍 서비스를 개발하기 위해 Docker를 활용하여 로컬 환경에 Kafka를 구축해 보겠습니다.

Apache Kafka를 설치하는 방법은 다양한 방식이 있지만, 저는 여기서 Confluent를 사용했습니다. Confluent는 Apache Kafka를 만든 개발자들이 설립한 회사에서 제공하는 플랫폼으로, Kafka에 모니터링 및 관련 도구를 쉽게 연동할 수 있는 기능을 제공합니다.

Configuring Kafka with Docker Compose

Kafka는 데이터 발행과 구독을 처리하는 핵심 역할을 맡으며, Zookeeper는 Kafka의 메타데이터와 클러스터 상태를 관리하는 데 필수적인 구성 요소입니다. 따라서 Kafka를 정상적으로 운영하려면 Zookeeper와 함께 실행해야 합니다. 이러한 실행 환경을 효율적으로 구성하기 위해 Docker Compose를 활용하겠습니다:

docker-compose.yml
version: '3'

services:

  zookeeper:
    image: confluentinc/cp-zookeeper
    container_name: local-zookeeper
    restart: unless-stopped
    environment:
      ZOOKEEPER_CLIENT_PORT: 2182
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2182:2182"

  kafka:
    image: confluentinc/cp-kafka
    container_name: local-kafka
    restart: unless-stopped
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2182
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9093:9093"

Starting Kafka Services

docker-compose.yml 파일을 프로젝트 루트 디렉터리에 저장한 후, 아래 명령어를 실행해 Kafka와 Zookeeper 서비스를 시작합니다:

$ docker-compose up -d

Creating Kafka Topic

Kafka 환경이 올바르게 동작하는지 확인하려면 Kafka CLI를 사용해 간단한 작업을 실행해볼 수 있습니다. 예를 들어, 이번 프로젝트에서 사용할 토픽을 생성하여 Kafka가 정상적으로 메시지를 처리할 준비가 되었는지 확인할 수 있습니다. 이를 위해 먼저 Kafka 컨테이너에 접속해야 합니다:

$ docker exec -it local-kafka /bin/bash

컨테이너 내부에서 아래 명령어를 사용해 새로운 토픽을 생성합니다:

$ kafka-topics --bootstrap-server localhost:9093 --create --topic lottery-issuance-topic --partitions 4
  • kafka-topics: Kafka에서 토픽을 관리하기 위한 CLI 도구입니다. 토픽 생성, 삭제, 조회 등의 작업을 수행할 수 있습니다.
  • --bootstrap-server: Kafka 클러스터에 연결할 브로커의 주소를 지정합니다.
  • --create: 지정된 이름과 설정으로 새로운 Kafka 토픽을 생성합니다.
  • --partitions: 토픽의 파티션 개수를 지정합니다.

Verifying Message Consumption

생성된 토픽에 데이터를 소비하는지 확인하려면 아래 명령어를 실행합니다:

$ kafka-console-consumer --bootstrap-server localhost:9093 --topic lottery-issuance-topic --partition 0 --from-beginning

위 명령어를 실행하면 지정된 토픽과 파티션에서 메시지를 소비하는 Kafka Consumer가 활성화됩니다. 현재 창을 유지한 상태로 새로운 터미널에서 Kafka Producer를 실행하여 메시지를 발송할 수 있습니다:

$ kafka-console-producer --bootstrap-server localhost:9093 --topic lottery-issuance-topic

Producer에서 메시지를 입력하면 Consumer에서 해당 메시지가 표시됩니다.

Kafka Consumer 확인

이 과정을 통해 Kafka가 정상적으로 작동하며, Producer와 Consumer 간 데이터가 토픽을 통해 올바르게 전송되고 있음을 확인할 수 있습니다.

Implementing Lottery Service with Kafka

지금까지 준비한 환경을 바탕으로 이제 로또 번호 생성 서비스를 구현해보겠습니다.

Kafka Integration with Spring

Kafka와 Spring Boot를 연동하기 위해 필요한 의존성을 추가하고 설정 파일을 구성합니다. 이 단계에서는 Kafka 브로커와의 통신을 설정하고, 메시지를 주고받기 위한 기본 직렬화/역직렬화 도구를 지정합니다.

Spring Kafka 의존성 추가

먼저, build.gradle 파일에 Spring Kafka 의존성을 추가합니다:

build.gradle
dependencies {
    // Kafka
    implementation 'org.springframework.kafka:spring-kafka'
}

이 의존성은 Kafka 클라이언트와 Spring Boot 간의 통합을 지원하며, 메시지 발행 및 소비를 위한 다양한 유틸리티를 제공합니다.

Kafka 설정 추가

application.yml 파일에 Kafka 브로커와 통신하기 위한 설정을 추가합니다. 여기에서는 프로듀서와 컨슈머 모두 문자열 기반 메시지를 처리하도록 설정하였습니다:

application.yml
spring:
  kafka:
    bootstrap-servers: localhost:9093
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
KafkaTemplate Bean 구성

KafkaTemplate은 Spring Kafka에서 제공하는 핵심 클래스 중 하나로, Kafka 메시지를 발행하는 역할을 담당합니다. 이를 활용하면 애플리케이션 코드에서 Kafka Producer와 상호작용하는 과정을 단순화할 수 있습니다.

  1. 메시지 발행의 단순화: KafkaTemplate은 Kafka 브로커와의 연결, 메시지 직렬화, 파티션 분배 등 복잡한 과정을 추상화하여, 개발자가 코드에서 간단한 메서드 호출로 메시지를 발행할 수 있도록 지원합니다.
  2. 비동기 메시지 발행: 메시지를 비동기적으로 발행하며, ListenableFuture 객체를 반환하여 발행 성공 또는 실패 시 콜백 처리를 지원합니다.
  3. 유연한 메시지 포맷 지원: 메시지의 키와 값을 다양한 데이터 타입으로 설정할 수 있으며, 필요에 따라 커스텀 직렬화기를 적용할 수도 있습니다.
  4. 송신 결과 추적: 메시지가 성공적으로 발행되었는지 또는 실패했는지를 확인할 수 있는 메서드를 제공하여, 애플리케이션의 안정성과 디버깅을 돕습니다.

KafkaTemplate을 Bean으로 등록하면, Spring 컨텍스트 내에서 이를 주입받아 어디서든 쉽게 사용할 수 있습니다:

KafkaConfig.java
@Configuration
public class KafkaConfig {

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }
}
  • ProducerFactory: Kafka Producer의 설정을 관리하며, KafkaTemplate에 Producer 객체를 제공하는 역할을 합니다.

Publishing Messages

클라이언트의 티켓 발급 요청을 처리하고 Kafka에 메시지를 발행하기 위해, REST API와 서비스 레이어를 구성합니다. REST API는 클라이언트로부터 요청을 수신한 뒤 이를 메시지로 변환하여 Kafka에 전달하며, 요청이 성공적으로 수락되었음을 나타내는 응답을 반환합니다.

building-event-streaming-lottery-service-with-apache-kafka_01.png

전체 프로세스는 다음과 같은 흐름으로 동작합니다:

  1. 클라이언트 요청 수신: 클라이언트가 REST API를 호출하여 티켓 발급을 요청합니다.
  2. 메시지 변환 및 생성: API는 요청 데이터를 메시지 형태로 변환하여 발행 준비를 마칩니다.
  3. Kafka로 메시지 발행: 변환된 메시지는 Kafka Producer를 통해 지정된 토픽으로 발행됩니다.
  4. 응답 반환: 요청이 성공적으로 처리되었음을 알리는 202 Accepted 상태 코드와 함께 응답을 반환합니다.

이 과정을 통해 Kafka는 비동기적으로 요청을 처리하며, 클라이언트는 요청이 정상적으로 수락되었음을 빠르게 확인할 수 있습니다.

컨트롤러는 클라이언트 요청을 수신한 후, 클라이언트의 IP 주소를 추출해 요청 데이터에 추가하고, 이를 서비스 레이어에서 처리하도록 전달한 뒤, 결과를 HTTP 응답으로 반환합니다:

LotteryCommandController.java
@Slf4j
@RequiredArgsConstructor
@RequestMapping("/lotteries")
@ApiAdapter
public class LotteryCommandController {

    private final ResolveClientIp resolveClientIp;

    @PostMapping
    public ResponseEntity<?> issueLotteryTicketApi(HttpServletRequest servletRequest) {
        String clientIp = resolveClientIp.resolveIp(servletRequest);
        LotteryTicketResponse response = lotteryTicketIssuanceUseCase.acceptLotteryIssuance(request.toCommand(clientIp));
        return ResponseEntity.accepted().body(compose(ACCEPTED, response));
    }
}

서비스 레이어는 클라이언트 요청 데이터 처리, Kafka 메시지 발행, Redis 상태 저장을 포함한 핵심 비즈니스 로직을 처리합니다:

LotteryService.java
@Override
public LotteryTicketResponse acceptLotteryIssuance(IssueLotteryTicketCommand command) {
    LotteryTicketId ticketId = Lottery.generateTicketId();
    LotteryIssuanceMessage message = LotteryIssuanceMessage.write(command, ticketId);

    try {
        String serialized = objectMapper.writeValueAsString(message);
        publishLotteryIssuancePort.publish(ticketId.id().toString(), serialized);
        storeLotteryIssuancePort.store(ticketId.id().toString(), serialized, 24 * 60 * 60L);
        log.info("acceptLotteryIssuance: Successfully published lottery issuance - message={}", message);
    } catch (JsonProcessingException e) {
        log.error("acceptLotteryIssuance: {}", e.getMessage(), e);
        throw new LotteryIssuanceRequestFailureException("Cannot serialize message");
    }

    return LotteryTicketResponse.requestAccepted(message);
}
  1. 유니크 ID 생성: 각 요청에 대해 고유한 ID를 생성합니다. 이 ID는 요청 데이터를 추적하고 상태를 관리하기 위한 기본 키로 활용됩니다.
  2. Kafka로 메시지 발행: 요청 데이터를 기반으로 Kafka 메시지를 생성하고, 이를 JSON 형식으로 직렬화한 뒤 Kafka Producer를 통해 지정된 토픽으로 발행합니다.
  3. Redis 캐싱: 티켓 발급 상태를 Redis에 저장하여 데이터베이스 부하를 줄이고, 클라이언트가 신속하게 상태를 확인할 수 있도록 합니다.

Kafka 메시지 발행 로직은 서비스 레이어에서 직접 처리하지 않고, 별도의 어댑터 클래스로 구현하여 역할을 분리하였습니다.

Kafka 메시지 발행 어댑터는 KafkaTemplate을 사용해 메시지를 발행하며, 생성된 키와 메시지를 Kafka 토픽으로 전달합니다. 복권 번호 발급 기능은 요청 순서 보장이 필요 없으므로, 임의 키를 통한 메시지 분배 방식으로도 충분히 적합합니다. 이는 메시지를 파티션에 고르게 분산시켜 성능을 최적화하는 데 효과적입니다.

LotteryKafkaProducerAdapter.java
@Slf4j
@RequiredArgsConstructor
@MessagingAdapter
public class LotteryKafkaProducerAdapter implements PublishLotteryIssuancePort {

    private final KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public void publish(String key, String message) {
        kafkaTemplate.send("lottery-issuance-topic", key, message);
        log.info("publish: Successfully published lottery issuance to Kafka - key={}", key);
    }
}

Consuming Messages

이어서 Kafka Producer에서 발행한 메시지를 처리하기 위해 Consumer를 구현합니다. Consumer는 Kafka 토픽에서 수신한 메시지를 처리 포트로 전달하여 로또 번호를 생성하고, 이를 데이터베이스에 저장하며 Redis를 업데이트합니다. 이 과정은 발급 상태를 클라이언트가 빠르게 조회할 수 있도록 지원합니다.

building-event-streaming-lottery-service-with-apache-kafka_03.png

Kafka 메시지는 Kafka Consumer를 통해 브로커에서 수신됩니다. Spring Kafka에서 제공하는 @KafkaListener는 Consumer 설정을 간소화하여, 지정된 토픽에서 메시지를 구독하고 처리 메서드를 실행하도록 지원합니다. 수신된 메시지는 처리 포트로 위임되어 로직이 실행됩니다.

LotteryKafkaConsumerAdapter.java
@Slf4j
@RequiredArgsConstructor
@MessagingAdapter
public class LotteryKafkaConsumerAdapter {

    private final ProcessLotteryIssuancePort processLotteryIssuancePort;

    @KafkaListener(topics = "lottery-issuance-topic", groupId = "lottery-consumers")
    public void consume(String message) {
        log.info("kafka-consumer: {}", message);
        processLotteryIssuancePort.processLotteryIssuance(message);
    }
}

@KafkaListener의 주요 속성은 다음과 같습니다:

  • topics: 구독할 Kafka 토픽 이름을 지정합니다. 여러 개의 토픽을 구독하려면 배열 형태로 설정 가능합니다.
  • groupId: Consumer가 속할 컨슈머 그룹 ID를 지정합니다. 동일한 그룹에 속한 Consumer들은 메시지를 분산 처리합니다.

Consumer에서 전달받은 메시지는 서비스 레이어에서 처리됩니다. 서비스 레이어는 수신한 메시지를 역직렬화하여 객체로 변환하고, 이를 바탕으로 로또 번호를 생성합니다. 생성된 번호는 데이터베이스에 저장되고, Redis에 캐싱되어 클라이언트가 빠르게 조회할 수 있습니다.

LotteryService.java
@Override
public void processLotteryIssuance(String serialized) {
    try {
        LotteryIssuanceMessage message = objectMapper.readValue(serialized, LotteryIssuanceMessage.class);
        Lottery lottery = loadCurrentLotteryDraw.retrieve();
        LotteryTicket ticket = lottery.issueTicket(message);

        persistLotteryTicketPort.persist(ticket);

        message = LotteryIssuanceMessage.renew(lottery, ticket);
        serialized = objectMapper.writeValueAsString(message);

        storeLotteryIssuancePort.store(ticket.id().id().toString(), serialized, 30 * 60L);

        log.info("processLotteryIssuance: Successfully issued lottery ticket - ticketId={} message={}",
                ticket.id().id(),
                serialized);
    } catch (JsonProcessingException e) {
        log.error("processLotteryIssuance: {}", e.getMessage(), e);
        throw new ProcessingLotteryIssuanceFailureException("Cannot deserialize message");
    }
}

발급 프로세스는 다음 단계를 포함합니다:

  1. 메시지 역직렬화: Kafka에서 수신된 메시지를 LotteryIssuanceMessage 객체로 변환합니다.
  2. 로또 번호 생성: 현재 로또 회차 데이터를 불러와 번호를 생성하고 티켓 객체를 생성합니다.
  3. 데이터베이스 저장: 생성된 티켓 정보를 데이터베이스에 저장하여 영속화합니다.
  4. Redis 데이터 갱신: 발급된 티켓 정보를 Redis에 저장하여 클라이언트가 신속히 조회할 수 있도록 합니다.

발급 프로세스가 완료되면 Redis에 최신 티켓 정보가 반영되며, 클라이언트는 Redis에 저장된 데이터를 통해 실시간으로 처리 상태를 확인할 수 있습니다.

Testing Kafka Messaging Load with k6

Kafka 기반 메시징 시스템은 고성능과 확장성을 제공하며, 다양한 부하 조건에서 안정적으로 작동하는지 검증하는 것이 중요합니다. 간단한 테스트를 통해 로또 발급 요청을 시뮬레이션하며, 다량의 메시지가 발행되는 상황에서 시스템이 요청을 정상적으로 수락하고 처리할 수 있는지 확인해보겠습니다.

lottery-issuance-load-test.js
import http from 'k6/http';
import { check, sleep } from 'k6';

export const options = {
  scenarios: {
    default: {
      executor: 'ramping-vus',
      startVUs: 0,
      gracefulRampDown: '5s',
      stages: [
        { duration: '1s', target: 100 },
        { duration: '1s', target: 500 },
        { duration: '1s', target: 1000 },
      ],
    },
  },
};

export default function () {
  const url = 'http://localhost:8080/waves/lotteries';
  const payload = JSON.stringify({
    email: 'catsriding@gmail.com',
    quantity: 5,
  });

  const params = {
    headers: {
      'Content-Type': 'application/json',
    },
  };

  const res = http.post(url, payload, params);

  check(res, {
    'is status 202': (r) => r.status === 202,
  });

  sleep(1);
}

테스트는 k6를 사용하여 아래와 같은 단계로 진행됩니다:

  1. REST API를 통해 다량의 로또 발급 요청을 시뮬레이션.
  2. 각 요청이 상태 코드 202를 반환하는지 확인.
  3. Kafka 토픽으로 메시지가 성공적으로 발행되었는지 검증.
  4. 시스템이 고부하 상황에서 안정적으로 작동하는지 평가.

스크립트를 작성한 후, 아래 명령어를 사용하여 테스트를 실행합니다:

K6
$ k6 run lottery-issuance-load-test.js

테스트 중 모든 요청이 정상적으로 처리되었으며, Kafka 기반 시스템이 고부하 환경에서도 안정적으로 작동함을 확인했습니다. 다음은 k6 출력 결과입니다:

k6 run lottery-issuance-load-test.js

          /\      |‾‾| /‾‾/   /‾‾/  
     /\  /  \     |  |/  /   /  /   
    /  \/    \    |     (   /   ‾‾\ 
   /          \   |  |\  \ |  ()  |
  / __________ \  |__| \__\ \_____/ .io

     execution: local
        script: lottery-issuance-load-test.js
        output: -

     scenarios: (100.00%) 1 scenario, 1000 max VUs, 33s max duration (incl. graceful stop):
              * default: Up to 1000 looping VUs for 3s over 3 stages (gracefulRampDown: 5s, gracefulStop: 30s)

     ✓ is status 202

     checks.........................: 100.00% ✓ 15970     
     data_received..................: 921 kB  230 kB/s
     data_sent......................: 308 kB  77 kB/s
     http_req_blocked...............: avg=105.95µs min=0s    med=126µs  max=2.59ms p(90)=197µs  p(95)=228µs  
     http_req_connecting............: avg=82.76µs  min=0s    med=96µs   max=2.55ms p(90)=160µs  p(95)=184.19µs
     http_req_duration..............: avg=1.35ms   min=773µs med=1.23ms max=6.71ms p(90)=1.7ms  p(95)=2.17ms  
       { expected_response:true }...: avg=1.35ms   min=773µs med=1.23ms max=6.71ms p(90)=1.7ms  p(95)=2.17ms  
     http_req_failed................: 0.00%   ✓ 01597  
     http_req_receiving.............: avg=107.46µs min=9µs   med=99µs   max=1.88ms p(90)=156µs  p(95)=187µs   
     http_req_sending...............: avg=20.28µs  min=4µs   med=19µs   max=257µs  p(90)=33µs   p(95)=38µs    
     http_req_tls_handshaking.......: avg=0s       min=0s    med=0s     max=0s     p(90)=0s     p(95)=0s      
     http_req_waiting...............: avg=1.22ms   min=714µs med=1.11ms max=6.59ms p(90)=1.58ms p(95)=1.97ms  
     http_reqs......................: 1597    399.163382/s
     iteration_duration.............: avg=1s       min=1s    med=1s     max=1s     p(90)=1s     p(95)=1s      
     iterations.....................: 1597    399.163382/s
     vus............................: 42      min=42       max=978 
     vus_max........................: 1000    min=1000     max=1000

running (04.0s), 0000/1000 VUs, 1597 complete and 0 interrupted iterations
default ✓ [======================================] 0000/1000 VUs  3s

테스트 결과는 다음과 같습니다:

  • 요청 성공률: 100% (1,597건의 요청 모두 상태 코드 202 반환).
  • 평균 응답 시간: 1.35ms.
  • 최대 응답 시간: 6.71ms.
  • HTTP 요청 실패율: 0%.

테스트 중 Kafka Consumer가 요청 메시지를 구독하고 처리했는지 검증하기 위해 데이터베이스를 조회합니다. 아래 SQL 쿼리를 통해 로또 발급 요청으로 생성된 데이터 수를 확인할 수 있습니다:

select count(*) from lottery_tickets;

+-----+
|count|
+-----+
|1597 |
+-----+

총 1,597건의 데이터가 생성되었으며, 이는 테스트 요청 수와 일치합니다. 이를 통해 Kafka Producer와 Consumer가 정상적으로 메시지를 발행 및 처리했음을 확인할 수 있습니다.

Retrieving Lottery Ticket

다음 단계에서는 발급된 로또 티켓의 상태를 조회할 수 있는 API를 구현합니다. 이 API는 클라이언트가 주기적으로 데이터를 폴링하여 티켓의 처리 상태를 확인하는 데 사용되며, Redis에 저장된 데이터를 기반으로 응답을 제공합니다.

building-event-streaming-lottery-service-with-apache-kafka_04.png

이 API는 크게 다음과 같은 흐름으로 동작합니다:

  1. 클라이언트가 티켓 ID를 전달하면 컨트롤러가 요청을 수신합니다.
  2. 컨트롤러는 서비스 레이어에 티켓 ID를 전달하여 상태를 조회합니다.
  3. 서비스 레이어는 Redis에서 티켓 데이터를 검색하고, 역직렬화를 통해 처리 상태를 확인한 뒤 적절한 응답을 반환합니다.

컨트롤러는 클라이언트 요청을 처리하고 서비스 레이어에 티켓 ID를 전달하는 역할을 합니다. 처리 상태에 따라 202 Accepted 또는 200 OK 응답을 반환합니다.

LotteryQueryController.java
@Slf4j
@RequiredArgsConstructor
@RequestMapping("/lotteries")
@ApiAdapter
public class LotteryQueryController {

    private final RetrieveLotteryTicketUseCase retrieveLotteryTicketUseCase;

    @GetMapping("/{ticketId}")
    public ResponseEntity<?> retrieveLotteryTicketApi(@PathVariable Long ticketId) {
        LotteryTicketResponse response = retrieveLotteryTicketUseCase.retrieveLotteryTicket(ticketId);

        return switch (response.status()) {
            case PROCESSING -> ResponseEntity.accepted().body(compose(ACCEPTED, response));
            case COMPLETED -> ResponseEntity.ok(compose(SUCCESS, response));
            case FAILED -> ResponseEntity.badRequest().body(compose(FAILURE_UNEXPECTED, response));
        };
    }
}

서비스 레이어는 Redis에서 데이터를 검색하고 처리 상태를 확인하는 핵심 로직을 담당합니다. Redis에서 검색된 데이터가 없으면 예외를 발생시키며, 데이터를 역직렬화하여 처리 상태를 확인한 뒤 적절한 응답 객체를 생성합니다.

LotteryService.java
@Override
public LotteryTicketResponse retrieveLotteryTicket(Long ticketId) {
    String serialized = loadLotteryIssuancePort.load(ticketId.toString());
    throwsIfNotExists(ticketId, serialized);

    try {
        LotteryIssuanceMessage message = objectMapper.readValue(serialized, LotteryIssuanceMessage.class);
        return LotteryTicketResponse.requestProcessed(message);
    } catch (JsonProcessingException e) {
        log.error("retrieveLotteryTicket: {}", e.getMessage(), e);
        throw new ProcessingLotteryIssuanceFailureException("Cannot deserialize message");
    }
}

조회한 데이터의 상태가 아직 처리 중이라면, 아래와 같은 형식의 JSON 응답을 반환하여 클라이언트가 요청된 티켓의 처리 진행 상황을 알 수 있도록 합니다.

PROCESSING
{
  "code": 202,
  "phrase": "OK",
  "payload": {
    "id": 128792137083916288,
    "drawNumber": null,
    "drawDate": null,
    "issuedAt": "2024-12-21T09:33:58.322996594",
    "status": "PROCESSING",
    "numbers": []
  }
}

반면, 처리가 완료되어 복권 번호가 발급된 경우에는 다음과 같은 JSON 응답을 반환하여 클라이언트가 발급된 티켓의 세부 정보를 확인할 수 있도록 합니다.

COMPLETED
{
  "code": 200,
  "phrase": "OK",
  "payload": {
    "id": 128792137083916288,
    "drawNumber": 1150,
    "drawDate": "2024-12-14",
    "issuedAt": "2024-12-21T09:33:58.322996594",
    "status": "COMPLETED",
    "numbers": [
      3,
      8,
      11,
      17,
      23,
      29
    ]
  }
}

이 API는 Redis를 활용함으로써 빠르고 효율적으로 티켓 상태를 조회할 수 있으며, 클라이언트는 실시간에 가까운 속도로 필요한 정보를 확인할 수 있습니다.

Wrapping It Up with Apache Kafka

여기까지, Apache Kafka를 활용해 이벤트 기반 로또 번호 발급 시스템을 설계하고 구현했습니다. Kafka의 메시징 기능을 통해 비동기 이벤트 발행과 처리를 효과적으로 수행하며, 이벤트 기반 아키텍처의 유연성과 확장성을 경험할 수 있었습니다.

클라이언트 개발은 저의 전문 분야는 아니지만, 발급된 번호를 클립보드로 복사하거나 DOM 요소를 이미지로 생성해 다운로드할 수 있는 기능을 추가해 사용자 경험을 개선했습니다. 이런 세부 작업은 사실 부가기능에 불과하지만, 구현 과정에서 한참 헤매고 삽질도 많이 했었네요. 그래도 끝내 작동하는 모습을 보니, 이 작은 기능 하나가 왜 이렇게 뿌듯하고 기분이 좋은지 모르겠습니다. 😎

building-event-streaming-lottery-service-with-apache-kafka_09.png

Kafka는 높은 처리량, 우수한 확장성, 그리고 분산 시스템에서의 신뢰성을 바탕으로 현대 대규모 서비스에서 중요한 역할을 하고 있습니다. 데이터 스트리밍, 비동기 처리, 로그 관리 등 다양한 분야에서 활용되며, 서비스 간 데이터 통신을 효율적으로 처리하는 데 최적화되어 있습니다. 이러한 특성을 고려할 때, Kafka의 원리와 활용 방법을 더 깊이 이해하는 것은 앞으로의 시스템 설계와 구현에 있어 큰 도움이 될 것이라고 생각합니다. 💭

  • Architecture
  • Infrastructure
  • Kafka