AWS Kinesis 를 활용한 이벤트 기반 마이크로서비스 구축

규모 확장을 위한 AWS Kinesis 기반 이벤트 중심 아키텍처

Page content

AWS Kinesis 는 최소한의 운영 오버헤드로 대규모 실시간 데이터 처리를 가능하게 하여 현대의 이벤트 기반 마이크로서비스 아키텍처를 구축하는 핵심 요소가 되었습니다.

amazon-kinesis

이벤트 기반 마이크로서비스 아키텍처 이해하기

이벤트 기반 아키텍처 (EDA) 는 서비스가 직접적인 동기 호출 대신 이벤트를 통해 상호 소통하는 디자인 패턴입니다. 이 접근 방식은 다음과 같은 여러 장점을 제공합니다:

  • 느슨한 결합 (Loose coupling): 서비스 간에 서로의 존재를 알 필요 없음
  • 확장성 (Scalability): 각 서비스가 작업 부하에 따라 독립적으로 확장됨
  • 복원력 (Resilience): 한 서비스의 실패가 다른 서비스로 연쇄적으로 전파되지 않음
  • 유연성 (Flexibility): 기존 서비스를 수정하지 않고도 새로운 서비스를 추가 가능

AWS Kinesis 는 프로듀서와 컨슈머를 분리하는 분산되고 내구성이 있는 이벤트 스트림으로서 EDA 를 구현하기 위한 백본 역할을 제공합니다.

스트리밍 플랫폼에 대한 더 넓은 관점을 위해, 자체 호스팅 대안과의 비교를 위한 Apache Kafka 빠른 시작 가이드 를 참조하세요.

AWS Kinesis 개요

AWS 는 특정 사용 사례를 위해 설계된 여러 Kinesis 서비스를 제공합니다. 스트리밍 솔루션을 평가할 때는 다양한 메시징 패턴과 비용 영향에 따라 RabbitMQ on EKS 와 SQS 비교 도 고려해 볼 수 있습니다.

Kinesis Data Streams

실시간으로 데이터 레코드를 캡처, 저장 및 처리하는 핵심 스트리밍 서비스입니다. Data Streams 는 다음과 같은 경우에 이상적입니다:

  • 사용자 정의 실시간 처리 애플리케이션
  • 초 단위 이하 지연 시간을 가진 데이터 파이프라인 구축
  • 초당 수백만 개의 이벤트 처리
  • 이벤트 소싱 패턴 구현

Kinesis Data Firehose

S3, Redshift, Elasticsearch, HTTP 엔드포인트와 같은 목적지로 스트리밍 데이터를 전송하는 완전히 관리되는 서비스입니다. 다음 경우에 가장 적합합니다:

  • 간단한 ETL 파이프라인
  • 로그 집계 및 아카이빙
  • 준실시간 분석 (최소 60 초 지연 시간)
  • 사용자 정의 처리 로직이 필요 없는 시나리오

Kinesis Data Analytics

SQL 또는 Apache Flink 를 사용하여 스트리밍 데이터를 처리하고 분석합니다. 사용 사례는 다음과 같습니다:

  • 실시간 대시보드
  • 스트리밍 ETL
  • 실시간 이상 탐지
  • 지속적인 지표 생성

Flink 운영에 대한 더 깊은 이해를 위해 K8s 와 Kafka 에서 Apache Flink 가이드 를 참조하세요.

Kinesis 를 활용한 아키텍처 패턴

1. 이벤트 소싱 (Event Sourcing) 패턴

