Het opbouwen van gebeurtenisgestuurde microservices met AWS Kinesis

Event-driven architectuur met AWS Kinesis voor schaalbaarheid

Inhoud

AWS Kinesis is een hoeksteen geworden voor het opbouwen van moderne, gebeurtenisgestuurde microservices-architecturen en stelt schaalbare real-time verwerking van gegevens mogelijk met minimale operationele overhead.

amazon-kinesis

Begrip van Gebeurtenisgestuurde Microservices-architectuur

Gebeurtenisgestuurde architectuur (EDA) is een ontwerppatroon waarbij services communiceren via gebeurtenissen in plaats van directe synchrone aanroepen. Deze aanpak biedt verschillende voordelen:

  • Losse koppeling: Services hoeven niet op de hoogte te zijn van het bestaan van elkaar
  • Schaalbaarheid: Elke service schaalt onafhankelijk op basis van zijn belasting
  • Resilientie: Mislukkingen in één service verspreiden zich niet naar anderen
  • Flexibiliteit: Nieuwe services kunnen worden toegevoegd zonder bestaande services aan te passen

AWS Kinesis vormt de ruggengraat voor het implementeren van EDA door te fungeren als een gedistribueerde, duurzame gebeurtenisstroom die producenten en consumenten van elkaar decoupleert.

Voor een bredere kijk op streamingplatforms, bekijk onze Apache Kafka Quickstart-handleiding voor een vergelijking met zelfgehoste alternatieven.

Overzicht van AWS Kinesis

AWS biedt verschillende Kinesis-services, elk ontworpen voor specifieke gebruiksscenario’s. Bij het evalueren van streamingoplossingen wilt u misschien ook overwegen RabbitMQ op EKS te vergelijken met SQS voor verschillende berichtenpatroon en kostenimplicaties.

Kinesis Data Streams

De kernstreamingservice die gegevensrecords in real-time vastlegt, opslaat en verwerkt. Data Streams is ideaal voor:

  • Aangepaste real-time verwerkingsapplicaties
  • Het bouwen van gegevenspijplijnen met een latentie van minder dan een seconde
  • Het verwerken van miljoenen gebeurtenissen per seconde
  • Het implementeren van event-sourcing patronen

Kinesis Data Firehose

Een volledig beheerde service die streaminggegevens levert naar bestemmingen zoals S3, Redshift, Elasticsearch of HTTP-eindpunten. Best voor:

  • Eenvoudige ETL-pijplijnen
  • Log-aggregatie en archivering
  • Bijna real-time analyse (minimale latentie van 60 seconden)
  • Scenario’s waarbij u geen aangepaste verwerkingslogica nodig heeft

Kinesis Data Analytics

Verwerkt en analyseert streaminggegevens met behulp van SQL of Apache Flink. Gebruiksscenario’s omvatten:

  • Real-time dashboards
  • Streaming ETL
  • Real-time anomaliedetectie
  • Continue metriekegeneratie

Voor een dieper duik in Flink-bewerkingen, bekijk onze Apache Flink op K8s en Kafka-handleiding.

Architectonische Patronen met Kinesis

1. Event Sourcing Patroon

Event sourcing slaat alle wijzigingen in de applicatiestatus op als een reeks gebeurtenissen. Kinesis is perfect hiervoor. Als u een opfrissing van de Python-fundamenten nodig heeft, bekijk dan onze Python Cheatsheet:

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Publiceer een gebeurtenis naar de Kinesis-stroom"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }
    
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )
    
    return response['SequenceNumber']

# Voorbeeld: Gebruikersregistratiegebeurtenis
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. CQRS (Command Query Responsibility Segregation)

Scheid lees- en schrijfbewerkingen met Kinesis als de gebeurtenisbus:

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }
    
    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })
    
    return err
}

3. Fan-Out Patroon met Lambda

Verwerk gebeurtenissen van een enkele stroom met meerdere Lambda-functies. Voor TypeScript-implementaties met sterkere typesafeheid, verwijzen naar onze TypeScript Cheatsheet:

// Lambda-consument voor e-mailmeldingen
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// Een andere Lambda voor voorraupdate
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};

Beste Praktijken voor Productie

1. Het Kiezen van het Juiste Aantal Shards

Bereken uw shard-behoeften op basis van:

  • Ingress: 1 MB/sec of 1.000 records/sec per shard
  • Egress: 2 MB/sec per shard (standaard consumenten) of 2 MB/sec per consument met versterkte fan-out
def calculate_shards(records_per_second, avg_record_size_kb):
    """Bereken het benodigde aantal shards"""
    # Ingress capaciteit
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )
    
    return int(ingress_shards) + 1  # Voeg buffer toe

2. Implementeer Correcte Foutafhandeling

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key, 
                          max_retries=3):
    """Record plaatsen met exponentiële backoff-herhaling"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Exponentiële backoff
                    continue
            raise

3. Gebruik Versterkte Fan-Out voor Meerdere Consumenten

Versterkte fan-out biedt toegewezen doorvoer voor elke consument:

# Registreer een consument met versterkte fan-out
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. Monitor Belangrijke Metrieken

Essentiële CloudWatch-metrieken om te volgen:

  • IncomingRecords: Aantal succesvol geplaatste records
  • IncomingBytes: Byte-volume van binnenkomende gegevens
  • GetRecords.IteratorAgeMilliseconds: Hoe ver consumenten achterlopend zijn
  • WriteProvisionedThroughputExceeded: Throttling-gebeurtenissen
  • ReadProvisionedThroughputExceeded: Consument throttling

5. Implementeer een Correcte Strategie voor Partitiel sleutels

import hashlib

def get_partition_key(user_id, shard_count=10):
    """Genereer partitiesleutel met even distributie"""
    # Gebruik consistente hashing voor even distributie
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

Implementatievoorbeeld in de Praktijk

Hier is een compleet voorbeeld van een bestelverwerkingsmicroservices-architectuur:

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
    
    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """Maak bestelling en publiceer gebeurtenissen"""
        order_id = self.generate_order_id()
        
        # Publiceer 'bestelling aangemaakt' gebeurtenis
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)
        
        return order_id
    
    def publish_event(self, event_type: str, payload: Dict, 
                     partition_key: str):
        """Publiceer gebeurtenis naar Kinesis-stroom"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }
        
        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """Consumente bestelgebeurtenissen en update voorraad"""
    
    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])
            
            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])
    
    def reserve_inventory(self, order_data):
        # Update voorraaddatabase
        for item in order_data['items']:
            # Implementatie hier
            pass

