AWS Kinesis를 사용한 이벤트 주도형 마이크로서비스 구축
확장성을 위한 AWS Kinesis를 활용한 이벤트 주도 아키텍처
AWS Kinesis은 현대적인 이벤트 기반 마이크로서비스 아키텍처를 구축하는 데 핵심적인 역할을 하며, 최소한의 운영 부담으로 대규모 실시간 데이터 처리를 가능하게 해줍니다.

이벤트 기반 마이크로서비스 아키텍처 이해
이벤트 기반 아키텍처(EDA)는 서비스가 직접적인 동기식 호출 대신 이벤트를 통해 소통하는 설계 패턴입니다. 이 접근 방식은 다음과 같은 장점을 제공합니다:
- 느슨한 결합: 서비스는 서로의 존재를 알 필요가 없습니다.
- 확장성: 각 서비스는 작업량에 따라 독립적으로 확장됩니다.
- 내결함성: 하나의 서비스에서의 실패가 다른 서비스로 전파되지 않습니다.
- 유연성: 기존 서비스에 영향을 주지 않고 새로운 서비스를 추가할 수 있습니다.
AWS Kinesis는 EDA를 구현하는 데 뼈대를 제공하며, 생산자와 소비자를 분리하는 분산형, 내구성 있는 이벤트 스트림으로 작동합니다.
AWS Kinesis 개요
AWS는 특정 사용 사례에 맞춰 설계된 여러 Kinesis 서비스를 제공합니다. 스트리밍 솔루션을 평가할 때, 다양한 메시징 패턴과 비용 영향을 고려하기 위해 RabbitMQ on EKS vs SQS를 비교하는 것도 고려할 수 있습니다.
Kinesis Data Streams
실시간으로 데이터 기록을 수집, 저장 및 처리하는 핵심 스트리밍 서비스입니다. Data Streams는 다음과 같은 경우에 이상적입니다:
- 사용자 정의 실시간 처리 애플리케이션
- 밀리초 단위 지연으로 데이터 파이프라인 구축
- 초당 수백만 개의 이벤트 처리
- 이벤트 소싱 패턴 구현
Kinesis Data Firehose
S3, Redshift, Elasticsearch 또는 HTTP 엔드포인트와 같은 목적지로 스트리밍 데이터를 전달하는 완전히 관리되는 서비스입니다. 다음의 경우에 적합합니다:
- 간단한 ETL 파이프라인
- 로그 집계 및 보관
- 실시간 분석(최소 60초 지연)
- 사용자 정의 처리 논리가 필요하지 않은 시나리오
Kinesis Data Analytics
SQL 또는 Apache Flink를 사용하여 스트리밍 데이터를 처리하고 분석합니다. 사용 사례는 다음과 같습니다:
- 실시간 대시보드
- 스트리밍 ETL
- 실시간 이상 탐지
- 지속적인 메트릭 생성
Kinesis와 함께하는 아키텍처 패턴
1. 이벤트 소싱 패턴
이벤트 소싱은 애플리케이션 상태의 모든 변경 사항을 이벤트의 시퀀스로 저장합니다. Kinesis는 이에 매우 적합합니다. Python 기초에 대한 간단한 복습이 필요하다면, 우리의 Python Cheatsheet를 확인해 보세요:
import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
def publish_event(stream_name, event_type, payload):
"""Kinesis 스트림에 이벤트를 게시합니다."""
event = {
'eventId': generate_unique_id(),
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event),
PartitionKey=payload.get('userId', 'default')
)
return response['SequenceNumber']
# 예시: 사용자 등록 이벤트
publish_event(
stream_name='user-events',
event_type='USER_REGISTERED',
payload={
'userId': '12345',
'email': 'user@example.com',
'registrationDate': '2025-10-30'
}
)
2. CQRS (Command Query Responsibility Segregation)
Kinesis를 이벤트 버스로 사용하여 읽기와 쓰기 작업을 분리합니다:
package main
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)
type OrderCommand struct {
CommandType string `json:"commandType"`
OrderID string `json:"orderId"`
UserID string `json:"userId"`
Items []OrderItem `json:"items"`
}
func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
data, err := json.Marshal(command)
if err != nil {
return err
}
_, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String("order-commands"),
Data: data,
PartitionKey: aws.String(command.OrderID),
})
return err
}
3. Lambda와 함께하는 팬아웃 패턴
하나의 스트림에서 이벤트를 처리하는 여러 Lambda 함수를 사용합니다. 더 강력한 타입 안전성을 위한 TypeScript 구현이 필요하다면, 우리의 TypeScript Cheatsheet를 참조하세요:
// 이메일 알림을 위한 Lambda 소비자
exports.handler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await sendOrderConfirmationEmail(payload);
}
}
};
// 재고 업데이트를 위한 다른 Lambda
exports.inventoryHandler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await updateInventory(payload.items);
}
}
};
프로덕션에서의 최선의 실천
1. 적절한 셰드 수 선택
셰드 수를 다음 기준에 따라 계산하세요:
- 입력: 셰드당 초당 1MB 또는 1,000개의 기록
- 출력: 표준 소비자당 셰드당 초당 2MB 또는 강화된 팬아웃 기준으로 소비자당 초당 2MB
def calculate_shards(records_per_second, avg_record_size_kb):
"""필요한 셰드 수 계산"""
# 입력 용량
ingress_shards = max(
records_per_second / 1000,
(records_per_second * avg_record_size_kb) / 1024
)
return int(ingress_shards) + 1 # 버퍼 추가
2. 적절한 오류 처리 구현
from botocore.exceptions import ClientError
import time
def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
max_retries=3):
"""지수적 백오프 재시도를 사용한 기록 삽입"""
for attempt in range(max_retries):
try:
response = kinesis_client.put_record(
StreamName=stream_name,
Data=data,
PartitionKey=partition_key
)
return response
except ClientError as e:
if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # 지수적 백오프
continue
raise
3. 여러 소비자에 대한 강화된 팬아웃 사용
강화된 팬아웃은 각 소비자에 대한 전용 처리량을 제공합니다:
# 강화된 팬아웃을 사용하여 소비자 등록
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
--consumer-name my-consumer-app
4. 주요 메트릭 모니터링
추적해야 할 필수 CloudWatch 메트릭:
IncomingRecords: 성공적으로 삽입된 기록 수IncomingBytes: 입력 데이터의 바이트량GetRecords.IteratorAgeMilliseconds: 소비자가 얼마나 뒤처져 있는지WriteProvisionedThroughputExceeded: 제한 이벤트ReadProvisionedThroughputExceeded: 소비자 제한
5. 적절한 파티션 키 전략 구현
import hashlib
def get_partition_key(user_id, shard_count=10):
"""균형 잡힌 분포를 위한 파티션 키 생성"""
# 균형 잡힌 분포를 위해 일관된 해싱 사용
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_id = hash_value % shard_count
return f"{user_id}#{shard_id}"
실제 구현 예시
다음은 주문 처리 마이크로서비스 아키텍처의 전체 예시입니다:
import boto3
import json
from decimal import Decimal
from typing import Dict, List
class OrderProcessingService:
def __init__(self, stream_name: str):
self.kinesis = boto3.client('kinesis')
self.stream_name = stream_name
def create_order(self, user_id: str, items: List[Dict]) -> str:
"""주문 생성 및 이벤트 게시"""
order_id = self.generate_order_id()
# 주문 생성 이벤트 게시
self.publish_event('ORDER_CREATED', {
'orderId': order_id,
'userId': user_id,
'items': items,
'status': 'PENDING',
'total': self.calculate_total(items)
}, partition_key=user_id)
return order_id
def publish_event(self, event_type: str, payload: Dict,
partition_key: str):
"""Kinesis 스트림에 이벤트 게시"""
event = {
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
self.kinesis.put_record(
StreamName=self.stream_name,
Data=json.dumps(event, default=str),
PartitionKey=partition_key
)
class InventoryService:
"""주문 이벤트를 소비하고 재고를 업데이트합니다."""
def process_records(self, records):
for record in records:
data = json.loads(record['kinesis']['data'])
if data['eventType'] == 'ORDER_CREATED':
self.reserve_inventory(data['payload'])
elif data['eventType'] == 'ORDER_CANCELLED':
self.release_inventory(data['payload'])
def reserve_inventory(self, order_data):
# 재고 데이터베이스 업데이트
for item in order_data['items']:
# 구현 부분
pass
모놀리스에서 마이크로서비스로의 마이그레이션 전략
1단계: 스트랭글러 피그 패턴
모놀리스를 유지하면서 특정 이벤트를 Kinesis를 통해 라우팅하여 시작합니다:
- 모놀리스의 경계 컨텍스트를 식별합니다.
- 경계 컨텍스트 간 이벤트를 위한 Kinesis 스트림을 생성합니다.
- 이러한 스트림을 소비하는 서비스를 점진적으로 추출합니다.
- 모놀리스와의 호환성을 유지합니다.
2단계: 병렬 처리
기존 시스템과 새로운 시스템을 병렬로 실행합니다:
def dual_write_pattern(legacy_db, kinesis_stream, data):
"""기존 시스템과 이벤트 스트림에 모두 쓰기"""
try:
# 먼저 새로운 시스템에 쓰기
publish_to_kinesis(kinesis_stream, data)
# 그 후 기존 시스템 업데이트
legacy_db.update(data)
except Exception as e:
# 보상 논리 구현
rollback_kinesis_event(kinesis_stream, data)
raise
3단계: 전체 마이그레이션
신뢰도가 확보되면 모든 트래픽을 이벤트 기반 아키텍처로 라우팅합니다.
비용 최적화 전략
1. 가변 워크로드에 On-Demand 모드 사용
2023년에 도입된 On-Demand 모드는 트래픽에 따라 자동으로 확장됩니다:
# On-Demand 모드로 스트림 생성
aws kinesis create-stream \
--stream-name my-stream \
--stream-mode-details StreamMode=ON_DEMAND
2. 데이터 집계 구현
PUT 페이로드 단위를 줄이기 위해 기록을 배치합니다:
from aws_kinesis_agg.aggregator import RecordAggregator
def batch_put_records(kinesis_client, stream_name, records):
"""비용 절감을 위해 기록 집계"""
aggregator = RecordAggregator()
for record in records:
aggregator.add_user_record(
partition_key=record['partition_key'],
data=record['data']
)
# 집계된 기록 전송
kinesis_client.put_record(
StreamName=stream_name,
Data=aggregator.serialize(),
PartitionKey=records[0]['partition_key']
)
3. 데이터 보존 최적화
기본 보존 기간은 24시간입니다. 필요할 경우에만 확장하세요:
# 7일로 보존 기간 설정
aws kinesis increase-stream-retention-period \
--stream-name my-stream \
--retention-period-hours 168
보안 최선의 실천
1. 정지 및 전송 중 암호화
# 암호화된 스트림 생성
kinesis.create_stream(
StreamName='secure-stream',
ShardCount=1,
StreamModeDetails={'StreamMode': 'PROVISIONED'}
)
# 암호화 활성화
kinesis.start_stream_encryption(
StreamName='secure-stream',
EncryptionType='KMS',
KeyId='alias/aws/kinesis'
)
2. 최소 권한을 위한 IAM 정책
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
}
]
}
3. VPC 엔드포인트
AWS 네트워크 내부에서 트래픽을 유지하세요. AWS 인프라를 코드로 관리하려면 Terraform을 고려하세요 - 우리의 Terraform cheatsheet를 참조하세요:
aws ec2 create-vpc-endpoint \
--vpc-id vpc-12345678 \
--service-name com.amazonaws.us-east-1.kinesis-streams \
--route-table-ids rtb-12345678
가시성 및 디버깅
X-Ray를 사용한 분산 추적
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
patch_all()
@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
xray_recorder.put_annotation('eventType', event_data['type'])
xray_recorder.put_metadata('payload', event_data['payload'])
kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event_data),
PartitionKey=event_data['id']
)
CloudWatch Logs Insights 쿼리
-- 느린 처리 시간 찾기
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20
-- 오류율 추적
fields @timestamp, eventType
| filter error = true
| stats count() by eventType
고급 패턴
분산 트랜잭션을 위한 Saga 패턴
마이크로서비스 간에 장기 트랜잭션을 실행합니다:
class OrderSaga:
def __init__(self, kinesis_client, stream_name):
self.kinesis = kinesis_client
self.stream_name = stream_name
self.saga_id = str(uuid.uuid4())
def execute(self, order_data):
"""보상 논리와 함께 Saga 실행"""
try:
# 단계 1: 재고 예약
self.publish_command('RESERVE_INVENTORY', order_data)
# 단계 2: 결제 처리
self.publish_command('PROCESS_PAYMENT', order_data)
# 단계 3: 주문 배송
self.publish_command('SHIP_ORDER', order_data)
except SagaException as e:
# 역순으로 보상 실행
self.compensate(e.failed_step)
def compensate(self, failed_step):
"""보상 트랜잭션 실행"""
compensation_steps = {
'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
}
for step in compensation_steps.get(failed_step, []):
self.publish_command(step, {'sagaId': self.saga_id})
테스트 전략
LocalStack을 사용한 로컬 개발
# Kinesis와 함께 LocalStack 시작
docker run -d \
-p 4566:4566 \
-e SERVICES=kinesis \
localstack/localstack
# 테스트 스트림 생성
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
--stream-name test-stream \
--shard-count 1
통합 테스트
import pytest
from moto import mock_kinesis
@mock_kinesis
def test_event_publishing():
"""모킹된 Kinesis를 사용한 이벤트 게시 테스트"""
kinesis = boto3.client('kinesis', region_name='us-east-1')
kinesis.create_stream(StreamName='test-stream', ShardCount=1)
service = OrderProcessingService('test-stream')
order_id = service.create_order('user123', [
{'productId': 'prod1', 'quantity': 2}
])
assert order_id is not None
성능 최적화
배치 크기 최적화
def optimize_batch_processing(records, batch_size=500):
"""최적화된 배치로 기록 처리"""
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
process_batch(batch)
연결 풀링 사용
from botocore.config import Config
config = Config(
max_pool_connections=50,
retries={'max_attempts': 3, 'mode': 'adaptive'}
)
kinesis = boto3.client('kinesis', config=config)
유용한 링크
AWS Kinesis 자료:
- AWS Kinesis 문서
- AWS Kinesis Data Streams 개발자 가이드
- Kinesis 클라이언트 라이브러리 (KCL)
- AWS Kinesis 가격 계산기
- Kinesis Data Streams 할당량 및 제한
- AWS 아키텍처 블로그 - 이벤트 기반 아키텍처
- AWS 샘플 - Kinesis 예제
관련 기사:
- Rabbitmq on Eks vs Sqs 호스팅 비용 비교
- TypeScript Cheatsheet: 핵심 개념 및 최선의 실천
- Python Cheatsheet
- Terraform cheatsheet - 유용한 명령어 및 예제
결론
AWS Kinesis는 확장성 있는 이벤트 기반 마이크로서비스 아키텍처를 구축하는 데 견고한 기반을 제공합니다. 이러한 패턴과 최선의 실천을 따르면 내결함성, 확장성, 유지보수가 가능한 시스템을 만들 수 있습니다. 작은 하나의 이벤트 스트림부터 시작하여 아키텍처를 검증하고, 시스템이 성장하면서 더 복잡한 패턴으로 점진적으로 확장하세요.
성공의 열쇠는 데이터 흐름 요구사항을 이해하고, 사용 사례에 적합한 Kinesis 서비스를 선택하며, 처음부터 적절한 모니터링과 오류 처리를 구현하는 것입니다.