Tworzenie mikrousług opartych na zdarzeniach za pomocą AWS Kinesis

Architektura oparta na zdarzeniach z użyciem AWS Kinesis do skalowania

Page content

AWS Kinesis stał się fundamentem dla budowania nowoczesnych architektur mikroserwisów opartych na wydarzeniach, umożliwiając przetwarzanie danych w czasie rzeczywistym na dużą skalę z minimalnym nakładem operacyjnym.

amazon-kinesis

Zrozumienie architektury mikroserwisów opartych na wydarzeniach

Architektura oparta na wydarzeniach (EDA) to wzorzec projektowy, w którym usługi komunikują się poprzez wydarzenia zamiast bezpośrednich synchronicznych wywołań. Ten podejście oferuje kilka zalet:

  • Lepsze rozdzielanie: Usługi nie muszą znać istnienia jednej drugiej
  • Skalowalność: Każda usługa skaluje się niezależnie w zależności od obciążenia
  • Odporność: Awarie w jednej usłudze nie przechodzą na inne
  • Elastyczność: Nowe usługi mogą być dodawane bez modyfikacji istniejących

AWS Kinesis stanowi fundament implementacji EDA, działając jako rozproszona, trwała strumień wydarzeń, który rozdzielają producentów od konsumentów.

Omówienie AWS Kinesis

AWS oferuje kilka usług Kinesis, każda z nich zaprojektowana dla konkretnych przypadków użycia. Gdy oceniasz rozwiązania przesyłania strumieni, możesz również chcieć rozważyć porównanie RabbitMQ na EKS z SQS dla różnych wzorców komunikacji i konsekwencji kosztowych.

Kinesis Data Streams

Jądro usługi przesyłania strumieni, która rejestruje, przechowuje i przetwarza rekordy danych w czasie rzeczywistym. Data Streams jest idealna dla:

  • niestandardowych aplikacji przetwarzających dane w czasie rzeczywistym
  • budowania potoków danych z opóźnieniem poniżej sekundy
  • przetwarzania milionów wydarzeń na sekundę
  • implementacji wzorców źródła wydarzeń

Kinesis Data Firehose

Pełnie zarządzana usługa, która dostarcza przesyłanych danych do miejsc takich jak S3, Redshift, Elasticsearch lub punktów końcowych HTTP. Najlepsza do:

  • prostych potoków ETL
  • agregacji i archiwizacji dzienników
  • analizy w czasie prawie rzeczywistym (minimalne opóźnienie 60 sekund)
  • scenariuszy, w których nie potrzebujesz niestandardowej logiki przetwarzania

Kinesis Data Analytics

Przetwarza i analizuje przesyłane dane za pomocą SQL lub Apache Flink. Przypadki użycia obejmują:

  • dynamiczne panele
  • przetwarzanie ETL w czasie rzeczywistym
  • detekcję anomalii w czasie rzeczywistym
  • ciągłe generowanie metryk

Wzorce architektoniczne z użyciem Kinesis

1. Wzorzec źródła wydarzeń

Źródło wydarzeń przechowuje wszystkie zmiany w stanie aplikacji jako sekwencję wydarzeń. Kinesis jest idealny do tego. Jeśli potrzebujesz powtórnego przypomnienia podstaw Pythona, sprawdź nasz Python Cheatsheet:

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Opublikuj wydarzenie w strumieniu Kinesis"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }
    
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )
    
    return response['SequenceNumber']

# Przykład: wydarzenie rejestracji użytkownika
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. Wzorzec CQRS (Command Query Responsibility Segregation)

Oddziel operacje odczytu i zapisu, używając Kinesis jako magistrali wydarzeń:

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }
    
    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })
    
    return err
}

3. Wzorzec rozdzielania (Fan-Out) z użyciem Lambda

Przetwarzaj wydarzenia z jednego strumienia za pomocą wielu funkcji Lambda. Dla implementacji TypeScript z większą bezpieczeństwem typów, odwiedź nasz TypeScript Cheatsheet:

