AWS Kinesis を使用したイベント駆動型マイクロサービスの構築

スケーラビリティに優れたAWS Kinesisを活用したイベント駆動型アーキテクチャ

目次

AWS Kinesis は、現代のイベント駆動型マイクロサービスアーキテクチャを構築するための基盤として、最小限の運用負荷で大規模なリアルタイムデータ処理を可能にしています。

amazon-kinesis

イベント駆動型マイクロサービスアーキテクチャの理解

イベント駆動型アーキテクチャ(EDA)は、サービスが直接的な同期呼び出しではなくイベントを通じて通信する設計パターンです。このアプローチにはいくつかの利点があります:

  • 疎結合:サービスはお互いの存在を知る必要がありません
  • スケーラビリティ:各サービスは自分のワークロードに基づいて独立してスケールします
  • 耐障害性:1つのサービスの障害が他のサービスに影響を及ぼしません
  • 柔軟性:既存のサービスに変更を加えずに新しいサービスを追加できます

AWS Kinesis は、イベントストリームとしての分散型で耐久性のあるバックボーンを提供し、プロダーサーとコンシューマーを分離する役割を果たします。

AWS Kinesis 概要

AWS は、特定のユースケースに最適化されたいくつかの Kinesis サービスを提供しています。ストリーミングソリューションを評価する際には、RabbitMQ on EKS vs SQS と比較することも検討してください。

Kinesis Data Streams

リアルタイムでデータレコードを収集、保存、処理するコアストリーミングサービスです。Data Streams は次の用途に最適です:

  • カスタムリアルタイム処理アプリケーション
  • サブセカンドレイテンシーを持つデータパイプラインの構築
  • 秒単位で数百万のイベントを処理
  • イベントソーシングパターンの実装

Kinesis Data Firehose

S3、Redshift、Elasticsearch、HTTPエンドポイントなどへのストリーミングデータを配信するフルマネージドサービスです。次の用途に最適です:

  • シンプルなETLパイプライン
  • ログの集約とアーカイブ
  • ニアリアルタイム分析(60秒以上の最小レイテンシー)
  • カスタム処理ロジックが不要なシナリオ

Kinesis Data Analytics

SQLまたはApache Flinkを使用してストリーミングデータを処理および分析します。次の用途に適しています:

  • リアルタイムダッシュボード
  • ストリーミングETL
  • リアルタイム異常検出
  • 連続的なメトリクス生成

Kinesis によるアーキテクチャパターン

1. イベントソーシングパターン

アプリケーション状態のすべての変更をイベントのシーケンスとして保存します。Kinesis はこのパターンに最適です。Pythonの基本を復習したい場合は、Python Cheatsheet を確認してください:

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Kinesis ストリームにイベントを公開"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }
    
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )
    
    return response['SequenceNumber']

# 例: ユーザー登録イベント
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. CQRS(コマンドクエリ責任分離)

Kinesis をイベントバスとして使用して、読み取りと書き込み操作を分離します:

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }
    
    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })
    
    return err
}

3. Lambda によるファンアウトパターン

単一のストリームから複数の Lambda 関数でイベントを処理します。より強力な型安全性を持つ TypeScript の実装については、TypeScript Cheatsheet を参照してください:

// メール通知用の Lambda コンシューマ
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// 在庫更新用の別の Lambda
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};

本番環境でのベストプラクティス

1. 適切なシャード数の選択

シャード数を次の基準に基づいて計算してください:

  • イングレス:シャードごとに 1 MB/sec または 1,000 レコード/sec
  • エグレス:標準コンシューマーごとに 2 MB/sec または強化されたファンアウトでコンシューマーごとに 2 MB/sec
def calculate_shards(records_per_second, avg_record_size_kb):
    """必要なシャード数を計算"""
    # イングレス容量
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )
    
    return int(ingress_shards) + 1  # バッファを追加

2. 適切なエラーハンドリングの実装

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key, 
                          max_retries=3):
    """指数バックオフを用いたリトライ付きレコードの挿入"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # 指数バックオフ
                    continue
            raise

3. 複数のコンシューマー用に強化されたファンアウトの使用

強化されたファンアウトは、各コンシューマーに専用のスループットを提供します:

# 強化されたファンアウトでコンシューマーを登録
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. 重要なメトリクスの監視

監視する必要のある CloudWatch メトリクス:

  • IncomingRecords:成功したレコードの数
  • IncomingBytes:インバウンドデータのバイト量
  • GetRecords.IteratorAgeMilliseconds:コンシューマーがどれだけ遅れているか
  • WriteProvisionedThroughputExceeded:サスペンションイベント
  • ReadProvisionedThroughputExceeded:コンシューマーのサスペンション

5. 適切なパーティションキー戦略の実装

import hashlib

def get_partition_key(user_id, shard_count=10):
    """均等な分布を実現するパーティションキーの生成"""
    # 一貫したハッシュを使用して均等な分布を実現
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

実際の実装例

注文処理マイクロサービスアーキテクチャの完全な例:

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
    
    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """注文を作成し、イベントを公開"""
        order_id = self.generate_order_id()
        
        # 注文作成イベントを公開
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)
        
        return order_id
    
    def publish_event(self, event_type: str, payload: Dict, 
                     partition_key: str):
        """Kinesis ストリームにイベントを公開"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }
        
        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """注文イベントを消費し、在庫を更新"""
    
    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])
            
            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])
    
    def reserve_inventory(self, order_data):
        # 在庫データベースを更新
        for item in order_data['items']:
            # 実装ここに
            pass

