Création de microservices pilotés par les événements avec AWS Kinesis

Architecture orientée événements avec AWS Kinesis pour l'évolutivité

Sommaire

AWS Kinesis est devenu un pilier pour la construction d’architectures microservices modernes pilotées par événements, permettant le traitement de données en temps réel à grande échelle avec un surcoût opérationnel minimal.

amazon-kinesis

Compréhension de l’architecture microservices pilotée par événements

L’architecture pilotée par événements (EDA) est un modèle de conception où les services communiquent via des événements plutôt que par des appels synchrones directs. Cette approche offre plusieurs avantages :

  • Couplage lâche : Les services n’ont pas besoin de connaître l’existence des autres
  • Extensibilité : Chaque service évolue de manière indépendante en fonction de sa charge de travail
  • Résilience : Les défaillances d’un service ne se propagent pas aux autres
  • Flexibilité : De nouveaux services peuvent être ajoutés sans modifier les existants

AWS Kinesis fournit la base pour mettre en œuvre l’EDA en agissant comme un flux d’événements distribué et durable qui découple les producteurs des consommateurs.

Pour une perspective plus large sur les plateformes de streaming, consultez notre guide de démarrage rapide Apache Kafka pour comparer avec les alternatives auto-hébergées.

Aperçu d’AWS Kinesis

AWS propose plusieurs services Kinesis, chacun conçu pour des cas d’utilisation spécifiques. Lors de l’évaluation des solutions de streaming, vous pourriez également souhaiter comparer RabbitMQ sur EKS et SQS pour différents modèles de messagerie et implications de coûts.

Kinesis Data Streams

Le service de streaming principal qui capture, stocke et traite les enregistrements de données en temps réel. Data Streams est idéal pour :

  • Des applications de traitement en temps réel personnalisées
  • La construction de pipelines de données avec une latence inférieure à la seconde
  • Le traitement de millions d’événements par seconde
  • La mise en œuvre de modèles de sourcing d’événements

Kinesis Data Firehose

Un service entièrement géré qui livre des données en streaming vers des destinations comme S3, Redshift, Elasticsearch ou des points de terminaison HTTP. Idéal pour :

  • Des pipelines ETL simples
  • L’agrégation et l’archivage des journaux
  • L’analyse quasi temps réel (latence minimale de 60 secondes)
  • Les scénarios où aucune logique de traitement personnalisée n’est requise

Kinesis Data Analytics

Traite et analyse les données en streaming en utilisant SQL ou Apache Flink. Les cas d’utilisation incluent :

  • Tableaux de bord en temps réel
  • ETL en streaming
  • Détection d’anomalies en temps réel
  • Génération continue de métriques

Pour approfondir les opérations Flink, consultez notre guide Apache Flink sur K8s et Kafka.

Modèles architecturaux avec Kinesis

1. Modèle de Sourcing d’Événements

Le sourcing d’événements stocke tous les changements d’état de l’application comme une séquence d’événements. Kinesis est parfait pour cela. Si vous avez besoin d’un rappel sur les fondamentaux de Python, consultez notre Fiche de référence Python :

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Publier un événement vers le flux Kinesis"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }
    
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )
    
    return response['SequenceNumber']

# Exemple : événement d'inscription utilisateur
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. CQRS (Command Query Responsibility Segregation)

Séparez les opérations de lecture et d’écriture en utilisant Kinesis comme bus d’événements :

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }
    
    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })
    
    return err
}

3. Modèle Fan-Out avec Lambda

Traitez les événements d’un seul flux avec plusieurs fonctions Lambda. Pour les implémentations TypeScript avec une sécurité de type renforcée, référez-vous à notre Fiche de référence TypeScript :

// Consommateur Lambda pour les notifications par e-mail
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// Un autre Lambda pour les mises à jour de stock
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};

Meilleures pratiques pour la production

1. Choisir le bon nombre de shards

Calculez vos besoins en shards en fonction de :

  • Ingress : 1 Mo/sec ou 1 000 enregistrements/sec par shard
  • Egress : 2 Mo/sec par shard (consommateurs standards) ou 2 Mo/sec par consommateur avec fan-out amélioré
def calculate_shards(records_per_second, avg_record_size_kb):
    """Calculer le nombre requis de shards"""
    # Capacité d'ingress
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )
    
    return int(ingress_shards) + 1  # Ajouter une marge

2. Mettre en œuvre une gestion d’erreurs appropriée

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key, 
                          max_retries=3):
    """Mettre un enregistrement avec une stratégie de nouvelle tentative à backoff exponentiel"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Backoff exponentiel
                    continue
            raise

3. Utiliser le Fan-Out Amélioré pour plusieurs consommateurs

Le fan-out amélioré fournit un débit dédié pour chaque consommateur :

# Enregistrer un consommateur avec le fan-out amélioré
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. Surveiller les métriques clés

Métriques CloudWatch essentielles à suivre :

  • IncomingRecords : Nombre d’enregistrements mis avec succès
  • IncomingBytes : Volume d’octets des données entrantes
  • GetRecords.IteratorAgeMilliseconds : Retard des consommateurs
  • WriteProvisionedThroughputExceeded : Événements de limitation (throttling)
  • ReadProvisionedThroughputExceeded : Limitation des consommateurs

5. Mettre en œuvre une stratégie de clé de partition appropriée

import hashlib

def get_partition_key(user_id, shard_count=10):
    """Générer une clé de partition avec une distribution uniforme"""
    # Utiliser un hachage cohérent pour une distribution uniforme
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

Exemple d’implémentation dans le monde réel

Voici un exemple complet d’architecture microservices de traitement de commandes :

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
    
    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """Créer une commande et publier des événements"""
        order_id = self.generate_order_id()
        
        # Publier l'événement de commande créée
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)
        
        return order_id
    
    def publish_event(self, event_type: str, payload: Dict, 
                     partition_key: str):
        """Publier un événement vers le flux Kinesis"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }
        
        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """Consomme les événements de commande et met à jour le stock"""
    
    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])
            
            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])
    
    def reserve_inventory(self, order_data):
        # Mettre à jour la base de données de stock
        for item in order_data['items']:
            # Implémentation ici
            pass

