Tworzenie mikroserwisów opartych na zdarzeniach przy użyciu AWS Kinesis

Architektura oparta na zdarzeniach z AWS Kinesis dla skalowalności

Page content

AWS Kinesis stał się fundamentem nowoczesnych architektur mikrousług opartych na zdarzeniach, umożliwiając przetwarzanie danych w czasie rzeczywistym w dużych skalach przy minimalnym nakładzie operacyjnym.

amazon-kinesis

Zrozumienie architektury mikrousług opartych na zdarzeniach

Architektura oparta na zdarzeniach (EDA) to wzorzec projektowy, w którym usługi komunikują się poprzez zdarzenia, a nie poprzez bezpośrednie, synchroniczne wywołania. To podejście oferuje kilka korzyści:

  • Luźne sprzężenie: Usługi nie muszą znać o istnieniu innych usług
  • Skalowalność: Każda usługa skaluje się niezależnie w zależności od obciążenia
  • Odporność: Awaria jednej usługi nie powoduje kaskadowego wpływu na inne
  • Elastyczność: Nowe usługi można dodawać bez modyfikowania istniejących

AWS Kinesis stanowi szkielet do wdrożenia EDA, działając jako rozproszony, trwały strumień zdarzeń, który rozdziela producentów od konsumentów.

Aby uzyskać szerszą perspektywę na platformy strumieniowe, zobacz nasz Szybki start z Apache Kafka w celu porównania z rozwiązaniami self-hosted.

Przegląd AWS Kinesis

AWS oferuje kilka usług Kinesis, każda zaprojektowana dla konkretnych przypadków użycia. Przy ocenianiu rozwiązań strumieniowych warto również rozważyć porównanie RabbitMQ na EKS vs SQS w kontekście różnych wzorców komunikacji i implikacji kosztowych.

Kinesis Data Streams

Podstawowa usługa strumieniowa, która przechwytuje, przechowuje i przetwarza rekordy danych w czasie rzeczywistym. Data Streams jest idealna do:

  • Aplikacji do niestandardowego przetwarzania w czasie rzeczywistym
  • Budowania potoków danych z opóźnieniem poniżej sekundy
  • Przetwarzania milionów zdarzeń na sekundę
  • Wdrażania wzorców opartych na zdarzeniach (event sourcing)

Kinesis Data Firehose

Cenowo zarządzana usługa dostarczająca strumienie danych do celów takich jak S3, Redshift, Elasticsearch lub punkty końcowe HTTP. Najlepsza do:

  • Prosty potok ETL
  • Agregacja i archiwizacja logów
  • Analiza w czasie quasi-realistycznym (minimalne opóźnienie 60 sekund)
  • Scenariuszy, gdzie nie jest potrzebna logika niestandardowego przetwarzania

Kinesis Data Analytics

Przetwarza i analizuje dane strumieniowe używając SQL lub Apache Flink. Przypadki użycia obejmują:

  • Dashboards w czasie rzeczywistym
  • Strumieniowe ETL
  • Wykrywanie anomalii w czasie rzeczywistym
  • Ciągłe generowanie metryk

Aby dowiedzieć się więcej o operacjach Flink, zobacz nasz Poradnik Apache Flink na K8s i Kafka.

Wzorce architektoniczne z Kinesis

1. Wzorzec Event Sourcing

Event sourcing przechowuje wszystkie zmiany stanu aplikacji jako sekwencję zdarzeń. Kinesis jest do tego idealny. Jeśli potrzebujesz odświeżenia podstaw Pythona, sprawdź nasz Skrypt wycinkowy Python:

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Publikacja zdarzenia w strumieniu Kinesis"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }
    
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )
    
    return response['SequenceNumber']

# Przykład: Zdarzenie rejestracji użytkownika
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. CQRS (Command Query Responsibility Segregation)

Rozdzielenie operacji odczytu i zapisu używając Kinesis jako busa zdarzeń:

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }
    
    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })
    
    return err
}

3. Wzorzec Fan-Out z Lambda

