AWS Kinesis を用いたイベント駆動マイクロサービスの構築
AWS Kinesis を活用したスケーラブルなイベント駆動アーキテクチャ
AWS Kinesis は、最小の運用オーバーヘッドでスケーラブルなリアルタイムデータ処理を可能にし、モダンなイベント駆動マイクロサービスアーキテクチャ構築の要となっています。

イベント駆動マイクロサービスアーキテクチャの理解
イベント駆動アーキテクチャ(EDA)は、直接的な同期呼び出しではなく、イベントを介してサービス間で通信を行う設計パターンです。このアプローチには以下のような利点があります。
- 疎結合: サービスは互いの存在を知る必要がありません
- スケーラビリティ: ワークロードに応じて各サービスが独立してスケーリング可能です
- 耐障害性: 1 つのサービスの障害が他に波及しません
- 柔軟性: 既存のサービスを変更せずに新しいサービスを追加できます
AWS Kinesis は、プロデューサー(生成元)とコンシューマー(消費元)を分離する分散型かつ永続的なイベントストリームとして機能することで、EDA の実装を支える基盤を提供します。
ストリーミングプラットフォームに関するより広い視点については、セルフホスト型ソリューションとの比較として、Apache Kafka クイックスタートガイド をご参照ください。
AWS Kinesis の概要
AWS は、特定のユースケース向けに設計された複数の Kinesis サービスを提供しています。ストリーミングソリューションを評価する際、異なるメッセージングパターンやコストの影響についても考慮し、RabbitMQ on EKS と SQS の比較 も検討してみてください。
Kinesis Data Streams
リアルタイムでデータレコードをキャプチャ、保存、処理するコアのストリーミングサービスです。Data Streams は以下の用途に最適です。
- カスタムリアルタイム処理アプリケーション
- 亜秒単位のレイテンシを必要とするデータパイプラインの構築
- 1 秒間に数百万件のイベントの処理
- イベントソーシングパターンの実装
Kinesis Data Firehose
S3、Redshift、Elasticsearch、または HTTP エンドポイントなどの宛先にストリーミングデータを配信する完全に管理されたサービスです。以下の用途に適しています。
- シンプルな ETL パイプライン
- ログの集約とアーカイブ
- 準リアルタイム分析(最低 60 秒のレイテンシ)
- カスタム処理ロジックが不要なシナリオ
Kinesis Data Analytics
SQL または Apache Flink を使用してストリーミングデータを処理および分析します。ユースケースには以下が含まれます。
- リアルタイムダッシュボード
- ストリーミング ETL
- リアルタイム異常検知
- 継続的な指標生成
Flink 運用の詳細については、K8s と Kafka 環境での Apache Flink ガイド を参照してください。
Kinesis を利用したアーキテクチャパターン
1. イベントソーシングパターン
イベントソーシングでは、アプリケーションの状態変更をイベントのシーケンスとして保存します。Kinesis はこれに最適です。Python の基礎を復習したい場合は、Python チートシート をご覧ください。
import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
def publish_event(stream_name, event_type, payload):
"""イベントを Kinesis ストリームに公開します"""
event = {
'eventId': generate_unique_id(),
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event),
PartitionKey=payload.get('userId', 'default')
)
return response['SequenceNumber']
# 例:ユーザー登録イベント
publish_event(
stream_name='user-events',
event_type='USER_REGISTERED',
payload={
'userId': '12345',
'email': 'user@example.com',
'registrationDate': '2025-10-30'
}
)
2. CQRS (コマンドクエリ責任分離)
Kinesis をイベントバスとして使用して、読み書き操作を分離します。
package main
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)
type OrderCommand struct {
CommandType string `json:"commandType"`
OrderID string `json:"orderId"`
UserID string `json:"userId"`
Items []OrderItem `json:"items"`
}
func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
data, err := json.Marshal(command)
if err != nil {
return err
}
_, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String("order-commands"),
Data: data,
PartitionKey: aws.String(command.OrderID),
})
return err
}
3. Lambda を使用した Fan-Out パターン
単一のストリームからのイベントを複数の Lambda 関数で処理します。より強力な型安全性を備えた TypeScript 実装については、TypeScript チートシート を参照してください。
// メール通知用の Lambda コンシューマー
exports.handler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await sendOrderConfirmationEmail(payload);
}
}
};
// 在庫更新用の別の Lambda
exports.inventoryHandler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await updateInventory(payload.items);
}
}
};
本番環境向けベストプラクティス
1. 適切なシャード数の選択
シャード要件は以下に基づいて計算してください。
- 入力(Ingress): シャードあたり 1 MB/秒 または 1,000 レコード/秒
- 出力(Egress): シャードあたり 2 MB/秒(標準コンシューマー)または、拡張ファンアウトを使用する場合、コンシューマーあたり 2 MB/秒
def calculate_shards(records_per_second, avg_record_size_kb):
"""必要なシャード数を計算します"""
# 入力容量
ingress_shards = max(
records_per_second / 1000,
(records_per_second * avg_record_size_kb) / 1024
)
return int(ingress_shards) + 1 # バッファを追加
2. 適切なエラーハンドリングの実装
from botocore.exceptions import ClientError
import time
def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
max_retries=3):
"""指数バックオフリトライ付きのレコード投入"""
for attempt in range(max_retries):
try:
response = kinesis_client.put_record(
StreamName=stream_name,
Data=data,
PartitionKey=partition_key
)
return response
except ClientError as e:
if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # 指数バックオフ
continue
raise
3. 複数のコンシューマー向けに拡張ファンアウトを使用
拡張ファンアウトは、各コンシューマーに対して専用スループットを提供します。
# 拡張ファンアウト付きでコンシューマーを登録
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
--consumer-name my-consumer-app
4. 重要なメトリックの監視
追跡すべき重要な CloudWatch メトリック:
IncomingRecords: 正常に投入されたレコード数IncomingBytes: 入力データのバイト数GetRecords.IteratorAgeMilliseconds: コンシューマーの遅延時間WriteProvisionedThroughputExceeded: スロットリングイベントReadProvisionedThroughputExceeded: コンシューマーのスロットリング
5. 適切なパーティションキー戦略の実装
import hashlib
def get_partition_key(user_id, shard_count=10):
"""均等な分散を持つパーティションキーを生成"""
# 均等な分散のために一貫性のあるハッシュを使用
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_id = hash_value % shard_count
return f"{user_id}#{shard_id}"
実世界の導入例
以下は、注文処理マイクロサービスアーキテクチャの完全な例です。
import boto3
import json
from decimal import Decimal
from typing import Dict, List
class OrderProcessingService:
def __init__(self, stream_name: str):
self.kinesis = boto3.client('kinesis')
self.stream_name = stream_name
def create_order(self, user_id: str, items: List[Dict]) -> str:
"""注文を作成し、イベントを公開"""
order_id = self.generate_order_id()
# 注文作成イベントを公開
self.publish_event('ORDER_CREATED', {
'orderId': order_id,
'userId': user_id,
'items': items,
'status': 'PENDING',
'total': self.calculate_total(items)
}, partition_key=user_id)
return order_id
def publish_event(self, event_type: str, payload: Dict,
partition_key: str):
"""イベントを Kinesis ストリームに公開"""
event = {
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
self.kinesis.put_record(
StreamName=self.stream_name,
Data=json.dumps(event, default=str),
PartitionKey=partition_key
)
class InventoryService:
"""注文イベントを消費し、在庫を更新"""
def process_records(self, records):
for record in records:
data = json.loads(record['kinesis']['data'])
if data['eventType'] == 'ORDER_CREATED':
self.reserve_inventory(data['payload'])
elif data['eventType'] == 'ORDER_CANCELLED':
self.release_inventory(data['payload'])
def reserve_inventory(self, order_data):
# 在庫データベースを更新
for item in order_data['items']:
# ここに実装
pass
モノリシックからマイクロサービスへの移行戦略
フェーズ 1: ストラングラー・フィグ・パターン
モノリシックを維持しつつ、特定のイベントを Kinesis 経由でルーティングすることから始めます。
- モノリシック内のバウンデッドコンテキストを特定
- コンテキスト間イベント用の Kinesis ストリームを作成
- これらのストリームから消費するサービスを徐々に抽出
- モノリシックとの互換性を維持
フェーズ 2: 並行処理
新旧のシステムを並行して実行します。
def dual_write_pattern(legacy_db, kinesis_stream, data):
"""レガシーシステムとイベントストリームの両方に書き込み"""
try:
# まず新しいシステムに書き込み
publish_to_kinesis(kinesis_stream, data)
# 次にレガシーシステムを更新
legacy_db.update(data)
except Exception as e:
# 補償ロジックを実装
rollback_kinesis_event(kinesis_stream, data)
raise
フェーズ 3: 完全移行
信頼性が確立されたら、すべてのトラフィックをイベント駆動アーキテクチャ経由でルーティングします。
コスト最適化戦略
オブジェクトストレージやデータベースアーキテクチャを含むデータインフラストラクチャパターンの包括的なガイドについては、AI システムのためのデータインフラストラクチャ:オブジェクトストレージ、データベース、検索、AI データアーキテクチャ を参照してください。
1. 可変ワークロード向けにオンデマンドモードを使用
オンデマンドモード(2023 年導入)は、トラフィックに基づいて自動的にスケーリングします。
# オンデマンドモードでストリームを作成
aws kinesis create-stream \
--stream-name my-stream \
--stream-mode-details StreamMode=ON_DEMAND
2. データ集約の実装
PUT ペイロード単位を削減するためにレコードをバッチ処理します。
from aws_kinesis_agg.aggregator import RecordAggregator
def batch_put_records(kinesis_client, stream_name, records):
"""コスト削減のためにレコードを集約"""
aggregator = RecordAggregator()
for record in records:
aggregator.add_user_record(
partition_key=record['partition_key'],
data=record['data']
)
# 集約されたレコードを送信
kinesis_client.put_record(
StreamName=stream_name,
Data=aggregator.serialize(),
PartitionKey=records[0]['partition_key']
)
3. データ保持期間の最適化
デフォルトの保持期間は 24 時間です。必要に応じてのみ延長してください。
# 保持期間を 7 日に設定
aws kinesis increase-stream-retention-period \
--stream-name my-stream \
--retention-period-hours 168
セキュリティのベストプラクティス
1. 保存時と転送時の暗号化
# 暗号化ストリームの作成
kinesis.create_stream(
StreamName='secure-stream',
ShardCount=1,
StreamModeDetails={'StreamMode': 'PROVISIONED'}
)
# 暗号化を有効化
kinesis.start_stream_encryption(
StreamName='secure-stream',
EncryptionType='KMS',
KeyId='alias/aws/kinesis'
)
2. 最小権限の IAM ポリシー
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
}
]
}
3. VPC エンドポイント
トラフィックを AWS ネットワーク内に留めます。AWS インフラストラクチャをコードとして管理するには Terraform の使用を検討してください。Terraform チートシート を参照してください。
aws ec2 create-vpc-endpoint \
--vpc-id vpc-12345678 \
--service-name com.amazonaws.us-east-1.kinesis-streams \
--route-table-ids rtb-12345678
可観測性とデバッグ
X-Ray を利用した分散トレーシング
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
patch_all()
@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
xray_recorder.put_annotation('eventType', event_data['type'])
xray_recorder.put_metadata('payload', event_data['payload'])
kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event_data),
PartitionKey=event_data['id']
)
CloudWatch Logs Insights クエリ
-- 処理時間の遅延を検索
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20
-- エラー率を追跡
fields @timestamp, eventType
| filter error = true
| stats count() by eventType
高度なパターン
分散トランザクション向けのサガパターン
マイクロサービス間で長期間にわたるトランザクションを実装します。
class OrderSaga:
def __init__(self, kinesis_client, stream_name):
self.kinesis = kinesis_client
self.stream_name = stream_name
self.saga_id = str(uuid.uuid4())
def execute(self, order_data):
"""補償ロジック付きでサガを実行"""
try:
# ステップ 1: 在庫を予約
self.publish_command('RESERVE_INVENTORY', order_data)
# ステップ 2: 支払いを処理
self.publish_command('PROCESS_PAYMENT', order_data)
# ステップ 3: 注文を発送
self.publish_command('SHIP_ORDER', order_data)
except SagaException as e:
# 逆順で補償
self.compensate(e.failed_step)
def compensate(self, failed_step):
"""補償トランザクションを実行"""
compensation_steps = {
'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
}
for step in compensation_steps.get(failed_step, []):
self.publish_command(step, {'sagaId': self.saga_id})
テスト戦略
LocalStack を利用したローカル開発
# Kinesis 付き LocalStack を起動
docker run -d \
-p 4566:4566 \
-e SERVICES=kinesis \
localstack/localstack
# テストストリームを作成
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
--stream-name test-stream \
--shard-count 1
統合テスト
import pytest
from moto import mock_kinesis
@mock_kinesis
def test_event_publishing():
"""モック Kinesis を使用したイベント公開のテスト"""
kinesis = boto3.client('kinesis', region_name='us-east-1')
kinesis.create_stream(StreamName='test-stream', ShardCount=1)
service = OrderProcessingService('test-stream')
order_id = service.create_order('user123', [
{'productId': 'prod1', 'quantity': 2}
])
assert order_id is not None
パフォーマンスチューニング
バッチサイズの最適化
def optimize_batch_processing(records, batch_size=500):
"""最適化されたバッチでレコードを処理"""
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
process_batch(batch)
接続プーリングの使用
from botocore.config import Config
config = Config(
max_pool_connections=50,
retries={'max_attempts': 3, 'mode': 'adaptive'}
)
kinesis = boto3.client('kinesis', config=config)
参考リンク
AWS Kinesis リソース:
- AWS Kinesis ドキュメント
- AWS Kinesis Data Streams 開発者ガイド
- Kinesis Client Library (KCL)
- AWS Kinesis 料金計算機
- Kinesis Data Streams クォータと制限
- AWS アーキテクチャブログ - イベント駆動アーキテクチャ
- AWS サンプル - Kinesis 例
関連記事:
結論
AWS Kinesis は、スケーラブルでイベント駆動型のマイクロサービスアーキテクチャを構築するための堅牢な基盤を提供します。これらのパターンとベストプラクティスに従うことで、耐障害性、スケーラビリティ、保守性のあるシステムを作成できます。まずは単一のイベントストリームから小さく始め、アーキテクチャを検証し、システムが成長するにつれて徐々に複雑なパターンへと拡大させてください。
成功の鍵は、データフロー要件を理解し、ユースケースに適した Kinesis サービスを選択し、初期段階から適切な監視とエラーハンドリングを実装することにあります。