이벤트 소싱은 애플리케이션 상태 변경을 이벤트 시퀀스로 저장합니다. Kinesis 는 이에 완벽하게 적합합니다. Python 기초에 대한 복습이 필요하면 Python 치트시트 를 확인하세요:

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """이벤트를 Kinesis 스트림에 발행"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }
    
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )
    
    return response['SequenceNumber']

# 예시: 사용자 등록 이벤트
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. CQRS (Command Query Responsibility Segregation)

Kinesis 를 이벤트 버스로서 사용하여 읽기 및 쓰기 작업을 분리합니다:

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }
    
    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })
    
    return err
}

3. Lambda 와 함께한 Fan-Out 패턴

단일 스트림의 이벤트를 여러 Lambda 함수로 처리합니다. 더 강력한 타입 안전성을 위한 TypeScript 구현은 TypeScript 치트시트 를 참조하세요:

// 이메일 알림용 Lambda 컨슈머
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// 재고 업데이트용 다른 Lambda
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};

프로덕션 환경 모범 사례

1. 적절한 샤드 (Shard) 수 선택하기

다음 기준에 따라 샤드 요구량을 계산합니다:

  • 입력 (Ingress): 샤드당 1 MB/초 또는 1,000 레코드/초
  • 출력 (Egress): 샤드당 2 MB/초 (표준 컨슈머) 또는 향상된 팬아웃 사용 시 컨슈머당 2 MB/초
def calculate_shards(records_per_second, avg_record_size_kb):
    """필요한 샤드 수 계산"""
    # 입력 용량
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )
    
    return int(ingress_shards) + 1  # 버퍼 추가

2. 적절한 오류 처리 구현

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key, 
                          max_retries=3):
    """지수 백오프 (exponential backoff) 를 사용한 재시도 레코드 전송"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # 지수 백오프
                    continue
            raise

3. 여러 컨슈머를 위한 향상된 팬아웃 (Enhanced Fan-Out) 사용

향상된 팬아웃은 각 컨슈머에게 전용 처리량을 제공합니다:

# 향상된 팬아웃을 사용하여 컨슈머 등록
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. 주요 지표 모니터링

추적해야 할 필수 CloudWatch 지표:

  • IncomingRecords: 성공적으로 전송된 레코드 수
  • IncomingBytes: 들어오는 데이터의 바이트 양
  • GetRecords.IteratorAgeMilliseconds: 컨슈머가 뒤쳐진 정도
  • WriteProvisionedThroughputExceeded: 스로틀링 이벤트
  • ReadProvisionedThroughputExceeded: 컨슈머 스로틀링

5. 적절한 파티션 키 전략 구현

import hashlib

def get_partition_key(user_id, shard_count=10):
    """균일한 분포를 위한 파티션 키 생성"""
    # 균일한 분포를 위한 일관된 해싱 사용
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

실제 구현 예제