// Konsumenci Lambda dla powiadomień e-mail
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// Inny Lambda dla aktualizacji stanu magazynu
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};

Najlepsze praktyki w środowisku produkcyjnym

1. Wybór odpowiedniej liczby shardów

Oblicz wymagania shardów na podstawie:

  • Wprowadzania: 1 MB/sec lub 1000 rekordów/sec na shard
  • Wydawania: 2 MB/sec na shard (standardowi konsumentom) lub 2 MB/sec na konsumenta z ulepszonym rozdzielaniem
def calculate_shards(records_per_second, avg_record_size_kb):
    """Oblicz wymaganą liczbę shardów"""
    # Pojemność wprowadzania
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )
    
    return int(ingress_shards) + 1  # Dodaj bufor

2. Zaimplementuj odpowiednie obsługę błędów

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key, 
                          max_retries=3):
    """Wstaw rekord z powtórką o wykładniczym opóźnieniu"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Wykładnicze opóźnienie
                    continue
            raise

3. Użyj ulepszonego rozdzielania dla wielu konsumentów

Ulepszone rozdzielanie zapewnia dedykowaną przepustowość dla każdego konsumenta:

# Zarejestruj konsumenta z ulepszonym rozdzielaniem
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. Monitoruj kluczowe metryki

Niezbędne metryki CloudWatch do śledzenia:

  • IncomingRecords: Liczba rekordów pomyślnie wprowadzonych
  • IncomingBytes: Objętość danych wprowadzonych
  • GetRecords.IteratorAgeMilliseconds: Jak daleko konsumenty są w tylne
  • WriteProvisionedThroughputExceeded: Zdarzenia ograniczania przepustowości
  • ReadProvisionedThroughputExceeded: Ograniczanie przepustowości konsumentów

5. Zaimplementuj odpowiednią strategię klucza partycji

import hashlib

def get_partition_key(user_id, shard_count=10):
    """Wygeneruj klucz partycji z równomiernym rozkładem"""
    # Użyj spójnego hashowania dla równomiernego rozkładu
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

Przykład implementacji w rzeczywistym środowisku

Oto kompletny przykład architektury mikroserwisów przetwarzających zamówienia:

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
    
    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """Utwórz zamówienie i opublikuj wydarzenia"""
        order_id = self.generate_order_id()
        
        # Opublikuj wydarzenie utworzenia zamówienia
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)
        
        return order_id
    
    def publish_event(self, event_type: str, payload: Dict, 
                     partition_key: str):
        """Opublikuj wydarzenie w strumieniu Kinesis"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }
        
        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """Konsumuje wydarzenia zamówienia i aktualizuje stan magazynu"""
    
    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])
            
            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])
    
    def reserve_inventory(self, order_data):
        # Aktualizacja bazy danych magazynu
        for item in order_data['items']:
            # Implementacja tutaj
            pass

Strategia migracji z monolitu na mikroserwisy

Faza 1: Wzorzec Strangler Fig

Zacznij od kierowania konkretnych wydarzeń przez Kinesis, jednocześnie zachowując monolit:

  1. Zidentyfikuj konteksty ograniczone w swoim monolicie
  2. Utwórz strumienie Kinesis dla wydarzeń międzykontekstowych
  3. Stopniowo wyodrębnij usługi, które konsumują z tych strumieni
  4. Zachowaj kompatybilność wsteczną z monolitem

Faza 2: Równoległe przetwarzanie

Uruchom zarówno stare, jak i nowe systemy równolegle:

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """Zapisz do zarówno starego systemu, jak i strumienia wydarzeń"""
    try:
        # Zapisz do nowego systemu pierwszy
        publish_to_kinesis(kinesis_stream, data)
        
        # Następnie zaktualizuj stary system
        legacy_db.update(data)
    except Exception as e:
        # Zaimplementuj logikę kompensacji
        rollback_kinesis_event(kinesis_stream, data)
        raise

