Aufbau von ereignisgesteuerten Mikroservices mit AWS Kinesis

Ereignisgesteuerte Architektur mit AWS Kinesis für Skalierbarkeit

Inhaltsverzeichnis

AWS Kinesis ist zu einem Eckpfeiler für den Aufbau moderner ereignisgesteuerter Mikroservice-Architekturen geworden, die eine Echtzeit-Datenverarbeitung im großen Maßstab mit minimalem Betriebsaufwand ermöglichen.

amazon-kinesis

Verständnis der ereignisgesteuerten Mikroservice-Architektur

Die ereignisgesteuerte Architektur (EDA) ist ein Designmuster, bei dem Dienste über Ereignisse statt über direkte synchrone Aufrufe kommunizieren. Dieser Ansatz bietet mehrere Vorteile:

  • Lockere Kopplung: Dienste müssen nicht voneinander wissen
  • Skalierbarkeit: Jeder Dienst skaliert unabhängig basierend auf seiner Arbeitslast
  • Widerstandsfähigkeit: Ausfälle in einem Dienst breiten sich nicht auf andere aus
  • Flexibilität: Neue Dienste können hinzugefügt werden, ohne bestehende zu modifizieren

AWS Kinesis bildet das Rückgrat für die Implementierung von EDA, indem es als verteilte, dauerhafte Ereignisstrom dient, der Produzenten von Konsumenten entkoppelt.

AWS Kinesis Überblick

AWS bietet mehrere Kinesis-Dienste, die jeweils für spezifische Anwendungsfälle konzipiert sind. Bei der Bewertung von Streaming-Lösungen möchten Sie möglicherweise auch die Vergleich von RabbitMQ auf EKS vs SQS für verschiedene Nachrichtenmuster und Kostenimplikationen berücksichtigen.

Kinesis Data Streams

Der Kern-Streaming-Dienst, der Datenaufzeichnungen in Echtzeit erfasst, speichert und verarbeitet. Data Streams ist ideal für:

  • Benutzerdefinierte Echtzeitverarbeitungsanwendungen
  • Aufbau von Datenpipelines mit Sub-Sekunden-Latenz
  • Verarbeitung von Millionen von Ereignissen pro Sekunde
  • Implementierung von Event-Sourcing-Mustern

Kinesis Data Firehose

Ein vollständig verwalteter Dienst, der Streaming-Daten an Ziele wie S3, Redshift, Elasticsearch oder HTTP-Endpunkte liefert. Am besten geeignet für:

  • Einfache ETL-Pipelines
  • Log-Aggregation und Archivierung
  • Nahezu Echtzeit-Analysen (60-Sekunden-Mindestlatenz)
  • Szenarien, bei denen Sie keine benutzerdefinierte Verarbeitungslogik benötigen

Kinesis Data Analytics

Verarbeitet und analysiert Streaming-Daten mit SQL oder Apache Flink. Anwendungsfälle umfassen:

  • Echtzeit-Dashboards
  • Streaming-ETL
  • Echtzeit-Anomalieerkennung
  • Kontinuierliche Metrikerzeugung

Architektur-Muster mit Kinesis

1. Event-Sourcing-Muster

Event-Sourcing speichert alle Änderungen des Anwendungszustands als eine Sequenz von Ereignissen. Kinesis ist perfekt dafür. Wenn Sie eine Auffrischung der Python-Grundlagen benötigen, werfen Sie einen Blick auf unseren Python Cheatsheet:

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Veröffentliche ein Ereignis im Kinesis-Stream"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }

    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )

    return response['SequenceNumber']

# Beispiel: Benutzerregistrierungs-Ereignis
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. CQRS (Command Query Responsibility Segregation)

Trennen Sie Lese- und Schreiboperationen mit Kinesis als Event-Bus:

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }

    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })

    return err
}

3. Fan-Out-Muster mit Lambda

Verarbeiten Sie Ereignisse aus einem einzelnen Stream mit mehreren Lambda-Funktionen. Für TypeScript-Implementierungen mit stärkerer Typensicherheit beziehen Sie sich auf unseren TypeScript Cheatsheet:

// Lambda-Konsument für E-Mail-Benachrichtigungen
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );

        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// Eine weitere Lambda für Lagerbestandsaktualisierungen
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );

        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};

Best Practices für die Produktion

1. Wahl der richtigen Shard-Anzahl

Berechnen Sie Ihre Shard-Anforderungen basierend auf:

  • Ingress: 1 MB/sec oder 1.000 Aufzeichnungen/sec pro Shard
  • Egress: 2 MB/sec pro Shard (Standard-Konsumenten) oder 2 MB/sec pro Konsument mit Enhanced Fan-Out
def calculate_shards(records_per_second, avg_record_size_kb):
    """Berechne die erforderliche Anzahl von Shards"""
    # Ingress-Kapazität
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )

    return int(ingress_shards) + 1  # Puffer hinzufügen

2. Implementieren Sie eine ordnungsgemäße Fehlerbehandlung

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
                          max_retries=3):
    """Put record mit exponentieller Backoff-Wiederholung"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Exponentieller Backoff
                    continue
            raise

3. Verwenden Sie Enhanced Fan-Out für mehrere Konsumenten

Enhanced Fan-Out bietet dedizierte Durchsatzkapazität für jeden Konsumenten:

# Registrieren Sie einen Konsumenten mit Enhanced Fan-Out
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. Überwachen Sie wichtige Metriken

Wichtige CloudWatch-Metriken zur Überwachung:

  • IncomingRecords: Anzahl der erfolgreich gesetzten Aufzeichnungen
  • IncomingBytes: Byte-Volumen der eingehenden Daten
  • GetRecords.IteratorAgeMilliseconds: Wie weit hinter den Konsumenten sie sind
  • WriteProvisionedThroughputExceeded: Throttling-Ereignisse
  • ReadProvisionedThroughputExceeded: Konsumenten-Throttling

5. Implementieren Sie eine ordnungsgemäße Partition-Key-Strategie

import hashlib

def get_partition_key(user_id, shard_count=10):
    """Generiere Partition-Key mit gleichmäßiger Verteilung"""
    # Verwenden Sie konsistente Hashing für gleichmäßige Verteilung
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

Beispiel für eine reale Implementierung

Hier ist ein vollständiges Beispiel einer Bestellverarbeitungs-Mikroservice-Architektur:

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name

    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """Erstelle Bestellung und veröffentliche Ereignisse"""
        order_id = self.generate_order_id()

        # Veröffentliche Bestellung erstellt Ereignis
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)

        return order_id

    def publish_event(self, event_type: str, payload: Dict,
                     partition_key: str):
        """Veröffentliche Ereignis im Kinesis-Stream"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }

        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """Verbraucht Bestellereignisse und aktualisiert den Lagerbestand"""

    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])

            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])

    def reserve_inventory(self, order_data):
        # Aktualisiere Lagerbestandsdatenbank
        for item in order_data['items']:
            # Implementierung hier
            pass

Migrationsstrategie von Monolith zu Microservices

Phase 1: Strangler Fig Pattern

Beginnen Sie mit dem Routing spezifischer Ereignisse über Kinesis, während der Monolith beibehalten wird:

  1. Identifizieren Sie begrenzte Kontexte in Ihrem Monolith
  2. Erstellen Sie Kinesis-Streams für ereignisse zwischen Kontexten
  3. Extrahieren Sie schrittweise Dienste, die diese Streams konsumieren
  4. Halten Sie die Abwärtskompatibilität mit dem Monolith aufrecht

Phase 2: Parallelverarbeitung

Führen Sie sowohl das alte als auch das neue System parallel aus:

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """Schreiben in sowohl das Legacy-System als auch den Event-Stream"""
    try:
        # Zuerst in das neue System schreiben
        publish_to_kinesis(kinesis_stream, data)

        # Dann das Legacy-System aktualisieren
        legacy_db.update(data)
    except Exception as e:
        # Implementieren Sie Kompensationslogik
        rollback_kinesis_event(kinesis_stream, data)
        raise

