Aufbau von ereignisgesteuerten Mikroservices mit AWS Kinesis
Ereignisgesteuerte Architektur mit AWS Kinesis für Skalierbarkeit
AWS Kinesis ist zu einem Eckpfeiler für den Aufbau moderner ereignisgesteuerter Mikroservice-Architekturen geworden, die eine Echtzeit-Datenverarbeitung im großen Maßstab mit minimalem Betriebsaufwand ermöglichen.

Verständnis der ereignisgesteuerten Mikroservice-Architektur
Die ereignisgesteuerte Architektur (EDA) ist ein Designmuster, bei dem Dienste über Ereignisse statt über direkte synchrone Aufrufe kommunizieren. Dieser Ansatz bietet mehrere Vorteile:
- Lockere Kopplung: Dienste müssen nicht voneinander wissen
- Skalierbarkeit: Jeder Dienst skaliert unabhängig basierend auf seiner Arbeitslast
- Widerstandsfähigkeit: Ausfälle in einem Dienst breiten sich nicht auf andere aus
- Flexibilität: Neue Dienste können hinzugefügt werden, ohne bestehende zu modifizieren
AWS Kinesis bildet das Rückgrat für die Implementierung von EDA, indem es als verteilte, dauerhafte Ereignisstrom dient, der Produzenten von Konsumenten entkoppelt.
AWS Kinesis Überblick
AWS bietet mehrere Kinesis-Dienste, die jeweils für spezifische Anwendungsfälle konzipiert sind. Bei der Bewertung von Streaming-Lösungen möchten Sie möglicherweise auch die Vergleich von RabbitMQ auf EKS vs SQS für verschiedene Nachrichtenmuster und Kostenimplikationen berücksichtigen.
Kinesis Data Streams
Der Kern-Streaming-Dienst, der Datenaufzeichnungen in Echtzeit erfasst, speichert und verarbeitet. Data Streams ist ideal für:
- Benutzerdefinierte Echtzeitverarbeitungsanwendungen
- Aufbau von Datenpipelines mit Sub-Sekunden-Latenz
- Verarbeitung von Millionen von Ereignissen pro Sekunde
- Implementierung von Event-Sourcing-Mustern
Kinesis Data Firehose
Ein vollständig verwalteter Dienst, der Streaming-Daten an Ziele wie S3, Redshift, Elasticsearch oder HTTP-Endpunkte liefert. Am besten geeignet für:
- Einfache ETL-Pipelines
- Log-Aggregation und Archivierung
- Nahezu Echtzeit-Analysen (60-Sekunden-Mindestlatenz)
- Szenarien, bei denen Sie keine benutzerdefinierte Verarbeitungslogik benötigen
Kinesis Data Analytics
Verarbeitet und analysiert Streaming-Daten mit SQL oder Apache Flink. Anwendungsfälle umfassen:
- Echtzeit-Dashboards
- Streaming-ETL
- Echtzeit-Anomalieerkennung
- Kontinuierliche Metrikerzeugung
Architektur-Muster mit Kinesis
1. Event-Sourcing-Muster
Event-Sourcing speichert alle Änderungen des Anwendungszustands als eine Sequenz von Ereignissen. Kinesis ist perfekt dafür. Wenn Sie eine Auffrischung der Python-Grundlagen benötigen, werfen Sie einen Blick auf unseren Python Cheatsheet:
import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
def publish_event(stream_name, event_type, payload):
"""Veröffentliche ein Ereignis im Kinesis-Stream"""
event = {
'eventId': generate_unique_id(),
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event),
PartitionKey=payload.get('userId', 'default')
)
return response['SequenceNumber']
# Beispiel: Benutzerregistrierungs-Ereignis
publish_event(
stream_name='user-events',
event_type='USER_REGISTERED',
payload={
'userId': '12345',
'email': 'user@example.com',
'registrationDate': '2025-10-30'
}
)
2. CQRS (Command Query Responsibility Segregation)
Trennen Sie Lese- und Schreiboperationen mit Kinesis als Event-Bus:
package main
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)
type OrderCommand struct {
CommandType string `json:"commandType"`
OrderID string `json:"orderId"`
UserID string `json:"userId"`
Items []OrderItem `json:"items"`
}
func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
data, err := json.Marshal(command)
if err != nil {
return err
}
_, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String("order-commands"),
Data: data,
PartitionKey: aws.String(command.OrderID),
})
return err
}
3. Fan-Out-Muster mit Lambda
Verarbeiten Sie Ereignisse aus einem einzelnen Stream mit mehreren Lambda-Funktionen. Für TypeScript-Implementierungen mit stärkerer Typensicherheit beziehen Sie sich auf unseren TypeScript Cheatsheet:
// Lambda-Konsument für E-Mail-Benachrichtigungen
exports.handler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await sendOrderConfirmationEmail(payload);
}
}
};
// Eine weitere Lambda für Lagerbestandsaktualisierungen
exports.inventoryHandler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await updateInventory(payload.items);
}
}
};
Best Practices für die Produktion
1. Wahl der richtigen Shard-Anzahl
Berechnen Sie Ihre Shard-Anforderungen basierend auf:
- Ingress: 1 MB/sec oder 1.000 Aufzeichnungen/sec pro Shard
- Egress: 2 MB/sec pro Shard (Standard-Konsumenten) oder 2 MB/sec pro Konsument mit Enhanced Fan-Out
def calculate_shards(records_per_second, avg_record_size_kb):
"""Berechne die erforderliche Anzahl von Shards"""
# Ingress-Kapazität
ingress_shards = max(
records_per_second / 1000,
(records_per_second * avg_record_size_kb) / 1024
)
return int(ingress_shards) + 1 # Puffer hinzufügen
2. Implementieren Sie eine ordnungsgemäße Fehlerbehandlung
from botocore.exceptions import ClientError
import time
def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
max_retries=3):
"""Put record mit exponentieller Backoff-Wiederholung"""
for attempt in range(max_retries):
try:
response = kinesis_client.put_record(
StreamName=stream_name,
Data=data,
PartitionKey=partition_key
)
return response
except ClientError as e:
if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponentieller Backoff
continue
raise
3. Verwenden Sie Enhanced Fan-Out für mehrere Konsumenten
Enhanced Fan-Out bietet dedizierte Durchsatzkapazität für jeden Konsumenten:
# Registrieren Sie einen Konsumenten mit Enhanced Fan-Out
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
--consumer-name my-consumer-app
4. Überwachen Sie wichtige Metriken
Wichtige CloudWatch-Metriken zur Überwachung:
IncomingRecords: Anzahl der erfolgreich gesetzten AufzeichnungenIncomingBytes: Byte-Volumen der eingehenden DatenGetRecords.IteratorAgeMilliseconds: Wie weit hinter den Konsumenten sie sindWriteProvisionedThroughputExceeded: Throttling-EreignisseReadProvisionedThroughputExceeded: Konsumenten-Throttling
5. Implementieren Sie eine ordnungsgemäße Partition-Key-Strategie
import hashlib
def get_partition_key(user_id, shard_count=10):
"""Generiere Partition-Key mit gleichmäßiger Verteilung"""
# Verwenden Sie konsistente Hashing für gleichmäßige Verteilung
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_id = hash_value % shard_count
return f"{user_id}#{shard_id}"
Beispiel für eine reale Implementierung
Hier ist ein vollständiges Beispiel einer Bestellverarbeitungs-Mikroservice-Architektur:
import boto3
import json
from decimal import Decimal
from typing import Dict, List
class OrderProcessingService:
def __init__(self, stream_name: str):
self.kinesis = boto3.client('kinesis')
self.stream_name = stream_name
def create_order(self, user_id: str, items: List[Dict]) -> str:
"""Erstelle Bestellung und veröffentliche Ereignisse"""
order_id = self.generate_order_id()
# Veröffentliche Bestellung erstellt Ereignis
self.publish_event('ORDER_CREATED', {
'orderId': order_id,
'userId': user_id,
'items': items,
'status': 'PENDING',
'total': self.calculate_total(items)
}, partition_key=user_id)
return order_id
def publish_event(self, event_type: str, payload: Dict,
partition_key: str):
"""Veröffentliche Ereignis im Kinesis-Stream"""
event = {
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
self.kinesis.put_record(
StreamName=self.stream_name,
Data=json.dumps(event, default=str),
PartitionKey=partition_key
)
class InventoryService:
"""Verbraucht Bestellereignisse und aktualisiert den Lagerbestand"""
def process_records(self, records):
for record in records:
data = json.loads(record['kinesis']['data'])
if data['eventType'] == 'ORDER_CREATED':
self.reserve_inventory(data['payload'])
elif data['eventType'] == 'ORDER_CANCELLED':
self.release_inventory(data['payload'])
def reserve_inventory(self, order_data):
# Aktualisiere Lagerbestandsdatenbank
for item in order_data['items']:
# Implementierung hier
pass
Migrationsstrategie von Monolith zu Microservices
Phase 1: Strangler Fig Pattern
Beginnen Sie mit dem Routing spezifischer Ereignisse über Kinesis, während der Monolith beibehalten wird:
- Identifizieren Sie begrenzte Kontexte in Ihrem Monolith
- Erstellen Sie Kinesis-Streams für ereignisse zwischen Kontexten
- Extrahieren Sie schrittweise Dienste, die diese Streams konsumieren
- Halten Sie die Abwärtskompatibilität mit dem Monolith aufrecht
Phase 2: Parallelverarbeitung
Führen Sie sowohl das alte als auch das neue System parallel aus:
def dual_write_pattern(legacy_db, kinesis_stream, data):
"""Schreiben in sowohl das Legacy-System als auch den Event-Stream"""
try:
# Zuerst in das neue System schreiben
publish_to_kinesis(kinesis_stream, data)
# Dann das Legacy-System aktualisieren
legacy_db.update(data)
except Exception as e:
# Implementieren Sie Kompensationslogik
rollback_kinesis_event(kinesis_stream, data)
raise
Phase 3: Vollständige Migration
Sobald das Vertrauen hergestellt ist, leiten Sie den gesamten Verkehr über die ereignisgesteuerte Architektur weiter.
Kostenoptimierungsstrategien
1. Verwenden Sie On-Demand-Modus für variable Arbeitslasten
On-Demand-Modus (eingeführt 2023) skaliert automatisch basierend auf dem Verkehr:
# Erstellen Sie einen Stream mit On-Demand-Modus
aws kinesis create-stream \
--stream-name my-stream \
--stream-mode-details StreamMode=ON_DEMAND
2. Implementieren Sie Datenaggregation
Reduzieren Sie PUT-Payload-Einheiten durch das Batching von Datensätzen:
from aws_kinesis_agg.aggregator import RecordAggregator
def batch_put_records(kinesis_client, stream_name, records):
"""Aggregieren Sie Datensätze, um Kosten zu reduzieren"""
aggregator = RecordAggregator()
for record in records:
aggregator.add_user_record(
partition_key=record['partition_key'],
data=record['data']
)
# Senden Sie den aggregierten Datensatz
kinesis_client.put_record(
StreamName=stream_name,
Data=aggregator.serialize(),
PartitionKey=records[0]['partition_key']
)
3. Optimieren Sie die Datenaufbewahrung
Die Standardaufbewahrungsdauer beträgt 24 Stunden. Verlängern Sie sie nur, wenn notwendig:
# Setzen Sie die Aufbewahrungsdauer auf 7 Tage
aws kinesis increase-stream-retention-period \
--stream-name my-stream \
--retention-period-hours 168
Sicherheits-Best Practices
1. Verschlüsselung bei Ruhe und im Transit
# Erstellen Sie einen verschlüsselten Stream
kinesis.create_stream(
StreamName='secure-stream',
ShardCount=1,
StreamModeDetails={'StreamMode': 'PROVISIONED'}
)
# Aktivieren Sie die Verschlüsselung
kinesis.start_stream_encryption(
StreamName='secure-stream',
EncryptionType='KMS',
KeyId='alias/aws/kinesis'
)
2. IAM-Richtlinien für das Prinzip des geringsten Privilegs
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
}
]
}
3. VPC-Endpunkte
Halten Sie den Verkehr innerhalb des AWS-Netzwerks. Für das Verwalten von AWS-Infrastruktur als Code können Sie Terraform verwenden - siehe unseren Terraform-Cheat-Sheet:
aws ec2 create-vpc-endpoint \
--vpc-id vpc-12345678 \
--service-name com.amazonaws.us-east-1.kinesis-streams \
--route-table-ids rtb-12345678
Beobachtbarkeit und Debugging
Verteilte Nachverfolgung mit X-Ray
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
patch_all()
@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
xray_recorder.put_annotation('eventType', event_data['type'])
xray_recorder.put_metadata('payload', event_data['payload'])
kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event_data),
PartitionKey=event_data['id']
)
CloudWatch Logs Insights Abfragen
-- Finden Sie langsame Verarbeitungszeiten
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20
-- Verfolgen Sie Fehlerraten
fields @timestamp, eventType
| filter error = true
| stats count() by eventType
Fortgeschrittene Muster
Saga-Muster für verteilte Transaktionen
Implementieren Sie langlaufende Transaktionen über Microservices:
class OrderSaga:
def __init__(self, kinesis_client, stream_name):
self.kinesis = kinesis_client
self.stream_name = stream_name
self.saga_id = str(uuid.uuid4())
def execute(self, order_data):
"""Führen Sie die Saga mit Kompensationslogik aus"""
try:
# Schritt 1: Inventar reservieren
self.publish_command('RESERVE_INVENTORY', order_data)
# Schritt 2: Zahlung verarbeiten
self.publish_command('PROCESS_PAYMENT', order_data)
# Schritt 3: Bestellung versenden
self.publish_command('SHIP_ORDER', order_data)
except SagaException as e:
# Kompensieren Sie in umgekehrter Reihenfolge
self.compensate(e.failed_step)
def compensate(self, failed_step):
"""Führen Sie Kompensationstransaktionen aus"""
compensation_steps = {
'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
}
for step in compensation_steps.get(failed_step, []):
self.publish_command(step, {'sagaId': self.saga_id})
Teststrategien
Lokale Entwicklung mit LocalStack
# Starten Sie LocalStack mit Kinesis
docker run -d \
-p 4566:4566 \
-e SERVICES=kinesis \
localstack/localstack
# Erstellen Sie einen Test-Stream
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
--stream-name test-stream \
--shard-count 1
Integrationstests
import pytest
from moto import mock_kinesis
@mock_kinesis
def test_event_publishing():
"""Testen Sie das Event-Publishing mit gemocktem Kinesis"""
kinesis = boto3.client('kinesis', region_name='us-east-1')
kinesis.create_stream(StreamName='test-stream', ShardCount=1)
service = OrderProcessingService('test-stream')
order_id = service.create_order('user123', [
{'productId': 'prod1', 'quantity': 2}
])
assert order_id is not None
Leistungsoptimierung
Optimieren Sie die Batch-Größe
def optimize_batch_processing(records, batch_size=500):
"""Verarbeiten Sie Datensätze in optimierten Batches"""
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
process_batch(batch)
Verwenden Sie Connection-Pooling
from botocore.config import Config
config = Config(
max_pool_connections=50,
retries={'max_attempts': 3, 'mode': 'adaptive'}
)
kinesis = boto3.client('kinesis', config=config)
Nützliche Links
AWS Kinesis Ressourcen:
- AWS Kinesis Dokumentation
- AWS Kinesis Data Streams Entwicklerhandbuch
- Kinesis Client Library (KCL)
- AWS Kinesis Preisrechner
- Kinesis Data Streams Quoten und Limits
- AWS Architecture Blog - Event-getriebene Architekturen
- AWS Samples - Kinesis Beispiele
Verwandte Artikel:
- Rabbitmq auf Eks vs Sqs Hosting-Kostenvergleich
- TypeScript-Cheat-Sheet: Kernkonzepte & Best Practices
- Python-Cheat-Sheet
- Terraform-Cheat-Sheet - nützliche Befehle und Beispiele
Fazit
AWS Kinesis bietet eine robuste Grundlage für den Aufbau skalierbarer, ereignisgesteuerter Microservices-Architekturen. Durch die Anwendung dieser Muster und Best Practices können Sie Systeme erstellen, die widerstandsfähig, skalierbar und wartbar sind. Beginnen Sie mit einem einzelnen Event-Stream, validieren Sie Ihre Architektur und erweitern Sie schrittweise zu komplexeren Mustern, wenn Ihr System wächst.
Der Schlüssel zum Erfolg liegt darin, Ihre Datenflussanforderungen zu verstehen, den richtigen Kinesis-Dienst für Ihren Anwendungsfall zu wählen und eine ordnungsgemäße Überwachung und Fehlerbehandlung von Anfang an zu implementieren.