Przetwarzanie zdarzeń z pojedynczego strumienia za pomocą wielu funkcji Lambda. Dla implementacji w TypeScript z większą bezpieczeństwu typów, odwołaj się do naszego Skryptu wycinkowego TypeScript:

// Lambda konsumencka do powiadomień e-mail
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// Inna Lambda do aktualizacji stanów magazynowych
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};

Najlepsze praktyki dla produkcji

1. Wybór odpowiedniej liczby shardów

Oblicz wymagania dotyczące liczby shardów w oparciu o:

  • Wejście (Ingress): 1 MB/sec lub 1000 rekordów/sec na shard
  • Wyjście (Egress): 2 MB/sec na shard (standardowi konsumenci) lub 2 MB/sec na konsumenta przy użyciu rozszerzonego fan-out
def calculate_shards(records_per_second, avg_record_size_kb):
    """Oblicz wymaganą liczbę shardów"""
    # Pojemność wejścia
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )
    
    return int(ingress_shards) + 1  # Dodaj bufor

2. Właściwe implementowanie obsługi błędów

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key, 
                          max_retries=3):
    """Wysyłanie rekordu z wykładniczym cofnięciem przy ponowieniu"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Wykładnicze cofnięcie
                    continue
            raise

3. Użyj rozszerzonego fan-out dla wielu konsumentów

Rozszerzony fan-out zapewnia dedykowaną przepustowość dla każdego konsumenta:

# Zarejestruj konsumenta z rozszerzonym fan-out
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. Monitoruj kluczowe metryki

Podstawowe metryki CloudWatch do śledzenia:

  • IncomingRecords: Liczba pomyślnie wysłanych rekordów
  • IncomingBytes: Objętość danych wejściowych w bajtach
  • GetRecords.IteratorAgeMilliseconds: Jak daleko konsumenci są w tyle
  • WriteProvisionedThroughputExceeded: Zdarzenia ograniczania przepustowości zapisu
  • ReadProvisionedThroughputExceeded: Ograniczenia przepustowości odczytu konsumenta

5. Właściwa strategia kluczy partition

import hashlib

def get_partition_key(user_id, shard_count=10):
    """Generuj klucz partition z równomierną dystrybucją"""
    # Użyj spójnego hashowania dla równomiernego rozdziału
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

Przykład implementacji w świecie rzeczywistym

Oto kompletny przykład architektury mikrousług do przetwarzania zamówień:

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
    
    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """Tworzenie zamówienia i publikacja zdarzeń"""
        order_id = self.generate_order_id()
        
        # Publikacja zdarzenia utworzenia zamówienia
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)
        
        return order_id
    
    def publish_event(self, event_type: str, payload: Dict, 
                     partition_key: str):
        """Publikacja zdarzenia w strumieniu Kinesis"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }
        
        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """Konsumuje zdarzenia zamówień i aktualizuje stan magazynowy"""
    
    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])
            
            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])
    
    def reserve_inventory(self, order_data):
        # Aktualizacja bazy danych stanów magazynowych
        for item in order_data['items']:
            # Implementacja tutaj
            pass

Strategia migracji z monolitu do mikrousług

Faza 1: Wzorzec Strangler Fig

Zacznij od kierowania konkretnych zdarzeń przez Kinesis, zachowując monolit:

  1. Zidentyfikuj ograniczone konteksty w monolicie
  2. Stwórz strumienie Kinesis dla zdarzeń między kontekstami
  3. Stopniowo wydzielaj usługi, które konsumują z tych strumieni
  4. Zachowaj kompatybilność wsteczną z monolitem

Faza 2: Przetwarzanie równoległe

Uruchom stare i nowe systemy równolegle:

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """Zapisz do starego systemu i strumienia zdarzeń"""
    try:
        # Najpierw zapisz do nowego systemu
        publish_to_kinesis(kinesis_stream, data)
        
        # Następnie zaktualizuj stary system
        legacy_db.update(data)
    except Exception as e:
        # Zaimplementuj logikę kompensacyjną
        rollback_kinesis_event(kinesis_stream, data)
        raise

Faza 3: Pełna migracja