Stratégie de migration du monolithe aux microservices

Phase 1 : Motif du Ficus Strangler (Strangler Fig Pattern)

Commencez par acheminer des événements spécifiques via Kinesis tout en conservant le monolithe :

  1. Identifiez les contextes délimités dans votre monolithe
  2. Créez des flux Kinesis pour les événements inter-contextuels
  3. Extrait progressivement les services qui consomment ces flux
  4. Maintenez la compatibilité ascendante avec le monolithe

Phase 2 : Traitement parallèle

Faites fonctionner les anciens et nouveaux systèmes en parallèle :

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """Écrire à la fois vers le système hérité et le flux d'événements"""
    try:
        # Écrire d'abord vers le nouveau système
        publish_to_kinesis(kinesis_stream, data)
        
        # Puis mettre à jour le système hérité
        legacy_db.update(data)
    except Exception as e:
        # Mettre en œuvre la logique de compensation
        rollback_kinesis_event(kinesis_stream, data)
        raise

Phase 3 : Migration complète

Une fois la confiance établie, acheminez tout le trafic vers l’architecture pilotée par événements.

Stratégies d’optimisation des coûts

Pour des conseils complets sur les modèles d’infrastructure de données, y compris le stockage d’objets et les architectures de base de données, référez-vous à Infrastructure de données pour les systèmes IA : Stockage d’objets, Bases de données, Recherche & Architecture de données IA.

1. Utiliser le mode On-Demand pour les charges de travail variables

Le mode On-Demand (introduit en 2023) s’adapte automatiquement en fonction du trafic :

# Créer un flux en mode on-demand
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. Mettre en œuvre l’agrégation de données

Réduisez les unités de charge PUT en regroupant les enregistrements :

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """Agrégation d'enregistrements pour réduire les coûts"""
    aggregator = RecordAggregator()
    
    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )
    
    # Envoyer l'enregistrement agrégé
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. Optimiser la rétention des données

La rétention par défaut est de 24 heures. Ne l’étendez que si nécessaire :

# Définir la rétention à 7 jours
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

Meilleures pratiques de sécurité

1. Chiffrement au repos et en transit

# Créer un flux chiffré
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# Activer le chiffrement
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. Politiques IAM pour le principe du moindre privilège

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. Points de terminaison VPC

Gardez le trafic au sein du réseau AWS. Pour gérer l’infrastructure AWS en tant que code, envisagez d’utiliser Terraform - consultez notre fiche de référence Terraform :

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

Observabilité et débogage

Traçage distribué avec X-Ray

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])
    
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

Requêtes CloudWatch Logs Insights

-- Trouver les temps de traitement lents
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- Suivre les taux d'erreur
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

Modèles avancés

Modèle Saga pour les transactions distribuées

Implémentez des transactions longues durée traversant les microservices :

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())
    
    def execute(self, order_data):
        """Exécuter la saga avec une logique de compensation"""
        try:
            # Étape 1 : Réserver le stock
            self.publish_command('RESERVE_INVENTORY', order_data)
            
            # Étape 2 : Traiter le paiement
            self.publish_command('PROCESS_PAYMENT', order_data)
            
            # Étape 3 : Expédier la commande
            self.publish_command('SHIP_ORDER', order_data)
            
        except SagaException as e:
            # Compenser dans l'ordre inverse
            self.compensate(e.failed_step)
    
    def compensate(self, failed_step):
        """Exécuter les transactions de compensation"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }
        
        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

Stratégies de test

Développement local avec LocalStack

# Démarrer LocalStack avec Kinesis
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# Créer un flux de test
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

Tests d’intégration

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """Tester la publication d'événements avec Kinesis moqué"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)
    
    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])
    
    assert order_id is not None

Ajustement des performances

Optimiser la taille des lots

def optimize_batch_processing(records, batch_size=500):
    """Traiter les enregistrements par lots optimisés"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

Utiliser la mise en pool de connexions

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

Liens utiles

Ressources AWS Kinesis :

Articles connexes :

Conclusion

AWS Kinesis fournit une base robuste pour la construction d’architectures microservices extensibles et pilotées par événements. En suivant ces modèles et meilleures pratiques, vous pouvez créer des systèmes résilients, évolutifs et maintenables. Commencez petit avec un seul flux d’événements, validez votre architecture et expandez progressivement vers des modèles plus complexes au fur et à mesure que votre système grandit.

La clé du succès réside dans la compréhension de vos besoins en flux de données, le choix du bon service Kinesis pour votre cas d’utilisation et la mise en œuvre d’une surveillance et d’une gestion d’erreurs appropriées dès le départ.