Construyendo microservicios orientados a eventos con AWS Kinesis

Arquitectura orientada a eventos con AWS Kinesis para escalar

Índice

AWS Kinesis se ha convertido en un pilar fundamental para construir arquitecturas modernas de microservicios orientadas a eventos, permitiendo el procesamiento en tiempo real de datos a gran escala con un mínimo sobrecoste operativo.

amazon-kinesis

Entendiendo la arquitectura de microservicios orientada a eventos

La arquitectura orientada a eventos (EDA) es un patrón de diseño donde los servicios se comunican mediante eventos en lugar de llamadas sincrónicas directas. Este enfoque ofrece varias ventajas:

  • Desacoplamiento suave: Los servicios no necesitan conocer la existencia de los demás
  • Escalabilidad: Cada servicio se escala independientemente según su carga de trabajo
  • Resiliencia: Los fallos en un servicio no se propagan a otros
  • Flexibilidad: Se pueden agregar nuevos servicios sin modificar los existentes

AWS Kinesis proporciona la base para implementar EDA al actuar como una secuencia de eventos distribuida y duradera que desacopla a los productores de los consumidores.

Visión general de AWS Kinesis

AWS ofrece varios servicios de Kinesis, cada uno diseñado para casos de uso específicos. Cuando se evalúan soluciones de streaming, también podría ser útil considerar comparar RabbitMQ en EKS vs SQS para diferentes patrones de mensajería y implicaciones de costo.

Kinesis Data Streams

El servicio de streaming principal que captura, almacena y procesa registros de datos en tiempo real. Data Streams es ideal para:

  • Aplicaciones de procesamiento personalizado en tiempo real
  • Construir canales de datos con latencia de subsegundo
  • Procesar millones de eventos por segundo
  • Implementar patrones de fuente de eventos

Kinesis Data Firehose

Un servicio totalmente gestionado que entrega datos de streaming a destinos como S3, Redshift, Elasticsearch o puntos finales HTTP. Ideal para:

  • Canales ETL simples
  • Agregación y archivado de registros
  • Análisis en tiempo real casi inmediato (mínimo de 60 segundos de latencia)
  • Escenarios donde no se necesita lógica de procesamiento personalizada

Kinesis Data Analytics

Procesa y analiza datos de streaming usando SQL o Apache Flink. Casos de uso incluyen:

  • Cuadros de mando en tiempo real
  • ETL en streaming
  • Detección de anomalías en tiempo real
  • Generación continua de métricas

Patrones arquitectónicos con Kinesis

1. Patrón de fuente de eventos

La fuente de eventos almacena todos los cambios en el estado de la aplicación como una secuencia de eventos. Kinesis es perfecto para esto. Si necesitas un repaso sobre fundamentos de Python, consulta nuestro cheat sheet de Python:

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Publicar un evento en el flujo de Kinesis"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }
    
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )
    
    return response['SequenceNumber']

# Ejemplo: evento de registro de usuario
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. Patrón CQRS (Responsabilidad de consulta y comando)

Separa las operaciones de lectura y escritura usando Kinesis como bus de eventos:

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }
    
    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })
    
    return err
}

3. Patrón de difusión con Lambda

Procesa eventos de un solo flujo con múltiples funciones Lambda. Para implementaciones en TypeScript con mayor seguridad de tipos, consulta nuestro cheat sheet de TypeScript:

// Consumidor de Lambda para notificaciones por correo electrónico
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// Otro Lambda para actualizaciones de inventario
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};

Mejores prácticas para producción

1. Elegir el número correcto de shards

Calcule sus requisitos de shards basándose en:

  • Ingreso: 1 MB/seg o 1,000 registros/seg por shard
  • Salida: 2 MB/seg por shard (consumidores estándar) o 2 MB/seg por consumidor con difusión mejorada
def calculate_shards(records_per_second, avg_record_size_kb):
    """Calcular el número requerido de shards"""
    # Capacidad de ingreso
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )
    
    return int(ingress_shards) + 1  # Añadir buffer

2. Implementar manejo adecuado de errores

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key, 
                          max_retries=3):
    """Poner registro con reintento con retroceso exponencial"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Retroceso exponencial
                    continue
            raise

3. Usar difusión mejorada para múltiples consumidores

La difusión mejorada proporciona throughput dedicado para cada consumidor:

# Registrar un consumidor con difusión mejorada
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. Monitorear métricas clave

Métricas esenciales de CloudWatch para seguir:

  • IncomingRecords: Número de registros insertados correctamente
  • IncomingBytes: Volumen de bytes de datos entrantes
  • GetRecords.IteratorAgeMilliseconds: Cuán atrás están los consumidores
  • WriteProvisionedThroughputExceeded: Eventos de limitación
  • ReadProvisionedThroughputExceeded: Limitación de consumidores

5. Implementar estrategia adecuada de clave de partición

import hashlib

def get_partition_key(user_id, shard_count=10):
    """Generar clave de partición con distribución equilibrada"""
    # Usar hashing consistente para distribución equilibrada
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

Ejemplo de implementación real

Aquí hay un ejemplo completo de una arquitectura de microservicios para procesamiento de pedidos:

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
    
    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """Crear orden y publicar eventos"""
        order_id = self.generate_order_id()
        
        # Publicar evento de orden creada
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)
        
        return order_id
    
    def publish_event(self, event_type: str, payload: Dict, 
                     partition_key: str):
        """Publicar evento en el flujo de Kinesis"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }
        
        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """Consume eventos de orden y actualiza inventario"""
    
    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])
            
            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])
    
    def reserve_inventory(self, order_data):
        # Actualizar base de datos de inventario
        for item in order_data['items']:
            # Implementación aquí
            pass

