Het bouwen van gebeurtenisgestuurde microservices met AWS Kinesis

Gebeurtenisgestuurde architectuur met AWS Kinesis voor schaalbaarheid

Inhoud

AWS Kinesis is geworden een kernstuk bij het bouwen van moderne, gebeurtenisgestuurde microservicesarchitecturen, waarmee real-time dataverwerking op schaal mogelijk is met minimale operationele overhead.

amazon-kinesis

Begrijpen van gebeurtenisgestuurde microservicesarchitectuur

Een gebeurtenisgestuurde architectuur (EDA) is een ontwerppatroon waarbij diensten communiceren via gebeurtenissen in plaats van directe synchrone oproepen. Dit aanpak biedt verschillende voordelen:

  • Loskoppeling: Diensten hoeven niet te weten van elkaars bestaan
  • Schaalbaarheid: Elke dienst schaalt onafhankelijk op basis van zijn werkbelasting
  • Resilientie: Fouten in één dienst verspreiden zich niet naar andere diensten
  • Flexibiliteit: Nieuwe diensten kunnen worden toegevoegd zonder bestaande diensten te wijzigen

AWS Kinesis biedt de onderbouwing voor het implementeren van EDA door te fungeren als een gedistribueerde, duurzame gebeurtenisstroom die producenten van consumptieerscheidt.

Overzicht van AWS Kinesis

AWS biedt verschillende Kinesis-diensten, elk ontworpen voor specifieke toepassingen. Bij het beoordelen van streamingoplossingen, zou je ook willen overwegen RabbitMQ op EKS vs SQS vergelijken voor verschillende berichtpatronen en kostimplicaties.

Kinesis Data Streams

Het kernstreamingsdienst dat gegevensrecords opslaat en verwerkt in real-time. Data Streams is ideaal voor:

  • Aangepaste real-time verwerkingsapplicaties
  • Bouwen van datapijplijnen met subseconde latentie
  • Verwerken van miljoenen gebeurtenissen per seconde
  • Implementatie van gebeurtenisregistratiepatronen

Kinesis Data Firehose

Een volledig beheerde dienst die streamdata levert naar bestemmingen zoals S3, Redshift, Elasticsearch of HTTP-eindpunten. Beste voor:

  • Eenvoudige ETL-pijplijnen
  • Logaggregatie en archivering
  • Bijna real-time analyse (minimale latentie van 60 seconden)
  • Scenario’s waarin je geen aangepaste verwerkingslogica nodig hebt

Kinesis Data Analytics

Verwerkt en analyseert streamdata met behulp van SQL of Apache Flink. Toepassingen omvatten:

  • Real-time dashboards
  • Streaming ETL
  • Real-time anomalie detectie
  • Continue metriekgeneratie

Architectuurpatronen met Kinesis

1. Gebeurtenisregistratiepatroon

Gebeurtenisregistratie slaat alle veranderingen in de toepassingsstatus op als een reeks gebeurtenissen. Kinesis is perfect voor dit. Als je een herverversing van Python-fundamenten nodig hebt, bekijk dan onze Python Cheat Sheet:

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Gebeurtenis publiceren naar Kinesis stream"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }
    
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )
    
    return response['SequenceNumber']

# Voorbeeld: Gebruikersregistratiegebeurtenis
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. CQRS (Command Query Responsibility Segregation)

Scheid lees- en schrijfopties met Kinesis als gebeurtenisbus:

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }
    
    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })
    
    return err
}

3. Fan-Outpatroon met Lambda

Verwerk gebeurtenissen van één stream met meerdere Lambda-functies. Voor TypeScript-implementaties met sterke typesicherheid, verwijzen naar onze TypeScript Cheat Sheet:

// Lambda-consumer voor e-mailmeldingen
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// Een andere Lambda voor voorraadupdates
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};

Beste praktijken voor productie

1. Kiezen van het juiste aantal shards

Bereken je shardvereisten op basis van:

  • Invoer: 1 MB/sec of 1.000 records/sec per shard
  • Uitvoer: 2 MB/sec per shard (standaard consumptie) of 2 MB/sec per consumptie met uitgebreid fan-out
def calculate_shards(records_per_second, avg_record_size_kb):
    """Bereken het vereiste aantal shards"""
    # Invoercapaciteit
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )
    
    return int(ingress_shards) + 1  # Voeg buffer toe

2. Implementeer juiste foutafhandeling

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key, 
                          max_retries=3):
    """Put record met exponentiële backoff retry"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Exponentiële backoff
                    continue
            raise

3. Gebruik uitgebreid fan-out voor meerdere consumptieers

Uitgebreid fan-out biedt toegewezen doorstroming voor elke consumptieer:

# Registreer een consumptieer met uitgebreid fan-out
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. Monitor belangrijke metrieken

Essentiële CloudWatch-metrieken om te volgen:

  • IncomingRecords: Aantal records succesvol ingevoegd
  • IncomingBytes: Bytevolume van ingekomen data
  • GetRecords.IteratorAgeMilliseconds: Hoe ver consumptieers achter zijn
  • WriteProvisionedThroughputExceeded: Throttlinggebeurtenissen
  • ReadProvisionedThroughputExceeded: Consumptieerthrottling

5. Implementeer juiste partitiekeystrategie

import hashlib

def get_partition_key(user_id, shard_count=10):
    """Genereer partitiekey met gelijke verdeling"""
    # Gebruik consistente hashing voor gelijke verdeling
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

Voorbeeld van real-world implementatie

Hier is een volledig voorbeeld van een orderverwerkingsmicroservicesarchitectuur:

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
    
    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """Order aanmaken en gebeurtenissen publiceren"""
        order_id = self.generate_order_id()
        
        # Gebeurtenis voor order aangemaakt publiceren
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)
        
        return order_id
    
    def publish_event(self, event_type: str, payload: Dict, 
                     partition_key: str):
        """Gebeurtenis publiceren naar Kinesis stream"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }
        
        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """Gebruikt ordergebeurtenissen en bijwerkt voorraad"""
    
    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])
            
            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])
    
    def reserve_inventory(self, order_data):
        # Voorraaddatabase bijwerken
        for item in order_data['items']:
            # Implementatie hier
            pass

