Bygga händelsestyrda mikrotjänster med AWS Kinesis

Händelsestyrd arkitektur med AWS Kinesis för skalbarhet

Sidinnehåll

AWS Kinesis har blivit en grundsten för att bygga moderna händelsestyda mikrotjänstarkitekturer, vilket möjliggör realtidsbehandling av data i stor skala med minimal driftsöverhead.

amazon-kinesis

Förståelse för händelsestyda mikrotjänstarkitekturer

Händelsestyda arkitektur (EDA) är ett designmönster där tjänster kommunicerar genom händelser istället för direkta synkrona anrop. Denna tillvägagångssätt erbjuder flera fördelar:

  • Lös koppling: Tjänster behöver inte känna till varandras existens
  • Skalbarhet: Varje tjänst skalas oberoende baserat på sin arbetsbelastning
  • Robusthet: Fel i en tjänst sprids inte till andra
  • Flexibilitet: Nya tjänster kan läggas till utan att modifiera befintliga

AWS Kinesis tillhandahåller ryggraden för att implementera EDA genom att agera som en distribuerad, beständig händelsestråle som kopplar bort producenter från konsumenter.

Översikt över AWS Kinesis

AWS erbjuder flera Kinesis-tjänster, var och en utformad för specifika användningsområden. När du utvärderar strömningslösningar kan du också vilja överväga att jämföra RabbitMQ på EKS mot SQS för olika meddelandemönster och kostnadsimplikationer.

Kinesis Data Streams

Den kärnströmtjänst som fångar, lagrar och bearbetar dataposter i realtid. Data Streams är idealisk för:

  • Anpassade realtidsbehandlingsapplikationer
  • Att bygga datapipelines med subsekunders latens
  • Bearbetning av miljoner händelser per sekund
  • Implementering av händelsekällmönster

Kinesis Data Firehose

En fullt hanterad tjänst som levererar strömmande data till destinationer som S3, Redshift, Elasticsearch eller HTTP-ändpunkter. Bäst för:

  • Enkla ETL-pipelines
  • Loggaggregering och arkivering
  • Nära realtidsanalys (60-sekunders minimilatens)
  • Scenarier där du inte behöver anpassad bearbetningslogik

Kinesis Data Analytics

Behandlar och analyserar strömmande data med SQL eller Apache Flink. Användningsområden inkluderar:

  • Realtidsinstrumentpaneler
  • Strömmande ETL
  • Realtidsanomalidetektering
  • Kontinuerlig metrikgenerering

Arkitekturmönster med Kinesis

1. Händelsekällmönster

Händelsekällan lagrar alla förändringar i applikationsstatus som en sekvens av händelser. Kinesis är perfekt för detta. Om du behöver en påminnelse om Python grunderna, kolla vårt Python Cheatsheet:

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Publicera en händelse till Kinesis-stråle"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }

    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )

    return response['SequenceNumber']

# Exempel: Användarregistreringsevent
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. CQRS (Command Query Responsibility Segregation)

Separera läs- och skrivoperationer med Kinesis som händelsebuss:

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }

    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })

    return err
}

3. Fan-Out-mönster med Lambda

Behandla händelser från en enda stråle med flera Lambda-funktioner. För TypeScript-implementeringar med starkare typkontroll, referera till vårt TypeScript Cheatsheet:

// Lambda-konsument för e-postmeddelanden
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );

        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// En annan Lambda för lagervärderingar
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );

        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};

Bästa praxis för produktion

1. Att välja rätt antal shards

Beräkna dina shard-behov baserat på:

  • Ingress: 1 MB/sec eller 1,000 poster/sec per shard
  • Egress: 2 MB/sec per shard (standardkonsumenter) eller 2 MB/sec per konsument med förbättrad fan-out
def calculate_shards(records_per_second, avg_record_size_kb):
    """Beräkna krävt antal shards"""
    # Ingresskapacitet
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )

    return int(ingress_shards) + 1  # Lägg till buffert

2. Implementera rätt felhantering

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
                          max_retries=3):
    """Put record med exponentiell backoff-upprepning"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Exponentiell backoff
                    continue
            raise

3. Använd förbättrad fan-out för flera konsumenter

Förbättrad fan-out tillhandahåller dedikerad genomströmning för varje konsument:

# Registrera en konsument med förbättrad fan-out
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. Övervaka viktiga mätvärden

Viktiga CloudWatch-mätvärden att följa:

  • IncomingRecords: Antal poster som lyckades läggas till
  • IncomingBytes: Bytevolym av inkommande data
  • GetRecords.IteratorAgeMilliseconds: Hur långt konsumenterna ligger efter
  • WriteProvisionedThroughputExceeded: Tröttningshändelser
  • ReadProvisionedThroughputExceeded: Konsumenttröttning

5. Implementera rätt partitioneringsnyckelstrategi

import hashlib

def get_partition_key(user_id, shard_count=10):
    """Generera partitioneringsnyckel med jämn fördelning"""
    # Använd konsistent hashing för jämn fördelning
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

Exempel på verklig implementering

Här är ett komplett exempel på en orderbehandlingsmikrotjänstarkitektur:

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name

    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """Skapa order och publicera händelser"""
        order_id = self.generate_order_id()

        # Publicera order skapad händelse
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)

        return order_id

    def publish_event(self, event_type: str, payload: Dict,
                     partition_key: str):
        """Publicera händelse till Kinesis-stråle"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }

        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """Konsumerar orderhändelser och uppdaterar lager"""

    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])

            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])

    def reserve_inventory(self, order_data):
        # Uppdatera lagerdatabas
        for item in order_data['items']:
            # Implementering här
            pass

