Att bygga händelsestyrdas mikrotjänster med AWS Kinesis

Händelsestyrd arkitektur med AWS Kinesis för skalbarhet

Sidinnehåll

AWS Kinesis har blivit en hörnsten för att bygga moderna händelsestyrda arkitekturer för mikrotjänster, vilket möjliggör realtidsbehandling av data i stor skala med minimal driftsöverkostnad.

amazon-kinesis

Förståelse av händelsestyrda mikrotjänstarkitekturer

Händelsestyrd arkitektur (EDA) är ett designmönster där tjänster kommunicerar genom händelser snarare än direkta synkrona anrop. Detta tillvägagångssätt erbjuder flera fördelar:

  • Löst koppling: Tjänster behöver inte känna till varandras existens
  • Skalerbarhet: Varje tjänst skalar oberoende baserat på sin arbetsbelastning
  • Resiliens: Fel i en tjänst sprider sig inte till andra
  • Flexibilitet: Nya tjänster kan läggas till utan att modifiera befintliga

AWS Kinesis utgör ryggraden för att implementera EDA genom att fungera som en distribuerad och bestående händelseström som kopplar bort producenter från konsumenter.

För en bredare bild av strömmande plattformar, se vår Apache Kafka Quickstart-guide för jämförelse med självhysta alternativ.

Översikt över AWS Kinesis

AWS erbjuder flera Kinesis-tjänster, var och en designad för specifika användningsfall. När du utvärderar strömlösningar kan det också vara värt att överväga att jämföra RabbitMQ på EKS med SQS för olika meddelandemönster och kostnadsimplikationer.

Kinesis Data Streams

Den centrala strömmande tjänsten som fångar, lagrar och bearbetar dataregistreringar i realtid. Data Streams är idealiskt för:

  • Anpassade applikationer för realtidsbehandling
  • Att bygga datapipelines med latens under en sekund
  • Att bearbeta miljoner händelser per sekund
  • Att implementera händelselagringsmönster (event sourcing)

Kinesis Data Firehose

En helt hanterad tjänst som levererar strömmande data till destinationer som S3, Redshift, Elasticsearch eller HTTP-ändpunkter. Bäst för:

  • Enkla ETL-pipelines
  • Samling och arkivering av loggar
  • Nästan realtidsanalys (minst 60 sekunders latens)
  • Scenarier där du inte behöver anpassad bearbetningslogik

Kinesis Data Analytics

Bearbetar och analyserar strömmande data med hjälp av SQL eller Apache Flink. Användningsfall inkluderar:

  • Realtidsdashboard
  • Strömmande ETL
  • Realtidsdetektering av avvikelser
  • Kontinuerlig generering av metrik

För en djupare förklaring av Flink-operationer, se vår Apache Flink på K8s och Kafka-guide.

Arkitekturmönster med Kinesis

1. Händelselagringsmönster (Event Sourcing)

Händelselagring sparar alla ändringar i tillståndet för en applikation som en sekvens av händelser. Kinesis är perfekt för detta. Om du behöver en uppdatering av Python-grunder, kolla in vår Python-snabbreferens:

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Publicera en händelse till Kinesis-ström"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }
    
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )
    
    return response['SequenceNumber']

# Exempel: Händelse för användarregistrering
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. CQRS (Command Query Responsibility Segregation)

Separera läs- och skrivoperationer med Kinesis som händelsebuss:

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }
    
    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })
    
    return err
}

3. Fan-out-mönster med Lambda

Bearbeta händelser från en enda ström med flera Lambda-funktioner. För TypeScript-implementeringar med starkare typsäkerhet, se vår TypeScript-snabbreferens:

// Lambda-konsument för e-postmeddelanden
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// En annan Lambda för lageruppdateringar
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};

Bästa praxis för produktion

1. Välja rätt antal shards

Beräkna dina behov av shards baserat på:

  • Inkomst: 1 MB/sek eller 1 000 register/sek per shard
  • Utgång: 2 MB/sek per shard (standardkonsumenter) eller 2 MB/sek per konsument med förbättrad fan-out
def calculate_shards(records_per_second, avg_record_size_kb):
    """Beräkna erforderligt antal shards"""
    # Inkomstkapacitet
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )
    
    return int(ingress_shards) + 1  # Lägg till buffert

2. Implementera korrekt felhantering

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key, 
                          max_retries=3):
    """Sätt register med exponentiell backoff-återförsök"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Exponentiell backoff
                    continue
            raise

3. Använd förbättrad fan-out för flera konsumenter

Förbättrad fan-out ger dedikerad genomströmning för varje konsument:

# Registrera en konsument med förbättrad fan-out
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. Övervaka nyckelmetriker

Viktiga CloudWatch-metriker att spåra:

  • IncomingRecords: Antal register som lyckats sättas
  • IncomingBytes: Bytevolym av inkommande data
  • GetRecords.IteratorAgeMilliseconds: Hur långt efter konsumenterna är
  • WriteProvisionedThroughputExceeded: Throttling-händelser
  • ReadProvisionedThroughputExceeded: Throttling av konsumenter

5. Implementera en korrekt strategi för partitionsnycklar

import hashlib

def get_partition_key(user_id, shard_count=10):
    """Generera partitionsnyckel med jämn fördelning"""
    # Använd konsistent hashing för jämn fördelning
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

Implementeringsexempel från verkliga scenarier

Här är ett komplett exempel på en mikrotjänstarkitektur för beställningshantering:

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
    
    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """Skapa beställning och publicera händelser"""
        order_id = self.generate_order_id()
        
        # Publicera händelse för skapad beställning
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)
        
        return order_id
    
    def publish_event(self, event_type: str, payload: Dict, 
                     partition_key: str):
        """Publicera händelse till Kinesis-ström"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }
        
        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """Konsumerar beställningshändelser och uppdaterar lager"""
    
    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])
            
            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])
    
    def reserve_inventory(self, order_data):
        # Uppdatera lagersdatabas
        for item in order_data['items']:
            # Implementering här
            pass