Phase 3: Vollständige Migration

Sobald das Vertrauen hergestellt ist, leiten Sie den gesamten Verkehr über die ereignisgesteuerte Architektur weiter.

Kostenoptimierungsstrategien

1. Verwenden Sie On-Demand-Modus für variable Arbeitslasten

On-Demand-Modus (eingeführt 2023) skaliert automatisch basierend auf dem Verkehr:

# Erstellen Sie einen Stream mit On-Demand-Modus
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. Implementieren Sie Datenaggregation

Reduzieren Sie PUT-Payload-Einheiten durch das Batching von Datensätzen:

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """Aggregieren Sie Datensätze, um Kosten zu reduzieren"""
    aggregator = RecordAggregator()

    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )

    # Senden Sie den aggregierten Datensatz
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. Optimieren Sie die Datenaufbewahrung

Die Standardaufbewahrungsdauer beträgt 24 Stunden. Verlängern Sie sie nur, wenn notwendig:

# Setzen Sie die Aufbewahrungsdauer auf 7 Tage
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

Sicherheits-Best Practices

1. Verschlüsselung bei Ruhe und im Transit

# Erstellen Sie einen verschlüsselten Stream
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# Aktivieren Sie die Verschlüsselung
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. IAM-Richtlinien für das Prinzip des geringsten Privilegs

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. VPC-Endpunkte

Halten Sie den Verkehr innerhalb des AWS-Netzwerks. Für das Verwalten von AWS-Infrastruktur als Code können Sie Terraform verwenden - siehe unseren Terraform-Cheat-Sheet:

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

Beobachtbarkeit und Debugging

Verteilte Nachverfolgung mit X-Ray

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])

    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

CloudWatch Logs Insights Abfragen

-- Finden Sie langsame Verarbeitungszeiten
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- Verfolgen Sie Fehlerraten
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

Fortgeschrittene Muster

Saga-Muster für verteilte Transaktionen

Implementieren Sie langlaufende Transaktionen über Microservices:

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())

    def execute(self, order_data):
        """Führen Sie die Saga mit Kompensationslogik aus"""
        try:
            # Schritt 1: Inventar reservieren
            self.publish_command('RESERVE_INVENTORY', order_data)

            # Schritt 2: Zahlung verarbeiten
            self.publish_command('PROCESS_PAYMENT', order_data)

            # Schritt 3: Bestellung versenden
            self.publish_command('SHIP_ORDER', order_data)

        except SagaException as e:
            # Kompensieren Sie in umgekehrter Reihenfolge
            self.compensate(e.failed_step)

    def compensate(self, failed_step):
        """Führen Sie Kompensationstransaktionen aus"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }

        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

Teststrategien

Lokale Entwicklung mit LocalStack

# Starten Sie LocalStack mit Kinesis
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# Erstellen Sie einen Test-Stream
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

Integrationstests

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """Testen Sie das Event-Publishing mit gemocktem Kinesis"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)

    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])

    assert order_id is not None

Leistungsoptimierung

Optimieren Sie die Batch-Größe

def optimize_batch_processing(records, batch_size=500):
    """Verarbeiten Sie Datensätze in optimierten Batches"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

Verwenden Sie Connection-Pooling

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

AWS Kinesis Ressourcen:

Verwandte Artikel:

Fazit

AWS Kinesis bietet eine robuste Grundlage für den Aufbau skalierbarer, ereignisgesteuerter Microservices-Architekturen. Durch die Anwendung dieser Muster und Best Practices können Sie Systeme erstellen, die widerstandsfähig, skalierbar und wartbar sind. Beginnen Sie mit einem einzelnen Event-Stream, validieren Sie Ihre Architektur und erweitern Sie schrittweise zu komplexeren Mustern, wenn Ihr System wächst.

Der Schlüssel zum Erfolg liegt darin, Ihre Datenflussanforderungen zu verstehen, den richtigen Kinesis-Dienst für Ihren Anwendungsfall zu wählen und eine ordnungsgemäße Überwachung und Fehlerbehandlung von Anfang an zu implementieren.