Migratiestrategie van monolith naar microservices

Fase 1: Strangler Fig Patroon

Begin met het routeren van specifieke gebeurtenissen via Kinesis terwijl de monolith behouden blijft:

  1. Identificeer begrenste contexten in je monolith
  2. Maak Kinesisstreams voor cross-context gebeurtenissen
  3. Trek geleidelijk diensten af die van deze streams consumeren
  4. Behoud achterwaartse compatibiliteit met de monolith

Fase 2: Parallel verwerking

Zowel oude als nieuwe systemen uitvoeren parallel:

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """Schrijf naar zowel legacy systeem als eventstream"""
    try:
        # Eerst schrijven naar nieuw systeem
        publish_to_kinesis(kinesis_stream, data)
        
        # Dan legacy systeem bijwerken
        legacy_db.update(data)
    except Exception as e:
        # Compensatie logica implementeren
        rollback_kinesis_event(kinesis_stream, data)
        raise

Fase 3: Volledige migratie

Zodra vertrouwen is opgebouwd, routeer alle verkeer via de gebeurtenisgestuurde architectuur.

Kostoptimalisatiestrategieën

1. Gebruik On-Demand-modus voor variabele werkbelastingen

On-demand-modus (geïntroduceerd in 2023) schaalt automatisch op basis van verkeer:

# Stream aanmaken met on-demand modus
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. Implementeer dataaggregatie

Verlaag PUT payload units door records te batchen:

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """Aggregateer records om kosten te verminderen"""
    aggregator = RecordAggregator()
    
    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )
    
    # Aggegregeerde record verzenden
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. Optimaliseer dataopslag

Standaard opslagduur is 24 uur. Pas deze alleen aan als nodig:

# Opslagduur instellen op 7 dagen
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

Beveiligingsbeste praktijken

1. Versleuteling opslag en in transitie

# Versleutelde stream aanmaken
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# Versleuteling inschakelen
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. IAM-beleid voor minimale bevoegdheid

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. VPC-eindpunten

Houd verkeer binnen het AWS-netwerk. Voor het beheren van AWS-infrastructuur als code, overweeg het gebruik van Terraform - zie onze Terraform cheat sheet:

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

Observabiliteit en foutopsporing

Gedistribueerde traceren met X-Ray

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])
    
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

CloudWatch Logs Insights Queries

-- Langzaam verwerkingsduur vinden
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- Foutpercentages volgen
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

Geavanceerde patronen

Saga Patroon voor gedistribueerde transacties

Implementeer lange transacties over microservices:

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())
    
    def execute(self, order_data):
        """Saga uitvoeren met compensatie logica"""
        try:
            # Stap 1: Voorraad reserveren
            self.publish_command('RESERVE_INVENTORY', order_data)
            
            # Stap 2: Betaling verwerken
            self.publish_command('PROCESS_PAYMENT', order_data)
            
            # Stap 3: Order verzenden
            self.publish_command('SHIP_ORDER', order_data)
            
        except SagaException as e:
            # Compensatie in omgekeerde volgorde
            self.compensate(e.failed_step)
    
    def compensate(self, failed_step):
        """Compensatie transacties uitvoeren"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }
        
        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

Teststrategieën

Lokale ontwikkeling met LocalStack

# LocalStack starten met Kinesis
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# Teststream aanmaken
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

Integratietesten

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """Test event publishing met gemockte Kinesis"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)
    
    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])
    
    assert order_id is not None

Prestatieoptimalisatie

Optimaliseer batchgrootte

def optimize_batch_processing(records, batch_size=500):
    """Batchverwerking optimaliseren"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

Gebruik verbindingspoolen

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

AWS Kinesis bronnen:

Gerelateerde artikelen:

Conclusie

AWS Kinesis biedt een robuuste basis voor het bouwen van schaalbare, gebeurtenisgestuurde microservicesarchitecturen. Door deze patronen en beste praktijken te volgen, kun je systemen creëren die robuust, schaalbaar en onderhoudbaar zijn. Begin klein met één gebeurtenisstroom, valideer je architectuur en breid geleidelijk uit naar complexere patronen naarmate je systeem groeit.

Het sleutel tot succes is het begrijpen van je dataflowvereisten, het kiezen van de juiste Kinesisdienst voor je toepassing en het implementeren van juiste monitoring en foutafhandeling vanaf het begin.