Tworzenie mikrousług opartych na zdarzeniach za pomocą AWS Kinesis
Architektura oparta na zdarzeniach z użyciem AWS Kinesis do skalowania
AWS Kinesis stał się fundamentem dla budowania nowoczesnych architektur mikroserwisów opartych na wydarzeniach, umożliwiając przetwarzanie danych w czasie rzeczywistym na dużą skalę z minimalnym nakładem operacyjnym.

Zrozumienie architektury mikroserwisów opartych na wydarzeniach
Architektura oparta na wydarzeniach (EDA) to wzorzec projektowy, w którym usługi komunikują się poprzez wydarzenia zamiast bezpośrednich synchronicznych wywołań. Ten podejście oferuje kilka zalet:
- Lepsze rozdzielanie: Usługi nie muszą znać istnienia jednej drugiej
- Skalowalność: Każda usługa skaluje się niezależnie w zależności od obciążenia
- Odporność: Awarie w jednej usłudze nie przechodzą na inne
- Elastyczność: Nowe usługi mogą być dodawane bez modyfikacji istniejących
AWS Kinesis stanowi fundament implementacji EDA, działając jako rozproszona, trwała strumień wydarzeń, który rozdzielają producentów od konsumentów.
Omówienie AWS Kinesis
AWS oferuje kilka usług Kinesis, każda z nich zaprojektowana dla konkretnych przypadków użycia. Gdy oceniasz rozwiązania przesyłania strumieni, możesz również chcieć rozważyć porównanie RabbitMQ na EKS z SQS dla różnych wzorców komunikacji i konsekwencji kosztowych.
Kinesis Data Streams
Jądro usługi przesyłania strumieni, która rejestruje, przechowuje i przetwarza rekordy danych w czasie rzeczywistym. Data Streams jest idealna dla:
- niestandardowych aplikacji przetwarzających dane w czasie rzeczywistym
- budowania potoków danych z opóźnieniem poniżej sekundy
- przetwarzania milionów wydarzeń na sekundę
- implementacji wzorców źródła wydarzeń
Kinesis Data Firehose
Pełnie zarządzana usługa, która dostarcza przesyłanych danych do miejsc takich jak S3, Redshift, Elasticsearch lub punktów końcowych HTTP. Najlepsza do:
- prostych potoków ETL
- agregacji i archiwizacji dzienników
- analizy w czasie prawie rzeczywistym (minimalne opóźnienie 60 sekund)
- scenariuszy, w których nie potrzebujesz niestandardowej logiki przetwarzania
Kinesis Data Analytics
Przetwarza i analizuje przesyłane dane za pomocą SQL lub Apache Flink. Przypadki użycia obejmują:
- dynamiczne panele
- przetwarzanie ETL w czasie rzeczywistym
- detekcję anomalii w czasie rzeczywistym
- ciągłe generowanie metryk
Wzorce architektoniczne z użyciem Kinesis
1. Wzorzec źródła wydarzeń
Źródło wydarzeń przechowuje wszystkie zmiany w stanie aplikacji jako sekwencję wydarzeń. Kinesis jest idealny do tego. Jeśli potrzebujesz powtórnego przypomnienia podstaw Pythona, sprawdź nasz Python Cheatsheet:
import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
def publish_event(stream_name, event_type, payload):
"""Opublikuj wydarzenie w strumieniu Kinesis"""
event = {
'eventId': generate_unique_id(),
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event),
PartitionKey=payload.get('userId', 'default')
)
return response['SequenceNumber']
# Przykład: wydarzenie rejestracji użytkownika
publish_event(
stream_name='user-events',
event_type='USER_REGISTERED',
payload={
'userId': '12345',
'email': 'user@example.com',
'registrationDate': '2025-10-30'
}
)
2. Wzorzec CQRS (Command Query Responsibility Segregation)
Oddziel operacje odczytu i zapisu, używając Kinesis jako magistrali wydarzeń:
package main
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)
type OrderCommand struct {
CommandType string `json:"commandType"`
OrderID string `json:"orderId"`
UserID string `json:"userId"`
Items []OrderItem `json:"items"`
}
func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
data, err := json.Marshal(command)
if err != nil {
return err
}
_, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String("order-commands"),
Data: data,
PartitionKey: aws.String(command.OrderID),
})
return err
}
3. Wzorzec rozdzielania (Fan-Out) z użyciem Lambda
Przetwarzaj wydarzenia z jednego strumienia za pomocą wielu funkcji Lambda. Dla implementacji TypeScript z większą bezpieczeństwem typów, odwiedź nasz TypeScript Cheatsheet:
// Konsumenci Lambda dla powiadomień e-mail
exports.handler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await sendOrderConfirmationEmail(payload);
}
}
};
// Inny Lambda dla aktualizacji stanu magazynu
exports.inventoryHandler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await updateInventory(payload.items);
}
}
};
Najlepsze praktyki w środowisku produkcyjnym
1. Wybór odpowiedniej liczby shardów
Oblicz wymagania shardów na podstawie:
- Wprowadzania: 1 MB/sec lub 1000 rekordów/sec na shard
- Wydawania: 2 MB/sec na shard (standardowi konsumentom) lub 2 MB/sec na konsumenta z ulepszonym rozdzielaniem
def calculate_shards(records_per_second, avg_record_size_kb):
"""Oblicz wymaganą liczbę shardów"""
# Pojemność wprowadzania
ingress_shards = max(
records_per_second / 1000,
(records_per_second * avg_record_size_kb) / 1024
)
return int(ingress_shards) + 1 # Dodaj bufor
2. Zaimplementuj odpowiednie obsługę błędów
from botocore.exceptions import ClientError
import time
def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
max_retries=3):
"""Wstaw rekord z powtórką o wykładniczym opóźnieniu"""
for attempt in range(max_retries):
try:
response = kinesis_client.put_record(
StreamName=stream_name,
Data=data,
PartitionKey=partition_key
)
return response
except ClientError as e:
if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Wykładnicze opóźnienie
continue
raise
3. Użyj ulepszonego rozdzielania dla wielu konsumentów
Ulepszone rozdzielanie zapewnia dedykowaną przepustowość dla każdego konsumenta:
# Zarejestruj konsumenta z ulepszonym rozdzielaniem
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
--consumer-name my-consumer-app
4. Monitoruj kluczowe metryki
Niezbędne metryki CloudWatch do śledzenia:
IncomingRecords: Liczba rekordów pomyślnie wprowadzonychIncomingBytes: Objętość danych wprowadzonychGetRecords.IteratorAgeMilliseconds: Jak daleko konsumenty są w tylneWriteProvisionedThroughputExceeded: Zdarzenia ograniczania przepustowościReadProvisionedThroughputExceeded: Ograniczanie przepustowości konsumentów
5. Zaimplementuj odpowiednią strategię klucza partycji
import hashlib
def get_partition_key(user_id, shard_count=10):
"""Wygeneruj klucz partycji z równomiernym rozkładem"""
# Użyj spójnego hashowania dla równomiernego rozkładu
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_id = hash_value % shard_count
return f"{user_id}#{shard_id}"
Przykład implementacji w rzeczywistym środowisku
Oto kompletny przykład architektury mikroserwisów przetwarzających zamówienia:
import boto3
import json
from decimal import Decimal
from typing import Dict, List
class OrderProcessingService:
def __init__(self, stream_name: str):
self.kinesis = boto3.client('kinesis')
self.stream_name = stream_name
def create_order(self, user_id: str, items: List[Dict]) -> str:
"""Utwórz zamówienie i opublikuj wydarzenia"""
order_id = self.generate_order_id()
# Opublikuj wydarzenie utworzenia zamówienia
self.publish_event('ORDER_CREATED', {
'orderId': order_id,
'userId': user_id,
'items': items,
'status': 'PENDING',
'total': self.calculate_total(items)
}, partition_key=user_id)
return order_id
def publish_event(self, event_type: str, payload: Dict,
partition_key: str):
"""Opublikuj wydarzenie w strumieniu Kinesis"""
event = {
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
self.kinesis.put_record(
StreamName=self.stream_name,
Data=json.dumps(event, default=str),
PartitionKey=partition_key
)
class InventoryService:
"""Konsumuje wydarzenia zamówienia i aktualizuje stan magazynu"""
def process_records(self, records):
for record in records:
data = json.loads(record['kinesis']['data'])
if data['eventType'] == 'ORDER_CREATED':
self.reserve_inventory(data['payload'])
elif data['eventType'] == 'ORDER_CANCELLED':
self.release_inventory(data['payload'])
def reserve_inventory(self, order_data):
# Aktualizacja bazy danych magazynu
for item in order_data['items']:
# Implementacja tutaj
pass
Strategia migracji z monolitu na mikroserwisy
Faza 1: Wzorzec Strangler Fig
Zacznij od kierowania konkretnych wydarzeń przez Kinesis, jednocześnie zachowując monolit:
- Zidentyfikuj konteksty ograniczone w swoim monolicie
- Utwórz strumienie Kinesis dla wydarzeń międzykontekstowych
- Stopniowo wyodrębnij usługi, które konsumują z tych strumieni
- Zachowaj kompatybilność wsteczną z monolitem
Faza 2: Równoległe przetwarzanie
Uruchom zarówno stare, jak i nowe systemy równolegle:
def dual_write_pattern(legacy_db, kinesis_stream, data):
"""Zapisz do zarówno starego systemu, jak i strumienia wydarzeń"""
try:
# Zapisz do nowego systemu pierwszy
publish_to_kinesis(kinesis_stream, data)
# Następnie zaktualizuj stary system
legacy_db.update(data)
except Exception as e:
# Zaimplementuj logikę kompensacji
rollback_kinesis_event(kinesis_stream, data)
raise
Faza 3: Pełna migracja
Po uzyskaniu pewności, kieruj wszystkie ruchy przez architekturę opartą na wydarzeniach.
Strategie optymalizacji kosztów
1. Użyj trybu on-demand dla zmieniających się obciążeń
Tryb on-demand (wprowadzony w 2023) automatycznie skaluje się na podstawie ruchu:
# Utwórz strumień z trybem on-demand
aws kinesis create-stream \
--stream-name my-stream \
--stream-mode-details StreamMode=ON_DEMAND
2. Zaimplementuj agregację danych
Zmniejsz jednostki PUT agregując rekordy:
from aws_kinesis_agg.aggregator import RecordAggregator
def batch_put_records(kinesis_client, stream_name, records):
"""Agreguj rekordy w celu zmniejszenia kosztów"""
aggregator = RecordAggregator()
for record in records:
aggregator.add_user_record(
partition_key=record['partition_key'],
data=record['data']
)
# Wyślij agregowany rekord
kinesis_client.put_record(
StreamName=stream_name,
Data=aggregator.serialize(),
PartitionKey=records[0]['partition_key']
)
3. Optymalizuj przechowywanie danych
Domyślny czas przechowywania to 24 godziny. Rozszerz go tylko wtedy, gdy jest to konieczne:
# Ustaw przechowywanie na 7 dni
aws kinesis increase-stream-retention-period \
--stream-name my-stream \
--retention-period-hours 168
Najlepsze praktyki bezpieczeństwa
1. Szyfrowanie danych w spoczynku i w trakcie przesyłania
# Utwórz zaszyfrowany strumień
kinesis.create_stream(
StreamName='secure-stream',
ShardCount=1,
StreamModeDetails={'StreamMode': 'PROVISIONED'}
)
# Włącz szyfrowanie
kinesis.start_stream_encryption(
StreamName='secure-stream',
EncryptionType='KMS',
KeyId='alias/aws/kinesis'
)
2. Polityki IAM z minimalnymi uprawnieniami
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
}
]
}
3. Punkty końcowe VPC
Zachowaj ruch w sieci AWS. Dla zarządzania infrastrukturą AWS jako kod, rozważ użycie Terraform - zobacz nasz Terraform cheatsheet:
aws ec2 create-vpc-endpoint \
--vpc-id vpc-12345678 \
--service-name com.amazonaws.us-east-1.kinesis-streams \
--route-table-ids rtb-12345678
Obserwacja i debugowanie
Śledzenie rozproszone z X-Ray
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
patch_all()
@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
xray_recorder.put_annotation('eventType', event_data['type'])
xray_recorder.put_metadata('payload', event_data['payload'])
kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event_data),
PartitionKey=event_data['id']
)
Zapytania CloudWatch Logs Insights
-- Znajdź wolne czasy przetwarzania
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20
-- Śledź stawki błędów
fields @timestamp, eventType
| filter error = true
| stats count() by eventType
Zaawansowane wzorce
Wzorzec Saga dla transakcji rozproszonych
Zaimplementuj długotrwałe transakcje między mikroserwisami:
class OrderSaga:
def __init__(self, kinesis_client, stream_name):
self.kinesis = kinesis_client
self.stream_name = stream_name
self.saga_id = str(uuid.uuid4())
def execute(self, order_data):
"""Wykonaj saga z logiką kompensacji"""
try:
# Krok 1: Rezerwacja magazynu
self.publish_command('RESERVE_INVENTORY', order_data)
# Krok 2: Przetwarzanie płatności
self.publish_command('PROCESS_PAYMENT', order_data)
# Krok 3: Wysyłka zamówienia
self.publish_command('SHIP_ORDER', order_data)
except SagaException as e:
# Kompensuj w odwrotnej kolejności
self.compensate(e.failed_step)
def compensate(self, failed_step):
"""Wykonaj transakcje kompensacyjne"""
compensation_steps = {
'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
}
for step in compensation_steps.get(failed_step, []):
self.publish_command(step, {'sagaId': self.saga_id})
Strategie testowania
Lokalne rozwijanie z LocalStack
# Uruchom LocalStack z Kinesis
docker run -d \
-p 4566:4566 \
-e SERVICES=kinesis \
localstack/localstack
# Utwórz testowy strumień
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
--stream-name test-stream \
--shard-count 1
Testy integracyjne
import pytest
from moto import mock_kinesis
@mock_kinesis
def test_event_publishing():
"""Testowanie publikowania wydarzeń z wykorzystaniem symulowanego Kinesis"""
kinesis = boto3.client('kinesis', region_name='us-east-1')
kinesis.create_stream(StreamName='test-stream', ShardCount=1)
service = OrderProcessingService('test-stream')
order_id = service.create_order('user123', [
{'productId': 'prod1', 'quantity': 2}
])
assert order_id is not None
Optymalizacja wydajności
Optymalizacja wielkości partii
def optimize_batch_processing(records, batch_size=500):
"""Przetwarzaj rekordy w zoptymalizowanych partiach"""
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
process_batch(batch)
Użyj puli połączeń
from botocore.config import Config
config = Config(
max_pool_connections=50,
retries={'max_attempts': 3, 'mode': 'adaptive'}
)
kinesis = boto3.client('kinesis', config=config)
Przydatne linki
Zasoby AWS Kinesis:
- Dokumentacja AWS Kinesis
- AWS Kinesis Data Streams Developer Guide
- Kinesis Client Library (KCL)
- Kalkulator kosztów AWS Kinesis
- Kwoty i limity Kinesis Data Streams
- AWS Architecture Blog - Architektury oparte na wydarzeniach
- Przykłady AWS Samples - Kinesis
Powiązane artykuły:
- Rabbitmq na Eks vs Sqs porównanie kosztów hostingowych
- TypeScript Cheatsheet: Podstawowe pojęcia i najlepsze praktyki
- Python Cheatsheet
- Terraform cheatsheet - przydatne polecenia i przykłady
Podsumowanie
AWS Kinesis oferuje solidną podstawę do budowania skalowalnych, architektur mikroserwisów opartych na wydarzeniach. Przestrzegając tych wzorców i najlepszych praktyk, możesz stworzyć systemy, które są odpornościowe, skalowalne i utrzyjmalne. Zacznij od jednego strumienia wydarzeń, zweryfikuj swoją architekturę, a stopniowo rozszerz ją na bardziej złożone wzorce, gdy Twój system rośnie.
Kluczem do sukcesu jest zrozumienie wymagań przepływu danych, wyboru odpowiedniej usługi Kinesis dla Twojego przypadku użycia i implementacji odpowiedniego monitorowania i obsługi błędów od samego początku.