주문 처리 마이크로서비스 아키텍처의 완전한 예제입니다:

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
    
    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """주문 생성 및 이벤트 발행"""
        order_id = self.generate_order_id()
        
        # 주문 생성 이벤트 발행
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)
        
        return order_id
    
    def publish_event(self, event_type: str, payload: Dict, 
                     partition_key: str):
        """이벤트를 Kinesis 스트림에 발행"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }
        
        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """주문 이벤트를 소비하고 재고를 업데이트"""
    
    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])
            
            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])
    
    def reserve_inventory(self, order_data):
        # 재고 데이터베이스 업데이트
        for item in order_data['items']:
            # 구현 내용
            pass

모놀리식에서 마이크로서비스로의 마이그레이션 전략

단계 1: strangler Fig 패턴

모놀리식을 유지하면서 특정 이벤트를 Kinesis 를 통해 라우팅으로 시작합니다:

  1. 모놀리식 내의 경계 컨텍스트 식별
  2. 컨텍스트 간 이벤트를 위한 Kinesis 스트림 생성
  3. 이러한 스트림에서 소비하는 서비스를 점진적으로 추출
  4. 모놀리식과의 하위 호환성 유지

단계 2: 병렬 처리

구체 시스템과 새로운 시스템을 병렬로 실행합니다:

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """레거시 시스템과 이벤트 스트림에 모두 작성"""
    try:
        # 먼저 새로운 시스템에 작성
        publish_to_kinesis(kinesis_stream, data)
        
        # 그 다음 레거시 시스템 업데이트
        legacy_db.update(data)
    except Exception as e:
        # 보상 로직 구현
        rollback_kinesis_event(kinesis_stream, data)
        raise

단계 3: 전체 마이그레이션

확신이 서면 모든 트래픽을 이벤트 기반 아키텍처를 통해 라우팅합니다.

비용 최적화 전략

객체 저장소 및 데이터베이스 아키텍처를 포함한 데이터 인프라 패턴에 대한 포괄적인 지침은 AI 시스템을 위한 데이터 인프라: 객체 저장소, 데이터베이스, 검색 및 AI 데이터 아키텍처 를 참조하세요.

1. 변동 작업 부하를 위한 On-Demand 모드 사용

2023 년에 도입된 On-Demand 모드는 트래픽에 따라 자동으로 확장됩니다:

# On-Demand 모드로 스트림 생성
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. 데이터 집계 구현

PUT 페이로드 단위를 줄이기 위해 레코드를 배치합니다:

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """비용 절감을 위해 레코드 집계"""
    aggregator = RecordAggregator()
    
    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )
    
    # 집계된 레코드 전송
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. 데이터 보존 기간 최적화

기본 보존 기간은 24 시간입니다. 필요한 경우에만 연장하세요:

# 보존 기간을 7 일로 설정
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

보안 모범 사례

1. 저장 및 전송 중 암호화

# 암호화된 스트림 생성
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# 암호화 활성화
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. 최소 권한을 위한 IAM 정책

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. VPC 엔드포인트

트래픽을 AWS 네트워크 내부에 유지합니다. 인프라를 코드로 관리하려면 Terraform 을 고려하세요 - Terraform 치트시트 를 참조하세요:

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

가시성 및 디버깅

X-Ray 를 활용한 분산 트레이싱

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])
    
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

CloudWatch Logs Insights 쿼리

-- 느린 처리 시간 찾기
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- 오류 추적률 추적
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

고급 패턴

분산 트랜잭션을 위한 사가 (Saga) 패턴

마이크로서비스 간에 장기 실행 트랜잭션을 구현합니다:

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())
    
    def execute(self, order_data):
        """보상 로직을 갖춘 사가 실행"""
        try:
            # 단계 1: 재고 예약
            self.publish_command('RESERVE_INVENTORY', order_data)
            
            # 단계 2: 결제 처리
            self.publish_command('PROCESS_PAYMENT', order_data)
            
            # 단계 3: 주문 배송
            self.publish_command('SHIP_ORDER', order_data)
            
        except SagaException as e:
            # 역순으로 보상 실행
            self.compensate(e.failed_step)
    
    def compensate(self, failed_step):
        """보상 트랜잭션 실행"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }
        
        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

테스트 전략

LocalStack 를 활용한 로컬 개발

# Kinesis 와 함께 LocalStack 시작
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# 테스트 스트림 생성
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

통합 테스트

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """모킹된 Kinesis 로 이벤트 발행 테스트"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)
    
    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])
    
    assert order_id is not None

성능 튜닝

배치 크기 최적화

def optimize_batch_processing(records, batch_size=500):
    """최적화된 배치로 레코드 처리"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

연결 풀링 사용

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

유용한 링크

AWS Kinesis 리소스:

관련 기사:

결론

AWS Kinesis 는 확장 가능하고 탄력적이며 유지보수가 쉬운 시스템을 구축하기 위한 견고한 기반을 제공합니다. 이러한 패턴과 모범 사례를 따르시면 이벤트 기반 마이크로서비스 아키텍처를 성공적으로 구현할 수 있습니다. 단일 이벤트 스트림으로 작게 시작하여 아키텍처를 검증한 후, 시스템이 성장함에 따라 더 복잡한 패턴으로 점진적으로 확장해 나가세요.

성공의 열쇠는 데이터 흐름 요구 사항을 이해하고, 사용 사례에 맞는 올바른 Kinesis 서비스를 선택하며, 시작 단계부터 적절한 모니터링과 오류 처리를 구현하는 것입니다.

구독하기

시스템, 인프라, AI 엔지니어링에 관한 새 글을 받아보세요.