Migrationsstrategi från monolit till mikrotjänster

Fas 1: Strangler Fig-mönster

Börja med att dirigera specifika händelser genom Kinesis medan du behåller monolitet:

  1. Identifiera begränsade kontexter i ditt monolit
  2. Skapa Kinesis-strålar för händelser mellan kontexter
  3. Gradvis extrahera tjänster som konsumerar från dessa strålar
  4. Behåll bakåtkompatibilitet med monolitet

Fas 2: Parallell bearbetning

Kör både gamla och nya system parallellt:

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """Skriv till både gammalt system och händelsestråle"""
    try:
        # Skriv till nytt system först
        publish_to_kinesis(kinesis_stream, data)

        # Sedan uppdatera gammalt system
        legacy_db.update(data)
    except Exception as e:
        # Implementera kompensationslogik
        rollback_kinesis_event(kinesis_stream, data)
        raise

Fas 3: Fullständig migration

När tillförlitligheten har etablerats, dirigera all trafik genom händelsestyda arkitekturen.

Kostnadsoptimeringstrategier

1. Använd On-Demand-läge för variabla arbetsbelastningar

On-demand-läge (introducerat 2023) skalar automatiskt baserat på trafik:

# Skapa ström med on-demand-läge
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. Implementera datagruppering

Minska PUT-payloadenheter genom att batcha poster:

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """Gruppera poster för att minska kostnader"""
    aggregator = RecordAggregator()

    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )

    # Skicka grupperad post
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. Optimera databehållning

Standardbehållningstid är 24 timmar. Förläng endast om nödvändigt:

# Ställ in behållningstid till 7 dagar
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

Säkerhetsbästa praxis

1. Kryptering i vila och i transit

# Skapa krypterad ström
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# Aktivera kryptering
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. IAM-policys för minst privilegium

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. VPC-anslutningar

Håll trafik inom AWS-nätverket. För hantering av AWS-infrastruktur som kod, överväg att använda Terraform - se vår Terraform cheatsheet:

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

Observabilitet och felsökning

Distribuerad spårning med X-Ray

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])

    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

CloudWatch Logs Insights-förfrågningar

-- Hitta långsamma bearbetningstider
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- Följ felaktigheter
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

Avancerade mönster

Saga-mönster för distribuerade transaktioner

Implementera långvariga transaktioner över mikrotjänster:

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())

    def execute(self, order_data):
        """Kör saga med kompensationslogik"""
        try:
            # Steg 1: Reservera lager
            self.publish_command('RESERVE_INVENTORY', order_data)

            # Steg 2: Bearbeta betalning
            self.publish_command('PROCESS_PAYMENT', order_data)

            # Steg 3: Skicka order
            self.publish_command('SHIP_ORDER', order_data)

        except SagaException as e:
            # Kompensera i omvänd ordning
            self.compensate(e.failed_step)

    def compensate(self, failed_step):
        """Kör kompensationstransaktioner"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }

        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

Teststrategier

Lokal utveckling med LocalStack

# Starta LocalStack med Kinesis
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# Skapa testström
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

Integrationstester

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """Testa eventpublicering med mockad Kinesis"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)

    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])

    assert order_id is not None

Prestandaoptimering

Optimera batchstorlek

def optimize_batch_processing(records, batch_size=500):
    """Bearbeta poster i optimerade batchar"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

Använd anslutningspooling

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

Användbara länkar

AWS Kinesis-resurser:

Relaterade artiklar:

Slutsats

AWS Kinesis ger en robust grund för att bygga skalbara, händelsedrivna mikrotjänstarkitekturer. Genom att följa dessa mönster och bästa praxis kan du skapa system som är robusta, skalbara och underhållbara. Börja med en enda eventström, validera din arkitektur och utvidga gradvis till mer komplexa mönster när ditt system växer.

Nyckeln till framgång är att förstå dina dataflödeskrav, välja rätt Kinesis-tjänst för ditt användningsområde och implementera rätt övervakning och felhantering från början.