Estrategia de migración de monolito a microservicios

Fase 1: Patrón de figa estranguladora

Comience redirigiendo eventos específicos a través de Kinesis mientras mantiene el monolito:

  1. Identifique contextos acotados en su monolito
  2. Cree flujos de Kinesis para eventos entre contextos
  3. Extraiga gradualmente servicios que consuman estos flujos
  4. Mantenga la compatibilidad hacia atrás con el monolito

Fase 2: Procesamiento paralelo

Ejecute ambos sistemas antiguos y nuevos en paralelo:

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """Escribir en ambos sistema antiguo y flujo de eventos"""
    try:
        # Escribir en nuevo sistema primero
        publish_to_kinesis(kinesis_stream, data)
        
        # Luego actualizar sistema antiguo
        legacy_db.update(data)
    except Exception as e:
        # Implementar lógica de compensación
        rollback_kinesis_event(kinesis_stream, data)
        raise

Fase 3: Migración completa

Una vez establecida la confianza, enrute todo el tráfico a través de la arquitectura orientada a eventos.

Estrategias de optimización de costos

1. Usar modo on-demand para cargas de trabajo variables

El modo on-demand (introducido en 2023) se escala automáticamente según el tráfico:

# Crear flujo con modo on-demand
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. Implementar agregación de datos

Reduzca las unidades de PUT agrupando registros:

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """Agrupar registros para reducir costos"""
    aggregator = RecordAggregator()
    
    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )
    
    # Enviar registro agrupado
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. Optimizar retención de datos

La retención predeterminada es de 24 horas. Solo extiéndala si es necesario:

# Establecer retención a 7 días
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

Mejores prácticas de seguridad

1. Encriptación en reposo y en tránsito

# Crear flujo encriptado
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# Habilitar encriptación
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. Políticas IAM para el menor privilegio

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. Puntos finales de VPC

Mantenga el tráfico dentro de la red de AWS. Para gestionar la infraestructura de AWS como código, considere usar Terraform - consulte nuestro cheat sheet de Terraform:

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

Observabilidad y depuración

Rastreo distribuido con X-Ray

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])
    
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

Consultas de CloudWatch Logs Insights

-- Encontrar tiempos de procesamiento lentos
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- Seguir tasas de errores
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

Patrones avanzados

Patrón Saga para transacciones distribuidas

Implemente transacciones de larga duración entre microservicios:

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())
    
    def execute(self, order_data):
        """Ejecutar saga con lógica de compensación"""
        try:
            # Paso 1: Reservar inventario
            self.publish_command('RESERVE_INVENTORY', order_data)
            
            # Paso 2: Procesar pago
            self.publish_command('PROCESS_PAYMENT', order_data)
            
            # Paso 3: Enviar orden
            self.publish_command('SHIP_ORDER', order_data)
            
        except SagaException as e:
            # Compensar en orden inverso
            self.compensate(e.failed_step)
    
    def compensate(self, failed_step):
        """Ejecutar transacciones de compensación"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }
        
        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

Estrategias de pruebas

Desarrollo local con LocalStack

# Iniciar LocalStack con Kinesis
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# Crear flujo de prueba
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

Pruebas de integración

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """Prueba de publicación de eventos con Kinesis simulado"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)
    
    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])
    
    assert order_id is not None

Ajuste de rendimiento

Optimizar tamaño de lote

def optimize_batch_processing(records, batch_size=500):
    """Procesar registros en lotes optimizados"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

Usar pooling de conexiones

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

Enlaces útiles

Recursos de AWS Kinesis:

Artículos relacionados:

Conclusión

AWS Kinesis proporciona una base sólida para construir arquitecturas de microservicios escalables orientadas a eventos. Siguiendo estos patrones y mejores prácticas, puedes crear sistemas que sean resistentes, escalables y mantenibles. Comienza pequeño con un solo flujo de eventos, valida tu arquitectura y expande gradualmente a patrones más complejos a medida que crece tu sistema.

La clave del éxito es comprender tus requisitos de flujo de datos, elegir el servicio adecuado de Kinesis para tu caso de uso y implementar monitoreo y manejo de errores adecuados desde el principio.