Het bouwen van gebeurtenisgestuurde microservices met AWS Kinesis
Gebeurtenisgestuurde architectuur met AWS Kinesis voor schaalbaarheid
AWS Kinesis is geworden een kernstuk bij het bouwen van moderne, gebeurtenisgestuurde microservicesarchitecturen, waarmee real-time dataverwerking op schaal mogelijk is met minimale operationele overhead.

Begrijpen van gebeurtenisgestuurde microservicesarchitectuur
Een gebeurtenisgestuurde architectuur (EDA) is een ontwerppatroon waarbij diensten communiceren via gebeurtenissen in plaats van directe synchrone oproepen. Dit aanpak biedt verschillende voordelen:
- Loskoppeling: Diensten hoeven niet te weten van elkaars bestaan
- Schaalbaarheid: Elke dienst schaalt onafhankelijk op basis van zijn werkbelasting
- Resilientie: Fouten in één dienst verspreiden zich niet naar andere diensten
- Flexibiliteit: Nieuwe diensten kunnen worden toegevoegd zonder bestaande diensten te wijzigen
AWS Kinesis biedt de onderbouwing voor het implementeren van EDA door te fungeren als een gedistribueerde, duurzame gebeurtenisstroom die producenten van consumptieerscheidt.
Overzicht van AWS Kinesis
AWS biedt verschillende Kinesis-diensten, elk ontworpen voor specifieke toepassingen. Bij het beoordelen van streamingoplossingen, zou je ook willen overwegen RabbitMQ op EKS vs SQS vergelijken voor verschillende berichtpatronen en kostimplicaties.
Kinesis Data Streams
Het kernstreamingsdienst dat gegevensrecords opslaat en verwerkt in real-time. Data Streams is ideaal voor:
- Aangepaste real-time verwerkingsapplicaties
- Bouwen van datapijplijnen met subseconde latentie
- Verwerken van miljoenen gebeurtenissen per seconde
- Implementatie van gebeurtenisregistratiepatronen
Kinesis Data Firehose
Een volledig beheerde dienst die streamdata levert naar bestemmingen zoals S3, Redshift, Elasticsearch of HTTP-eindpunten. Beste voor:
- Eenvoudige ETL-pijplijnen
- Logaggregatie en archivering
- Bijna real-time analyse (minimale latentie van 60 seconden)
- Scenario’s waarin je geen aangepaste verwerkingslogica nodig hebt
Kinesis Data Analytics
Verwerkt en analyseert streamdata met behulp van SQL of Apache Flink. Toepassingen omvatten:
- Real-time dashboards
- Streaming ETL
- Real-time anomalie detectie
- Continue metriekgeneratie
Architectuurpatronen met Kinesis
1. Gebeurtenisregistratiepatroon
Gebeurtenisregistratie slaat alle veranderingen in de toepassingsstatus op als een reeks gebeurtenissen. Kinesis is perfect voor dit. Als je een herverversing van Python-fundamenten nodig hebt, bekijk dan onze Python Cheat Sheet:
import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
def publish_event(stream_name, event_type, payload):
"""Gebeurtenis publiceren naar Kinesis stream"""
event = {
'eventId': generate_unique_id(),
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event),
PartitionKey=payload.get('userId', 'default')
)
return response['SequenceNumber']
# Voorbeeld: Gebruikersregistratiegebeurtenis
publish_event(
stream_name='user-events',
event_type='USER_REGISTERED',
payload={
'userId': '12345',
'email': 'user@example.com',
'registrationDate': '2025-10-30'
}
)
2. CQRS (Command Query Responsibility Segregation)
Scheid lees- en schrijfopties met Kinesis als gebeurtenisbus:
package main
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)
type OrderCommand struct {
CommandType string `json:"commandType"`
OrderID string `json:"orderId"`
UserID string `json:"userId"`
Items []OrderItem `json:"items"`
}
func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
data, err := json.Marshal(command)
if err != nil {
return err
}
_, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String("order-commands"),
Data: data,
PartitionKey: aws.String(command.OrderID),
})
return err
}
3. Fan-Outpatroon met Lambda
Verwerk gebeurtenissen van één stream met meerdere Lambda-functies. Voor TypeScript-implementaties met sterke typesicherheid, verwijzen naar onze TypeScript Cheat Sheet:
// Lambda-consumer voor e-mailmeldingen
exports.handler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await sendOrderConfirmationEmail(payload);
}
}
};
// Een andere Lambda voor voorraadupdates
exports.inventoryHandler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await updateInventory(payload.items);
}
}
};
Beste praktijken voor productie
1. Kiezen van het juiste aantal shards
Bereken je shardvereisten op basis van:
- Invoer: 1 MB/sec of 1.000 records/sec per shard
- Uitvoer: 2 MB/sec per shard (standaard consumptie) of 2 MB/sec per consumptie met uitgebreid fan-out
def calculate_shards(records_per_second, avg_record_size_kb):
"""Bereken het vereiste aantal shards"""
# Invoercapaciteit
ingress_shards = max(
records_per_second / 1000,
(records_per_second * avg_record_size_kb) / 1024
)
return int(ingress_shards) + 1 # Voeg buffer toe
2. Implementeer juiste foutafhandeling
from botocore.exceptions import ClientError
import time
def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
max_retries=3):
"""Put record met exponentiële backoff retry"""
for attempt in range(max_retries):
try:
response = kinesis_client.put_record(
StreamName=stream_name,
Data=data,
PartitionKey=partition_key
)
return response
except ClientError as e:
if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponentiële backoff
continue
raise
3. Gebruik uitgebreid fan-out voor meerdere consumptieers
Uitgebreid fan-out biedt toegewezen doorstroming voor elke consumptieer:
# Registreer een consumptieer met uitgebreid fan-out
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
--consumer-name my-consumer-app
4. Monitor belangrijke metrieken
Essentiële CloudWatch-metrieken om te volgen:
IncomingRecords: Aantal records succesvol ingevoegdIncomingBytes: Bytevolume van ingekomen dataGetRecords.IteratorAgeMilliseconds: Hoe ver consumptieers achter zijnWriteProvisionedThroughputExceeded: ThrottlinggebeurtenissenReadProvisionedThroughputExceeded: Consumptieerthrottling
5. Implementeer juiste partitiekeystrategie
import hashlib
def get_partition_key(user_id, shard_count=10):
"""Genereer partitiekey met gelijke verdeling"""
# Gebruik consistente hashing voor gelijke verdeling
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_id = hash_value % shard_count
return f"{user_id}#{shard_id}"
Voorbeeld van real-world implementatie
Hier is een volledig voorbeeld van een orderverwerkingsmicroservicesarchitectuur:
import boto3
import json
from decimal import Decimal
from typing import Dict, List
class OrderProcessingService:
def __init__(self, stream_name: str):
self.kinesis = boto3.client('kinesis')
self.stream_name = stream_name
def create_order(self, user_id: str, items: List[Dict]) -> str:
"""Order aanmaken en gebeurtenissen publiceren"""
order_id = self.generate_order_id()
# Gebeurtenis voor order aangemaakt publiceren
self.publish_event('ORDER_CREATED', {
'orderId': order_id,
'userId': user_id,
'items': items,
'status': 'PENDING',
'total': self.calculate_total(items)
}, partition_key=user_id)
return order_id
def publish_event(self, event_type: str, payload: Dict,
partition_key: str):
"""Gebeurtenis publiceren naar Kinesis stream"""
event = {
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
self.kinesis.put_record(
StreamName=self.stream_name,
Data=json.dumps(event, default=str),
PartitionKey=partition_key
)
class InventoryService:
"""Gebruikt ordergebeurtenissen en bijwerkt voorraad"""
def process_records(self, records):
for record in records:
data = json.loads(record['kinesis']['data'])
if data['eventType'] == 'ORDER_CREATED':
self.reserve_inventory(data['payload'])
elif data['eventType'] == 'ORDER_CANCELLED':
self.release_inventory(data['payload'])
def reserve_inventory(self, order_data):
# Voorraaddatabase bijwerken
for item in order_data['items']:
# Implementatie hier
pass
Migratiestrategie van monolith naar microservices
Fase 1: Strangler Fig Patroon
Begin met het routeren van specifieke gebeurtenissen via Kinesis terwijl de monolith behouden blijft:
- Identificeer begrenste contexten in je monolith
- Maak Kinesisstreams voor cross-context gebeurtenissen
- Trek geleidelijk diensten af die van deze streams consumeren
- Behoud achterwaartse compatibiliteit met de monolith
Fase 2: Parallel verwerking
Zowel oude als nieuwe systemen uitvoeren parallel:
def dual_write_pattern(legacy_db, kinesis_stream, data):
"""Schrijf naar zowel legacy systeem als eventstream"""
try:
# Eerst schrijven naar nieuw systeem
publish_to_kinesis(kinesis_stream, data)
# Dan legacy systeem bijwerken
legacy_db.update(data)
except Exception as e:
# Compensatie logica implementeren
rollback_kinesis_event(kinesis_stream, data)
raise
Fase 3: Volledige migratie
Zodra vertrouwen is opgebouwd, routeer alle verkeer via de gebeurtenisgestuurde architectuur.
Kostoptimalisatiestrategieën
1. Gebruik On-Demand-modus voor variabele werkbelastingen
On-demand-modus (geïntroduceerd in 2023) schaalt automatisch op basis van verkeer:
# Stream aanmaken met on-demand modus
aws kinesis create-stream \
--stream-name my-stream \
--stream-mode-details StreamMode=ON_DEMAND
2. Implementeer dataaggregatie
Verlaag PUT payload units door records te batchen:
from aws_kinesis_agg.aggregator import RecordAggregator
def batch_put_records(kinesis_client, stream_name, records):
"""Aggregateer records om kosten te verminderen"""
aggregator = RecordAggregator()
for record in records:
aggregator.add_user_record(
partition_key=record['partition_key'],
data=record['data']
)
# Aggegregeerde record verzenden
kinesis_client.put_record(
StreamName=stream_name,
Data=aggregator.serialize(),
PartitionKey=records[0]['partition_key']
)
3. Optimaliseer dataopslag
Standaard opslagduur is 24 uur. Pas deze alleen aan als nodig:
# Opslagduur instellen op 7 dagen
aws kinesis increase-stream-retention-period \
--stream-name my-stream \
--retention-period-hours 168
Beveiligingsbeste praktijken
1. Versleuteling opslag en in transitie
# Versleutelde stream aanmaken
kinesis.create_stream(
StreamName='secure-stream',
ShardCount=1,
StreamModeDetails={'StreamMode': 'PROVISIONED'}
)
# Versleuteling inschakelen
kinesis.start_stream_encryption(
StreamName='secure-stream',
EncryptionType='KMS',
KeyId='alias/aws/kinesis'
)
2. IAM-beleid voor minimale bevoegdheid
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
}
]
}
3. VPC-eindpunten
Houd verkeer binnen het AWS-netwerk. Voor het beheren van AWS-infrastructuur als code, overweeg het gebruik van Terraform - zie onze Terraform cheat sheet:
aws ec2 create-vpc-endpoint \
--vpc-id vpc-12345678 \
--service-name com.amazonaws.us-east-1.kinesis-streams \
--route-table-ids rtb-12345678
Observabiliteit en foutopsporing
Gedistribueerde traceren met X-Ray
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
patch_all()
@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
xray_recorder.put_annotation('eventType', event_data['type'])
xray_recorder.put_metadata('payload', event_data['payload'])
kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event_data),
PartitionKey=event_data['id']
)
CloudWatch Logs Insights Queries
-- Langzaam verwerkingsduur vinden
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20
-- Foutpercentages volgen
fields @timestamp, eventType
| filter error = true
| stats count() by eventType
Geavanceerde patronen
Saga Patroon voor gedistribueerde transacties
Implementeer lange transacties over microservices:
class OrderSaga:
def __init__(self, kinesis_client, stream_name):
self.kinesis = kinesis_client
self.stream_name = stream_name
self.saga_id = str(uuid.uuid4())
def execute(self, order_data):
"""Saga uitvoeren met compensatie logica"""
try:
# Stap 1: Voorraad reserveren
self.publish_command('RESERVE_INVENTORY', order_data)
# Stap 2: Betaling verwerken
self.publish_command('PROCESS_PAYMENT', order_data)
# Stap 3: Order verzenden
self.publish_command('SHIP_ORDER', order_data)
except SagaException as e:
# Compensatie in omgekeerde volgorde
self.compensate(e.failed_step)
def compensate(self, failed_step):
"""Compensatie transacties uitvoeren"""
compensation_steps = {
'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
}
for step in compensation_steps.get(failed_step, []):
self.publish_command(step, {'sagaId': self.saga_id})
Teststrategieën
Lokale ontwikkeling met LocalStack
# LocalStack starten met Kinesis
docker run -d \
-p 4566:4566 \
-e SERVICES=kinesis \
localstack/localstack
# Teststream aanmaken
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
--stream-name test-stream \
--shard-count 1
Integratietesten
import pytest
from moto import mock_kinesis
@mock_kinesis
def test_event_publishing():
"""Test event publishing met gemockte Kinesis"""
kinesis = boto3.client('kinesis', region_name='us-east-1')
kinesis.create_stream(StreamName='test-stream', ShardCount=1)
service = OrderProcessingService('test-stream')
order_id = service.create_order('user123', [
{'productId': 'prod1', 'quantity': 2}
])
assert order_id is not None
Prestatieoptimalisatie
Optimaliseer batchgrootte
def optimize_batch_processing(records, batch_size=500):
"""Batchverwerking optimaliseren"""
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
process_batch(batch)
Gebruik verbindingspoolen
from botocore.config import Config
config = Config(
max_pool_connections=50,
retries={'max_attempts': 3, 'mode': 'adaptive'}
)
kinesis = boto3.client('kinesis', config=config)
Nuttige links
AWS Kinesis bronnen:
- AWS Kinesis Documentatie
- AWS Kinesis Data Streams Developer Guide
- Kinesis Client Library (KCL)
- AWS Kinesis Prijsrekenmachine
- Kinesis Data Streams Quotas en Limieten
- AWS Architectuur Blog - Gebeurtenisgestuurde Architecturen
- AWS Voorbeelden - Kinesis Voorbeelden
Gerelateerde artikelen:
- Rabbitmq op Eks vs Sqs hostingkostvergelijking
- TypeScript Cheat Sheet: Kernconcepten & Beste Praktijken
- Python Cheat Sheet
- Terraform cheat sheet - nuttige commando’s en voorbeelden
Conclusie
AWS Kinesis biedt een robuuste basis voor het bouwen van schaalbare, gebeurtenisgestuurde microservicesarchitecturen. Door deze patronen en beste praktijken te volgen, kun je systemen creëren die robuust, schaalbaar en onderhoudbaar zijn. Begin klein met één gebeurtenisstroom, valideer je architectuur en breid geleidelijk uit naar complexere patronen naarmate je systeem groeit.
Het sleutel tot succes is het begrijpen van je dataflowvereisten, het kiezen van de juiste Kinesisdienst voor je toepassing en het implementeren van juiste monitoring en foutafhandeling vanaf het begin.