Construire des microservices événementiels avec AWS Kinesis

Architecture événementielle avec AWS Kinesis pour une mise à l'échelle

Sommaire

AWS Kinesis est devenu un pilier pour la construction d’architectures modernes basées sur les microservices événementiels, permettant un traitement en temps réel des données à grande échelle avec un minimum de surcoût opérationnel.

amazon-kinesis

Comprendre l’architecture microservices événementielle

L’architecture événementielle (EDA) est un modèle de conception où les services communiquent via des événements plutôt que par des appels synchrones directs. Cette approche offre plusieurs avantages :

  • Découplage lâche : Les services n’ont pas besoin de connaître l’existence des autres
  • Évolutivité : Chaque service s’adapte indépendamment en fonction de sa charge de travail
  • Résilience : Les défaillances dans un service ne se propagent pas aux autres
  • Flexibilité : De nouveaux services peuvent être ajoutés sans modifier les existants

AWS Kinesis fournit le socle pour la mise en œuvre de l’EDA en agissant comme un flux d’événements distribué et durable qui découple les producteurs des consommateurs.

Aperçu d’AWS Kinesis

AWS propose plusieurs services Kinesis, chacun conçu pour des cas d’utilisation spécifiques. Lors de l’évaluation des solutions de streaming, vous pourriez également souhaiter comparer RabbitMQ sur EKS vs SQS pour différents modèles de messagerie et implications de coût.

Kinesis Data Streams

Le service de streaming principal qui capture, stocke et traite des enregistrements de données en temps réel. Data Streams est idéal pour :

  • Des applications de traitement personnalisé en temps réel
  • La création de pipelines de données avec une latence sous la seconde
  • Le traitement de millions d’événements par seconde
  • L’implémentation de modèles de sourcing d’événements

Kinesis Data Firehose

Un service géré qui livre les données en streaming vers des destinations comme S3, Redshift, Elasticsearch ou des points de terminaison HTTP. Idéal pour :

  • Des pipelines ETL simples
  • L’agrégation et l’archivage des journaux
  • L’analyse quasi en temps réel (latence minimale de 60 secondes)
  • Les scénarios où vous n’avez pas besoin de logique de traitement personnalisée

Kinesis Data Analytics

Traite et analyse les données en streaming à l’aide de SQL ou d’Apache Flink. Les cas d’utilisation incluent :

  • Des tableaux de bord en temps réel
  • Des ETL en streaming
  • La détection d’anomalies en temps réel
  • La génération continue de métriques

Modèles architecturaux avec Kinesis

1. Modèle de sourcing d’événements

Le sourcing d’événements stocke toutes les modifications de l’état de l’application sous forme de séquence d’événements. Kinesis est parfait pour cela. Si vous avez besoin d’un rappel sur les fondamentaux de Python, consultez notre fiche de révision Python :

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Publier un événement sur le flux Kinesis"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }
    
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )
    
    return response['SequenceNumber']

# Exemple : événement d'inscription utilisateur
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. CQRS (Command Query Responsibility Segregation)

Séparez les opérations de lecture et d’écriture en utilisant Kinesis comme bus d’événements :

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }
    
    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })
    
    return err
}

3. Modèle de diffusion en fan-out avec Lambda

Traitez des événements d’un seul flux avec plusieurs fonctions Lambda. Pour les implémentations TypeScript avec une sécurité de type plus forte, consultez notre fiche de révision TypeScript :

// Consommateur Lambda pour les notifications par e-mail
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// Un autre Lambda pour les mises à jour d'inventaire
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};

Bonnes pratiques pour la production

1. Choisir le bon nombre de shards

Calculez vos besoins en shards en fonction de :

  • Entrée : 1 Mo/sec ou 1 000 enregistrements/sec par shard
  • Sortie : 2 Mo/sec par shard (consommateurs standards) ou 2 Mo/sec par consommateur avec diffusion améliorée
def calculate_shards(records_per_second, avg_record_size_kb):
    """Calculer le nombre requis de shards"""
    # Capacité d'entrée
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )
    
    return int(ingress_shards) + 1  # Ajouter un buffer

2. Implémenter une gestion correcte des erreurs

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key, 
                          max_retries=3):
    """Insérer un enregistrement avec un réessai à recul exponentiel"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Recul exponentiel
                    continue
            raise

3. Utiliser la diffusion améliorée pour plusieurs consommateurs

La diffusion améliorée fournit une bande passante dédiée pour chaque consommateur :

# Enregistrer un consommateur avec diffusion améliorée
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. Surveiller les métriques clés

Métriques essentielles de CloudWatch à suivre :

  • IncomingRecords : Nombre d’enregistrements insérés avec succès
  • IncomingBytes : Volume en octets des données entrantes
  • GetRecords.IteratorAgeMilliseconds : Combien les consommateurs sont en retard
  • WriteProvisionedThroughputExceeded : Événements de limitation
  • ReadProvisionedThroughputExceeded : Limitation des consommateurs

5. Implémenter une stratégie correcte de clé de partition

import hashlib

def get_partition_key(user_id, shard_count=10):
    """Générer une clé de partition avec une distribution équilibrée"""
    # Utiliser un hachage cohérent pour une distribution équilibrée
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

Exemple d’implémentation en pratique

Voici un exemple complet d’une architecture de microservices de traitement des commandes :

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
    
    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """Créer une commande et publier des événements"""
        order_id = self.generate_order_id()
        
        # Publier l'événement de création de commande
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)
        
        return order_id
    
    def publish_event(self, event_type: str, payload: Dict, 
                     partition_key: str):
        """Publier un événement sur le flux Kinesis"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }
        
        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """Consomme les événements de commande et met à jour l'inventaire"""
    
    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])
            
            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])
    
    def reserve_inventory(self, order_data):
        # Mettre à jour la base de données d'inventaire
        for item in order_data['items']:
            # Implémentation ici
            pass

