Tworzenie mikroserwisów opartych na zdarzeniach przy użyciu AWS Kinesis
Architektura oparta na zdarzeniach z AWS Kinesis dla skalowalności
AWS Kinesis stał się fundamentem nowoczesnych architektur mikrousług opartych na zdarzeniach, umożliwiając przetwarzanie danych w czasie rzeczywistym w dużych skalach przy minimalnym nakładzie operacyjnym.

Zrozumienie architektury mikrousług opartych na zdarzeniach
Architektura oparta na zdarzeniach (EDA) to wzorzec projektowy, w którym usługi komunikują się poprzez zdarzenia, a nie poprzez bezpośrednie, synchroniczne wywołania. To podejście oferuje kilka korzyści:
- Luźne sprzężenie: Usługi nie muszą znać o istnieniu innych usług
- Skalowalność: Każda usługa skaluje się niezależnie w zależności od obciążenia
- Odporność: Awaria jednej usługi nie powoduje kaskadowego wpływu na inne
- Elastyczność: Nowe usługi można dodawać bez modyfikowania istniejących
AWS Kinesis stanowi szkielet do wdrożenia EDA, działając jako rozproszony, trwały strumień zdarzeń, który rozdziela producentów od konsumentów.
Aby uzyskać szerszą perspektywę na platformy strumieniowe, zobacz nasz Szybki start z Apache Kafka w celu porównania z rozwiązaniami self-hosted.
Przegląd AWS Kinesis
AWS oferuje kilka usług Kinesis, każda zaprojektowana dla konkretnych przypadków użycia. Przy ocenianiu rozwiązań strumieniowych warto również rozważyć porównanie RabbitMQ na EKS vs SQS w kontekście różnych wzorców komunikacji i implikacji kosztowych.
Kinesis Data Streams
Podstawowa usługa strumieniowa, która przechwytuje, przechowuje i przetwarza rekordy danych w czasie rzeczywistym. Data Streams jest idealna do:
- Aplikacji do niestandardowego przetwarzania w czasie rzeczywistym
- Budowania potoków danych z opóźnieniem poniżej sekundy
- Przetwarzania milionów zdarzeń na sekundę
- Wdrażania wzorców opartych na zdarzeniach (event sourcing)
Kinesis Data Firehose
Cenowo zarządzana usługa dostarczająca strumienie danych do celów takich jak S3, Redshift, Elasticsearch lub punkty końcowe HTTP. Najlepsza do:
- Prosty potok ETL
- Agregacja i archiwizacja logów
- Analiza w czasie quasi-realistycznym (minimalne opóźnienie 60 sekund)
- Scenariuszy, gdzie nie jest potrzebna logika niestandardowego przetwarzania
Kinesis Data Analytics
Przetwarza i analizuje dane strumieniowe używając SQL lub Apache Flink. Przypadki użycia obejmują:
- Dashboards w czasie rzeczywistym
- Strumieniowe ETL
- Wykrywanie anomalii w czasie rzeczywistym
- Ciągłe generowanie metryk
Aby dowiedzieć się więcej o operacjach Flink, zobacz nasz Poradnik Apache Flink na K8s i Kafka.
Wzorce architektoniczne z Kinesis
1. Wzorzec Event Sourcing
Event sourcing przechowuje wszystkie zmiany stanu aplikacji jako sekwencję zdarzeń. Kinesis jest do tego idealny. Jeśli potrzebujesz odświeżenia podstaw Pythona, sprawdź nasz Skrypt wycinkowy Python:
import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
def publish_event(stream_name, event_type, payload):
"""Publikacja zdarzenia w strumieniu Kinesis"""
event = {
'eventId': generate_unique_id(),
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event),
PartitionKey=payload.get('userId', 'default')
)
return response['SequenceNumber']
# Przykład: Zdarzenie rejestracji użytkownika
publish_event(
stream_name='user-events',
event_type='USER_REGISTERED',
payload={
'userId': '12345',
'email': 'user@example.com',
'registrationDate': '2025-10-30'
}
)
2. CQRS (Command Query Responsibility Segregation)
Rozdzielenie operacji odczytu i zapisu używając Kinesis jako busa zdarzeń:
package main
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)
type OrderCommand struct {
CommandType string `json:"commandType"`
OrderID string `json:"orderId"`
UserID string `json:"userId"`
Items []OrderItem `json:"items"`
}
func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
data, err := json.Marshal(command)
if err != nil {
return err
}
_, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String("order-commands"),
Data: data,
PartitionKey: aws.String(command.OrderID),
})
return err
}
3. Wzorzec Fan-Out z Lambda
Przetwarzanie zdarzeń z pojedynczego strumienia za pomocą wielu funkcji Lambda. Dla implementacji w TypeScript z większą bezpieczeństwu typów, odwołaj się do naszego Skryptu wycinkowego TypeScript:
// Lambda konsumencka do powiadomień e-mail
exports.handler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await sendOrderConfirmationEmail(payload);
}
}
};
// Inna Lambda do aktualizacji stanów magazynowych
exports.inventoryHandler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await updateInventory(payload.items);
}
}
};
Najlepsze praktyki dla produkcji
1. Wybór odpowiedniej liczby shardów
Oblicz wymagania dotyczące liczby shardów w oparciu o:
- Wejście (Ingress): 1 MB/sec lub 1000 rekordów/sec na shard
- Wyjście (Egress): 2 MB/sec na shard (standardowi konsumenci) lub 2 MB/sec na konsumenta przy użyciu rozszerzonego fan-out
def calculate_shards(records_per_second, avg_record_size_kb):
"""Oblicz wymaganą liczbę shardów"""
# Pojemność wejścia
ingress_shards = max(
records_per_second / 1000,
(records_per_second * avg_record_size_kb) / 1024
)
return int(ingress_shards) + 1 # Dodaj bufor
2. Właściwe implementowanie obsługi błędów
from botocore.exceptions import ClientError
import time
def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
max_retries=3):
"""Wysyłanie rekordu z wykładniczym cofnięciem przy ponowieniu"""
for attempt in range(max_retries):
try:
response = kinesis_client.put_record(
StreamName=stream_name,
Data=data,
PartitionKey=partition_key
)
return response
except ClientError as e:
if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Wykładnicze cofnięcie
continue
raise
3. Użyj rozszerzonego fan-out dla wielu konsumentów
Rozszerzony fan-out zapewnia dedykowaną przepustowość dla każdego konsumenta:
# Zarejestruj konsumenta z rozszerzonym fan-out
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
--consumer-name my-consumer-app
4. Monitoruj kluczowe metryki
Podstawowe metryki CloudWatch do śledzenia:
IncomingRecords: Liczba pomyślnie wysłanych rekordówIncomingBytes: Objętość danych wejściowych w bajtachGetRecords.IteratorAgeMilliseconds: Jak daleko konsumenci są w tyleWriteProvisionedThroughputExceeded: Zdarzenia ograniczania przepustowości zapisuReadProvisionedThroughputExceeded: Ograniczenia przepustowości odczytu konsumenta
5. Właściwa strategia kluczy partition
import hashlib
def get_partition_key(user_id, shard_count=10):
"""Generuj klucz partition z równomierną dystrybucją"""
# Użyj spójnego hashowania dla równomiernego rozdziału
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_id = hash_value % shard_count
return f"{user_id}#{shard_id}"
Przykład implementacji w świecie rzeczywistym
Oto kompletny przykład architektury mikrousług do przetwarzania zamówień:
import boto3
import json
from decimal import Decimal
from typing import Dict, List
class OrderProcessingService:
def __init__(self, stream_name: str):
self.kinesis = boto3.client('kinesis')
self.stream_name = stream_name
def create_order(self, user_id: str, items: List[Dict]) -> str:
"""Tworzenie zamówienia i publikacja zdarzeń"""
order_id = self.generate_order_id()
# Publikacja zdarzenia utworzenia zamówienia
self.publish_event('ORDER_CREATED', {
'orderId': order_id,
'userId': user_id,
'items': items,
'status': 'PENDING',
'total': self.calculate_total(items)
}, partition_key=user_id)
return order_id
def publish_event(self, event_type: str, payload: Dict,
partition_key: str):
"""Publikacja zdarzenia w strumieniu Kinesis"""
event = {
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
self.kinesis.put_record(
StreamName=self.stream_name,
Data=json.dumps(event, default=str),
PartitionKey=partition_key
)
class InventoryService:
"""Konsumuje zdarzenia zamówień i aktualizuje stan magazynowy"""
def process_records(self, records):
for record in records:
data = json.loads(record['kinesis']['data'])
if data['eventType'] == 'ORDER_CREATED':
self.reserve_inventory(data['payload'])
elif data['eventType'] == 'ORDER_CANCELLED':
self.release_inventory(data['payload'])
def reserve_inventory(self, order_data):
# Aktualizacja bazy danych stanów magazynowych
for item in order_data['items']:
# Implementacja tutaj
pass
Strategia migracji z monolitu do mikrousług
Faza 1: Wzorzec Strangler Fig
Zacznij od kierowania konkretnych zdarzeń przez Kinesis, zachowując monolit:
- Zidentyfikuj ograniczone konteksty w monolicie
- Stwórz strumienie Kinesis dla zdarzeń między kontekstami
- Stopniowo wydzielaj usługi, które konsumują z tych strumieni
- Zachowaj kompatybilność wsteczną z monolitem
Faza 2: Przetwarzanie równoległe
Uruchom stare i nowe systemy równolegle:
def dual_write_pattern(legacy_db, kinesis_stream, data):
"""Zapisz do starego systemu i strumienia zdarzeń"""
try:
# Najpierw zapisz do nowego systemu
publish_to_kinesis(kinesis_stream, data)
# Następnie zaktualizuj stary system
legacy_db.update(data)
except Exception as e:
# Zaimplementuj logikę kompensacyjną
rollback_kinesis_event(kinesis_stream, data)
raise
Faza 3: Pełna migracja
Po zbudowaniu zaufania, kieruj cały ruch przez architekturę opartą na zdarzeniach.
Strategie optymalizacji kosztów
Aby uzyskać kompleksowe wytyczne dotyczące wzorców infrastruktury danych, w tym magazynowania obiektów i architektur baz danych, odwołaj się do Infrastruktura danych dla systemów AI: Magazynowanie obiektów, bazy danych, wyszukiwanie i architektura danych AI.
1. Użyj trybu On-Demand dla zmiennych obciążeń
Tryb On-Demand (wprowadzony w 2023) automatycznie skaluje się w zależności od ruchu:
# Stwórz strumień w trybie on-demand
aws kinesis create-stream \
--stream-name my-stream \
--stream-mode-details StreamMode=ON_DEMAND
2. Zaimplementuj agregację danych
Zmniejsz jednostki payloadu PUT poprzez grupowanie rekordów:
from aws_kinesis_agg.aggregator import RecordAggregator
def batch_put_records(kinesis_client, stream_name, records):
"""Agreguj rekordy, aby zmniejszyć koszty"""
aggregator = RecordAggregator()
for record in records:
aggregator.add_user_record(
partition_key=record['partition_key'],
data=record['data']
)
# Wyślij zgrupowany rekord
kinesis_client.put_record(
StreamName=stream_name,
Data=aggregator.serialize(),
PartitionKey=records[0]['partition_key']
)
3. Zoptymalizuj czas przechowywania danych
Domyślny czas przechowywania to 24 godziny. Wydłuż go tylko w razie potrzeby:
# Ustaw czas przechowywania na 7 dni
aws kinesis increase-stream-retention-period \
--stream-name my-stream \
--retention-period-hours 168
Najlepsze praktyki bezpieczeństwa
1. Szyfrowanie danych w spoczynku i w ruchu
# Stwórz zaszyfrowany strumień
kinesis.create_stream(
StreamName='secure-stream',
ShardCount=1,
StreamModeDetails={'StreamMode': 'PROVISIONED'}
)
# Włącz szyfrowanie
kinesis.start_stream_encryption(
StreamName='secure-stream',
EncryptionType='KMS',
KeyId='alias/aws/kinesis'
)
2. Polityki IAM dla zasady najmniejszych przywilejów
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
}
]
}
3. Punkty końcowe VPC
Zachowaj ruch w sieci AWS. Do zarządzania infrastrukturą AWS jako kod, rozważ użycie Terraform - zobacz nasz Skrypt wycinkowy Terraform:
aws ec2 create-vpc-endpoint \
--vpc-id vpc-12345678 \
--service-name com.amazonaws.us-east-1.kinesis-streams \
--route-table-ids rtb-12345678
Obserwowalność i debugowanie
Śledzenie rozproszone z X-Ray
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
patch_all()
@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
xray_recorder.put_annotation('eventType', event_data['type'])
xray_recorder.put_metadata('payload', event_data['payload'])
kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event_data),
PartitionKey=event_data['id']
)
Zapytania w insights CloudWatch Logs
-- Znajdź wolne czasy przetwarzania
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20
-- Śledź wskaźniki błędów
fields @timestamp, eventType
| filter error = true
| stats count() by eventType
Zaawansowane wzorce
Wzorzec Saga dla rozproszonych transakcji
Zaimplementuj długotrwałe transakcje w mikrousługach:
class OrderSaga:
def __init__(self, kinesis_client, stream_name):
self.kinesis = kinesis_client
self.stream_name = stream_name
self.saga_id = str(uuid.uuid4())
def execute(self, order_data):
"""Wykonaj sagę z logiką kompensacyjną"""
try:
# Krok 1: Zarezerwuj stan magazynowy
self.publish_command('RESERVE_INVENTORY', order_data)
# Krok 2: Przetwórz płatność
self.publish_command('PROCESS_PAYMENT', order_data)
# Krok 3: Wyslij zamówienie
self.publish_command('SHIP_ORDER', order_data)
except SagaException as e:
# Kompensuj w odwrotnej kolejności
self.compensate(e.failed_step)
def compensate(self, failed_step):
"""Wykonaj transakcje kompensacyjne"""
compensation_steps = {
'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
}
for step in compensation_steps.get(failed_step, []):
self.publish_command(step, {'sagaId': self.saga_id})
Strategie testowania
Rozwój lokalny z LocalStack
# Uruchom LocalStack z Kinesis
docker run -d \
-p 4566:4566 \
-e SERVICES=kinesis \
localstack/localstack
# Stwórz strumień testowy
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
--stream-name test-stream \
--shard-count 1
Testy integracyjne
import pytest
from moto import mock_kinesis
@mock_kinesis
def test_event_publishing():
"""Test publikacji zdarzeń z symulowanym Kinesis"""
kinesis = boto3.client('kinesis', region_name='us-east-1')
kinesis.create_stream(StreamName='test-stream', ShardCount=1)
service = OrderProcessingService('test-stream')
order_id = service.create_order('user123', [
{'productId': 'prod1', 'quantity': 2}
])
assert order_id is not None
Dostrojenie wydajności
Optymalizacja rozmiaru partii
def optimize_batch_processing(records, batch_size=500):
"""Przetwarzaj rekordy w zoptymalizowanych partiach"""
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
process_batch(batch)
Użyj puli połączeń
from botocore.config import Config
config = Config(
max_pool_connections=50,
retries={'max_attempts': 3, 'mode': 'adaptive'}
)
kinesis = boto3.client('kinesis', config=config)
Przydatne linki
Zasoby AWS Kinesis:
- Dokumentacja AWS Kinesis
- Przewodnik dewelopera AWS Kinesis Data Streams
- Biblioteka klienta Kinesis (KCL)
- Kalkulator cen AWS Kinesis
- Kwoty i limity strumieni Kinesis Data Streams
- Blog architektury AWS - Architektury oparte na zdarzeniach
- Przykłady AWS - Przykłady Kinesis
Powiązane artykuły:
- Porównanie kosztów hostingu RabbitMQ na EKS vs SQS
- Skrypt wycinkowy TypeScript: Podstawowe koncepcje i najlepsze praktyki
- Skrypt wycinkowy Python
Podsumowanie
AWS Kinesis dostarcza solidnej podstawy do budowy skalowalnych architektur mikrousług opartych na zdarzeniach. Stosując się do tych wzorców i najlepszych praktyk, możesz tworzyć systemy, które są odporne, skalowalne i łatwe w utrzymaniu. Zacznij od pojedynczego strumienia zdarzeń, zwaliduj swoją architekturę i stopniowo przechodź do bardziej złożonych wzorców w miarę wzrostu systemu.
Kluczem do sukcesu jest zrozumienie wymagań dotyczących przepływu danych, wybór odpowiedniej usługi Kinesis dla danego przypadku użycia oraz wdrożenie odpowiedniego monitoringu i obsługi błędów od samego początku.