Migrationsstrategie van Monolith naar Microservices

Fase 1: Strangler Fig Patroon

Begin met het routeren van specifieke gebeurtenissen via Kinesis terwijl u de monolith behoudt:

  1. Identificeer gebonden contexten in uw monolith
  2. Maak Kinesis-stromen aan voor gebeurtenissen tussen contexten
  3. Haal geleidelijk services die hieruit consumeren
  4. Behoud backward compatibility met de monolith

Fase 2: Parallelle Verwerking

Voer zowel het oude als het nieuwe systeem parallel uit:

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """Schrijf naar zowel het legacy-systeem als de gebeurtenisstroom"""
    try:
        # Schrijf eerst naar het nieuwe systeem
        publish_to_kinesis(kinesis_stream, data)
        
        # Vervolgens update het legacy-systeem
        legacy_db.update(data)
    except Exception as e:
        # Implementeer compensatielogica
        rollback_kinesis_event(kinesis_stream, data)
        raise

Fase 3: Volledige Migratie

Zodra het vertrouwen is opgebouwd, routeer alle verkeer via de gebeurtenisgestuurde architectuur.

Strategieën voor Kostenoptimalisatie

Voor uitgebreide begeleiding over data-infrastructuurpatroon, inclusief objectopslag en databasearchitecturen, verwijzen naar Data-infrastructuur voor AI-systemen: Objectopslag, Databases, Zoeken & AI-dataarchitectuur.

1. Gebruik On-Demand Mode voor Variabele Werklasten

On-demand mode (ingevoerd in 2023) schaalt automatisch op basis van verkeer:

# Maak stroom aan met on-demand mode
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. Implementeer Gegevensaggregatie

Verlaag PUT-betaleenheden door records te batchen:

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """Aggregate records om kosten te verlagen"""
    aggregator = RecordAggregator()
    
    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )
    
    # Verstuur geaggregeerd record
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. Optimaliseer Gegevensretentie

Standaard retentie is 24 uur. Verleng dit alleen indien nodig:

# Zet retentie op 7 dagen
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

Veiligheidsbest Practices

1. Versleuteling op Rust en in Transit

# Maak versleutelde stroom aan
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# Schakel versleuteling in
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. IAM-beleid voor Minimale Rechten

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. VPC Endpoints

Houd verkeer binnen het AWS-netwerk. Voor het beheer van AWS-infrastructuur als code, overweeg het gebruik van Terraform - zie onze Terraform cheatsheet:

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

Observabiliteit en Debuggen

Gedistribueerd Tracing met X-Ray

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])
    
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

CloudWatch Logs Insights Queries

-- Vind trage verwerkingstijden
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- Track foutpercentages
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

Geavanceerde Patronen

Saga Patroon voor Gedistribueerde Transacties

Implementeer langlopende transacties over microservices:

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())
    
    def execute(self, order_data):
        """Voer saga uit met compensatielogica"""
        try:
            # Stap 1: Reserveer voorraad
            self.publish_command('RESERVE_INVENTORY', order_data)
            
            # Stap 2: Verwerk betaling
            self.publish_command('PROCESS_PAYMENT', order_data)
            
            # Stap 3: Verstuur bestelling
            self.publish_command('SHIP_ORDER', order_data)
            
        except SagaException as e:
            # Compenseer in omgekeerde volgorde
            self.compensate(e.failed_step)
    
    def compensate(self, failed_step):
        """Voer compensatietransacties uit"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }
        
        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

Teststrategieën

Lokale Ontwikkeling met LocalStack

# Start LocalStack met Kinesis
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# Maak teststroom aan
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

Integratietesten

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """Test gebeurtenispubliceren met gemockte Kinesis"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)
    
    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])
    
    assert order_id is not None

Prestatie-optimalisatie

Optimaliseer Batchgrootte

def optimize_batch_processing(records, batch_size=500):
    """Verwerk records in geoptimaliseerde batches"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

Gebruik Connection Pooling

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

AWS Kinesis-resources:

Gerelateerde artikelen:

Conclusie

AWS Kinesis biedt een robuuste basis voor het opbouwen van schaalbare, gebeurtenisgestuurde microservices-architecturen. Door deze patronen en beste praktijken te volgen, kunt u systemen creëren die resistent, schaalbaar en onderhoudbaar zijn. Begin klein met een enkele gebeurtenisstroom, valideer uw architectuur en breid geleidelijk uit naar complexere patronen naarmate uw systeem groeit.

De sleutel tot succes is het begrijpen van uw gegevensstroomvereisten, het kiezen van de juiste Kinesis-service voor uw gebruiksscenario en het implementeren van correcte monitoring en foutafhandeling vanaf het begin.