Stratégie de migration d’un monolithe vers des microservices

Phase 1 : Modèle de figue strangler

Commencez par diriger certains événements via Kinesis tout en conservant le monolithe :

  1. Identifier les contextes bornés dans votre monolithe
  2. Créer des flux Kinesis pour les événements trans-contextuels
  3. Extraire progressivement les services qui consomment ces flux
  4. Maintenir la compatibilité descendante avec le monolithe

Phase 2 : Traitement parallèle

Exécuter à la fois les anciens et nouveaux systèmes en parallèle :

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """Écrire dans le système ancien et dans le flux d'événements"""
    try:
        # Écrire dans le nouveau système d'abord
        publish_to_kinesis(kinesis_stream, data)
        
        # Puis mettre à jour le système ancien
        legacy_db.update(data)
    except Exception as e:
        # Implémenter la logique de compensation
        rollback_kinesis_event(kinesis_stream, data)
        raise

Phase 3 : Migration complète

Une fois la confiance établie, dirigez tout le trafic via l’architecture événementielle.

Stratégies d’optimisation des coûts

1. Utiliser le mode à la demande pour les charges de travail variables

Le mode à la demande (introduit en 2023) s’adapte automatiquement au trafic :

# Créer un flux avec le mode à la demande
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. Implémenter l’agrégation de données

Réduisez les unités de PUT en regroupant les enregistrements :

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """Agrégation d'enregistrements pour réduire les coûts"""
    aggregator = RecordAggregator()
    
    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )
    
    # Envoyer l'enregistrement agrégé
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. Optimiser la rétention des données

La rétention par défaut est de 24 heures. N’étendez-la que si nécessaire :

# Définir la rétention à 7 jours
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

Bonnes pratiques de sécurité

1. Chiffrement au repos et en transit

# Créer un flux chiffré
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# Activer le chiffrement
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. Politiques IAM pour le privilège minimal

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. Points de terminaison VPC

Gardez le trafic à l’intérieur du réseau AWS. Pour gérer l’infrastructure AWS en tant que code, envisagez d’utiliser Terraform - consultez notre fiche de révision Terraform :

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

Observabilité et débogage

Suivi distribué avec X-Ray

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])
    
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

Requêtes CloudWatch Logs Insights

-- Trouver les temps de traitement lents
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- Suivre les taux d'erreur
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

Modèles avancés

Modèle Saga pour les transactions distribuées

Implémenter des transactions longues durant les microservices :

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())
    
    def execute(self, order_data):
        """Exécuter le saga avec la logique de compensation"""
        try:
            # Étape 1 : Réserver l'inventaire
            self.publish_command('RESERVE_INVENTORY', order_data)
            
            # Étape 2 : Traiter le paiement
            self.publish_command('PROCESS_PAYMENT', order_data)
            
            # Étape 3 : Expédier la commande
            self.publish_command('SHIP_ORDER', order_data)
            
        except SagaException as e:
            # Compenser dans l'ordre inverse
            self.compensate(e.failed_step)
    
    def compensate(self, failed_step):
        """Exécuter les transactions de compensation"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }
        
        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

Stratégies de test

Développement local avec LocalStack

# Démarrer LocalStack avec Kinesis
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# Créer un flux de test
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

Tests d’intégration

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """Tester la publication d'événements avec Kinesis simulé"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)
    
    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])
    
    assert order_id is not None

Optimisation des performances

Optimiser la taille des lots

def optimize_batch_processing(records, batch_size=500):
    """Traiter les enregistrements en lots optimisés"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

Utiliser le regroupement de connexions

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

Liens utiles

Ressources AWS Kinesis :

Articles liés :

Conclusion

AWS Kinesis fournit une base solide pour construire des architectures de microservices événementielles évolutives. En suivant ces modèles et bonnes pratiques, vous pouvez créer des systèmes résilients, évolutifs et maintenables. Commencez petit avec un seul flux d’événements, validez votre architecture, et étendez progressivement vers des modèles plus complexes à mesure que votre système grandit.

La clé du succès est de comprendre vos besoins en flux de données, de choisir le bon service Kinesis pour votre cas d’utilisation et d’implémenter une surveillance et une gestion des erreurs correctes dès le départ.