Po zbudowaniu zaufania, kieruj cały ruch przez architekturę opartą na zdarzeniach.

Strategie optymalizacji kosztów

Aby uzyskać kompleksowe wytyczne dotyczące wzorców infrastruktury danych, w tym magazynowania obiektów i architektur baz danych, odwołaj się do Infrastruktura danych dla systemów AI: Magazynowanie obiektów, bazy danych, wyszukiwanie i architektura danych AI.

1. Użyj trybu On-Demand dla zmiennych obciążeń

Tryb On-Demand (wprowadzony w 2023) automatycznie skaluje się w zależności od ruchu:

# Stwórz strumień w trybie on-demand
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. Zaimplementuj agregację danych

Zmniejsz jednostki payloadu PUT poprzez grupowanie rekordów:

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """Agreguj rekordy, aby zmniejszyć koszty"""
    aggregator = RecordAggregator()
    
    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )
    
    # Wyślij zgrupowany rekord
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. Zoptymalizuj czas przechowywania danych

Domyślny czas przechowywania to 24 godziny. Wydłuż go tylko w razie potrzeby:

# Ustaw czas przechowywania na 7 dni
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

Najlepsze praktyki bezpieczeństwa

1. Szyfrowanie danych w spoczynku i w ruchu

# Stwórz zaszyfrowany strumień
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# Włącz szyfrowanie
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. Polityki IAM dla zasady najmniejszych przywilejów

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. Punkty końcowe VPC

Zachowaj ruch w sieci AWS. Do zarządzania infrastrukturą AWS jako kod, rozważ użycie Terraform - zobacz nasz Skrypt wycinkowy Terraform:

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

Obserwowalność i debugowanie

Śledzenie rozproszone z X-Ray

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])
    
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

Zapytania w insights CloudWatch Logs

-- Znajdź wolne czasy przetwarzania
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- Śledź wskaźniki błędów
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

Zaawansowane wzorce

Wzorzec Saga dla rozproszonych transakcji

Zaimplementuj długotrwałe transakcje w mikrousługach:

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())
    
    def execute(self, order_data):
        """Wykonaj sagę z logiką kompensacyjną"""
        try:
            # Krok 1: Zarezerwuj stan magazynowy
            self.publish_command('RESERVE_INVENTORY', order_data)
            
            # Krok 2: Przetwórz płatność
            self.publish_command('PROCESS_PAYMENT', order_data)
            
            # Krok 3: Wyslij zamówienie
            self.publish_command('SHIP_ORDER', order_data)
            
        except SagaException as e:
            # Kompensuj w odwrotnej kolejności
            self.compensate(e.failed_step)
    
    def compensate(self, failed_step):
        """Wykonaj transakcje kompensacyjne"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }
        
        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

Strategie testowania

Rozwój lokalny z LocalStack

# Uruchom LocalStack z Kinesis
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# Stwórz strumień testowy
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

Testy integracyjne

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """Test publikacji zdarzeń z symulowanym Kinesis"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)
    
    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])
    
    assert order_id is not None

Dostrojenie wydajności

Optymalizacja rozmiaru partii

def optimize_batch_processing(records, batch_size=500):
    """Przetwarzaj rekordy w zoptymalizowanych partiach"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

Użyj puli połączeń

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

Przydatne linki

Zasoby AWS Kinesis:

Powiązane artykuły:

Podsumowanie

AWS Kinesis dostarcza solidnej podstawy do budowy skalowalnych architektur mikrousług opartych na zdarzeniach. Stosując się do tych wzorców i najlepszych praktyk, możesz tworzyć systemy, które są odporne, skalowalne i łatwe w utrzymaniu. Zacznij od pojedynczego strumienia zdarzeń, zwaliduj swoją architekturę i stopniowo przechodź do bardziej złożonych wzorców w miarę wzrostu systemu.

Kluczem do sukcesu jest zrozumienie wymagań dotyczących przepływu danych, wybór odpowiedniej usługi Kinesis dla danego przypadku użycia oraz wdrożenie odpowiedniego monitoringu i obsługi błędów od samego początku.