Faza 3: Pełna migracja

Po uzyskaniu pewności, kieruj wszystkie ruchy przez architekturę opartą na wydarzeniach.

Strategie optymalizacji kosztów

1. Użyj trybu on-demand dla zmieniających się obciążeń

Tryb on-demand (wprowadzony w 2023) automatycznie skaluje się na podstawie ruchu:

# Utwórz strumień z trybem on-demand
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. Zaimplementuj agregację danych

Zmniejsz jednostki PUT agregując rekordy:

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """Agreguj rekordy w celu zmniejszenia kosztów"""
    aggregator = RecordAggregator()
    
    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )
    
    # Wyślij agregowany rekord
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. Optymalizuj przechowywanie danych

Domyślny czas przechowywania to 24 godziny. Rozszerz go tylko wtedy, gdy jest to konieczne:

# Ustaw przechowywanie na 7 dni
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

Najlepsze praktyki bezpieczeństwa

1. Szyfrowanie danych w spoczynku i w trakcie przesyłania

# Utwórz zaszyfrowany strumień
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# Włącz szyfrowanie
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. Polityki IAM z minimalnymi uprawnieniami

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. Punkty końcowe VPC

Zachowaj ruch w sieci AWS. Dla zarządzania infrastrukturą AWS jako kod, rozważ użycie Terraform - zobacz nasz Terraform cheatsheet:

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

Obserwacja i debugowanie

Śledzenie rozproszone z X-Ray

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])
    
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

Zapytania CloudWatch Logs Insights

-- Znajdź wolne czasy przetwarzania
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- Śledź stawki błędów
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

Zaawansowane wzorce

Wzorzec Saga dla transakcji rozproszonych

Zaimplementuj długotrwałe transakcje między mikroserwisami:

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())
    
    def execute(self, order_data):
        """Wykonaj saga z logiką kompensacji"""
        try:
            # Krok 1: Rezerwacja magazynu
            self.publish_command('RESERVE_INVENTORY', order_data)
            
            # Krok 2: Przetwarzanie płatności
            self.publish_command('PROCESS_PAYMENT', order_data)
            
            # Krok 3: Wysyłka zamówienia
            self.publish_command('SHIP_ORDER', order_data)
            
        except SagaException as e:
            # Kompensuj w odwrotnej kolejności
            self.compensate(e.failed_step)
    
    def compensate(self, failed_step):
        """Wykonaj transakcje kompensacyjne"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }
        
        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

Strategie testowania

Lokalne rozwijanie z LocalStack

# Uruchom LocalStack z Kinesis
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# Utwórz testowy strumień
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

Testy integracyjne

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """Testowanie publikowania wydarzeń z wykorzystaniem symulowanego Kinesis"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)
    
    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])
    
    assert order_id is not None

Optymalizacja wydajności

Optymalizacja wielkości partii

def optimize_batch_processing(records, batch_size=500):
    """Przetwarzaj rekordy w zoptymalizowanych partiach"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

Użyj puli połączeń

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

Przydatne linki

Zasoby AWS Kinesis:

Powiązane artykuły:

Podsumowanie

AWS Kinesis oferuje solidną podstawę do budowania skalowalnych, architektur mikroserwisów opartych na wydarzeniach. Przestrzegając tych wzorców i najlepszych praktyk, możesz stworzyć systemy, które są odpornościowe, skalowalne i utrzyjmalne. Zacznij od jednego strumienia wydarzeń, zweryfikuj swoją architekturę, a stopniowo rozszerz ją na bardziej złożone wzorce, gdy Twój system rośnie.

Kluczem do sukcesu jest zrozumienie wymagań przepływu danych, wyboru odpowiedniej usługi Kinesis dla Twojego przypadku użycia i implementacji odpowiedniego monitorowania i obsługi błędów od samego początku.