Migrationsstrategi från monolit till mikrotjänster

Fas 1: Strangler Fig-mönster

Börja med att dirigera specifika händelser genom Kinesis medan du behåller monoliten:

  1. Identifiera begränsade kontexter i din monolit
  2. Skapa Kinesis-strömmar för händelser som sträcker sig över kontexter
  3. Gradvis extrahera tjänster som konsumerar från dessa strömmar
  4. Upprätthåll bakåtkompatibilitet med monoliten

Fas 2: Parallell bearbetning

Kör både gamla och nya system parallellt:

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """Skriv till både legacy-system och händelseström"""
    try:
        # Skriv till nya system först
        publish_to_kinesis(kinesis_stream, data)
        
        # Uppdatera sedan legacy-systemet
        legacy_db.update(data)
    except Exception as e:
        # Implementera kompenseringslogik
        rollback_kinesis_event(kinesis_stream, data)
        raise

Fas 3: Full migration

När tillit etablerats, dirigera all trafik genom den händelsestyrda arkitekturen.

Strategier för kostnoptimering

För omfattande vägledning om datainfrastrukturmönster inklusive objektlagring och databasarkitekturer, se Datainfrastruktur för AI-system: Objektlagring, databaser, sökning & AI-dataarkitektur.

1. Använd On-Demand-läge för varierande arbetsbelastningar

On-Demand-läge (infört 2023) skalar automatiskt baserat på trafik:

# Skapa ström med On-Demand-läge
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. Implementera dataaggregering

Minska PUT-betalenheter genom att sätta ihop register i batch:

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """Aggregera register för att minska kostnader"""
    aggregator = RecordAggregator()
    
    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )
    
    # Skicka aggregerat register
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. Optimera databevarande

Standardbevarande är 24 timmar. Förläng det bara om det är nödvändigt:

# Sätt bevarande till 7 dagar
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

Säkerhetsbästa praxis

1. Kryptering i viloläge och under överföring

# Skapa krypterad ström
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# Aktivera kryptering
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. IAM-policy för minsta privilegier

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. VPC-ändpunkter

Håll trafiken inom AWS-nätverket. För att hantera AWS-infrastruktur som kod, överväg att använda Terraform – se vår Terraform-snabbreferens:

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

Observabilitet och felsökning

Distribuerad spårning med X-Ray

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])
    
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

CloudWatch Logs Insights-frågor

-- Hitta långsam bearbetningstid
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- Spåra felfrekvens
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

Avancerade mönster

Saga-mönster för distribuerade transaktioner

Implementera långvariga transaktioner över mikrotjänster:

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())
    
    def execute(self, order_data):
        """Kör saga med kompenseringslogik"""
        try:
            # Steg 1: Reservera lager
            self.publish_command('RESERVE_INVENTORY', order_data)
            
            # Steg 2: Bearbeta betalning
            self.publish_command('PROCESS_PAYMENT', order_data)
            
            # Steg 3: Skicka beställning
            self.publish_command('SHIP_ORDER', order_data)
            
        except SagaException as e:
            # Kompensera i omvänd ordning
            self.compensate(e.failed_step)
    
    def compensate(self, failed_step):
        """Kör kompenserande transaktioner"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }
        
        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

Teststrategier

Lokal utveckling med LocalStack

# Starta LocalStack med Kinesis
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# Skapa testström
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

Integrationstestning

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """Testa händelsepublicering med mockad Kinesis"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)
    
    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])
    
    assert order_id is not None

Prestandajusterning

Optimera batchstorlek

def optimize_batch_processing(records, batch_size=500):
    """Bearbeta register i optimerade batchar"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

Använd anslutningspooling

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

Användbara länkar

AWS Kinesis-resurser:

Relaterade artiklar:

Slutsats

AWS Kinesis ger en robust grund för att bygga skalbara, händelsestyrda mikrotjänstarkitekturer. Genom att följa dessa mönster och bästa praxis kan du skapa system som är resilienta, skalbara och underhållsbara. Börja smått med en enda händelseström, validera din arkitektur och gradvis expandera till mer komplexa mönster när ditt system växer.

Nyckeln till framgång är att förstå dina krav på dataflöde, välja rätt Kinesis-tjänst för ditt användningsfall och implementera korrekt övervakning och felhantering från start.