モノリシックからマイクロサービスへの移行戦略

フェーズ1: ストラングラー・フィグ・パターン

モノリシックを維持しながら、特定のイベントを Kinesis にルーティングして開始します:

  1. モノリシック内の境界コンテキストを特定
  2. クロスコンテキストイベント用の Kinesis ストリームを作成
  3. これらのストリームからコンシューマーとなるサービスを段階的に抽出
  4. モノリシックとの後方互換性を維持

フェーズ2: 並列処理

古いシステムと新しいシステムを並列で実行します:

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """レガシーシステムとイベントストリームに同時に書き込む"""
    try:
        # まず新しいシステムに書き込む
        publish_to_kinesis(kinesis_stream, data)
        
        # 次にレガシーシステムを更新
        legacy_db.update(data)
    except Exception as e:
        # 補償ロジックを実装
        rollback_kinesis_event(kinesis_stream, data)
        raise

フェーズ3: 完全な移行

信頼性が確立された後、すべてのトラフィックをイベント駆動型アーキテクチャにルーティングします。

コスト最適化戦略

1. 変動負荷にオンデマンドモードを使用

2023年に導入されたオンデマンドモードは、トラフィックに基づいて自動的にスケールします:

# オンデマンドモードでストリームを作成
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. データ集約の実装

PUTペイロードユニットを減らすためにレコードをバッチ処理します:

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """コストを削減するためにレコードを集約"""
    aggregator = RecordAggregator()
    
    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )
    
    # 集約されたレコードを送信
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. データ保持期間の最適化

デフォルトでは24時間です。必要に応じてのみ延長してください:

# 保持期間を7日間に設定
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

セキュリティのベストプラクティス

1. 保存および転送中の暗号化

# 暗号化されたストリームを作成
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# 暗号化を有効化
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. 最小権限の IAM ポリシー

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. VPC エンドポイント

AWSネットワーク内でのトラフィックを維持します。AWSインフラストラクチャをコードで管理するには、Terraformを使用してください - Terraform cheatsheet を参照してください:

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

可視性とデバッグ

X-Ray による分散トレース

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])
    
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

CloudWatch Logs Insights クエリ

-- 遅い処理時間を検出
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- エラーレートを追跡
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

高度なパターン

分散トランザクション用のサガパターン

マイクロサービス間で長時間にわたるトランザクションを実装します:

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())
    
    def execute(self, order_data):
        """補償ロジック付きのサガの実行"""
        try:
            # ステップ1: 在庫を予約
            self.publish_command('RESERVE_INVENTORY', order_data)
            
            # ステップ2: 支払いを処理
            self.publish_command('PROCESS_PAYMENT', order_data)
            
            # ステップ3: 注文を発送
            self.publish_command('SHIP_ORDER', order_data)
            
        except SagaException as e:
            # 逆順で補償
            self.compensate(e.failed_step)
    
    def compensate(self, failed_step):
        """補償トランザクションの実行"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }
        
        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

テスト戦略

LocalStack によるローカル開発

# LocalStack で Kinesis を起動
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# テストストリームを作成
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

インテグレーションテスト

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """モックされた Kinesis でのイベント公開テスト"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)
    
    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])
    
    assert order_id is not None

パフォーマンスチューニング

バッチサイズの最適化

def optimize_batch_processing(records, batch_size=500):
    """最適化されたバッチ処理"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

コネクションプーリングの使用

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

有用なリンク

AWS Kinesis リソース:

関連記事:

結論

AWS Kinesis は、スケーラブルでイベント駆動型マイクロサービスアーキテクチャを構築するための堅牢な基盤を提供します。これらのパターンとベストプラクティスに従うことで、耐障害性があり、スケーラブルで、保守可能なシステムを作成できます。最初は単一のイベントストリームから始め、アーキテクチャを検証し、システムが成長するにつれてより複雑なパターンに拡張してください。

成功の鍵は、データフローの要件を理解し、使用ケースに最適な Kinesis サービスを選択し、最初から適切な監視とエラーハンドリングを実装することです。