Construyendo microservicios orientados a eventos con AWS Kinesis
Arquitectura orientada a eventos con AWS Kinesis para escalar
AWS Kinesis se ha convertido en un pilar fundamental para construir arquitecturas modernas de microservicios orientadas a eventos, permitiendo el procesamiento en tiempo real de datos a gran escala con un mínimo sobrecoste operativo.

Entendiendo la arquitectura de microservicios orientada a eventos
La arquitectura orientada a eventos (EDA) es un patrón de diseño donde los servicios se comunican mediante eventos en lugar de llamadas sincrónicas directas. Este enfoque ofrece varias ventajas:
- Desacoplamiento suave: Los servicios no necesitan conocer la existencia de los demás
- Escalabilidad: Cada servicio se escala independientemente según su carga de trabajo
- Resiliencia: Los fallos en un servicio no se propagan a otros
- Flexibilidad: Se pueden agregar nuevos servicios sin modificar los existentes
AWS Kinesis proporciona la base para implementar EDA al actuar como una secuencia de eventos distribuida y duradera que desacopla a los productores de los consumidores.
Visión general de AWS Kinesis
AWS ofrece varios servicios de Kinesis, cada uno diseñado para casos de uso específicos. Cuando se evalúan soluciones de streaming, también podría ser útil considerar comparar RabbitMQ en EKS vs SQS para diferentes patrones de mensajería y implicaciones de costo.
Kinesis Data Streams
El servicio de streaming principal que captura, almacena y procesa registros de datos en tiempo real. Data Streams es ideal para:
- Aplicaciones de procesamiento personalizado en tiempo real
- Construir canales de datos con latencia de subsegundo
- Procesar millones de eventos por segundo
- Implementar patrones de fuente de eventos
Kinesis Data Firehose
Un servicio totalmente gestionado que entrega datos de streaming a destinos como S3, Redshift, Elasticsearch o puntos finales HTTP. Ideal para:
- Canales ETL simples
- Agregación y archivado de registros
- Análisis en tiempo real casi inmediato (mínimo de 60 segundos de latencia)
- Escenarios donde no se necesita lógica de procesamiento personalizada
Kinesis Data Analytics
Procesa y analiza datos de streaming usando SQL o Apache Flink. Casos de uso incluyen:
- Cuadros de mando en tiempo real
- ETL en streaming
- Detección de anomalías en tiempo real
- Generación continua de métricas
Patrones arquitectónicos con Kinesis
1. Patrón de fuente de eventos
La fuente de eventos almacena todos los cambios en el estado de la aplicación como una secuencia de eventos. Kinesis es perfecto para esto. Si necesitas un repaso sobre fundamentos de Python, consulta nuestro cheat sheet de Python:
import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
def publish_event(stream_name, event_type, payload):
"""Publicar un evento en el flujo de Kinesis"""
event = {
'eventId': generate_unique_id(),
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event),
PartitionKey=payload.get('userId', 'default')
)
return response['SequenceNumber']
# Ejemplo: evento de registro de usuario
publish_event(
stream_name='user-events',
event_type='USER_REGISTERED',
payload={
'userId': '12345',
'email': 'user@example.com',
'registrationDate': '2025-10-30'
}
)
2. Patrón CQRS (Responsabilidad de consulta y comando)
Separa las operaciones de lectura y escritura usando Kinesis como bus de eventos:
package main
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)
type OrderCommand struct {
CommandType string `json:"commandType"`
OrderID string `json:"orderId"`
UserID string `json:"userId"`
Items []OrderItem `json:"items"`
}
func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
data, err := json.Marshal(command)
if err != nil {
return err
}
_, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String("order-commands"),
Data: data,
PartitionKey: aws.String(command.OrderID),
})
return err
}
3. Patrón de difusión con Lambda
Procesa eventos de un solo flujo con múltiples funciones Lambda. Para implementaciones en TypeScript con mayor seguridad de tipos, consulta nuestro cheat sheet de TypeScript:
// Consumidor de Lambda para notificaciones por correo electrónico
exports.handler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await sendOrderConfirmationEmail(payload);
}
}
};
// Otro Lambda para actualizaciones de inventario
exports.inventoryHandler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await updateInventory(payload.items);
}
}
};
Mejores prácticas para producción
1. Elegir el número correcto de shards
Calcule sus requisitos de shards basándose en:
- Ingreso: 1 MB/seg o 1,000 registros/seg por shard
- Salida: 2 MB/seg por shard (consumidores estándar) o 2 MB/seg por consumidor con difusión mejorada
def calculate_shards(records_per_second, avg_record_size_kb):
"""Calcular el número requerido de shards"""
# Capacidad de ingreso
ingress_shards = max(
records_per_second / 1000,
(records_per_second * avg_record_size_kb) / 1024
)
return int(ingress_shards) + 1 # Añadir buffer
2. Implementar manejo adecuado de errores
from botocore.exceptions import ClientError
import time
def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
max_retries=3):
"""Poner registro con reintento con retroceso exponencial"""
for attempt in range(max_retries):
try:
response = kinesis_client.put_record(
StreamName=stream_name,
Data=data,
PartitionKey=partition_key
)
return response
except ClientError as e:
if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Retroceso exponencial
continue
raise
3. Usar difusión mejorada para múltiples consumidores
La difusión mejorada proporciona throughput dedicado para cada consumidor:
# Registrar un consumidor con difusión mejorada
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
--consumer-name my-consumer-app
4. Monitorear métricas clave
Métricas esenciales de CloudWatch para seguir:
IncomingRecords: Número de registros insertados correctamenteIncomingBytes: Volumen de bytes de datos entrantesGetRecords.IteratorAgeMilliseconds: Cuán atrás están los consumidoresWriteProvisionedThroughputExceeded: Eventos de limitaciónReadProvisionedThroughputExceeded: Limitación de consumidores
5. Implementar estrategia adecuada de clave de partición
import hashlib
def get_partition_key(user_id, shard_count=10):
"""Generar clave de partición con distribución equilibrada"""
# Usar hashing consistente para distribución equilibrada
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_id = hash_value % shard_count
return f"{user_id}#{shard_id}"
Ejemplo de implementación real
Aquí hay un ejemplo completo de una arquitectura de microservicios para procesamiento de pedidos:
import boto3
import json
from decimal import Decimal
from typing import Dict, List
class OrderProcessingService:
def __init__(self, stream_name: str):
self.kinesis = boto3.client('kinesis')
self.stream_name = stream_name
def create_order(self, user_id: str, items: List[Dict]) -> str:
"""Crear orden y publicar eventos"""
order_id = self.generate_order_id()
# Publicar evento de orden creada
self.publish_event('ORDER_CREATED', {
'orderId': order_id,
'userId': user_id,
'items': items,
'status': 'PENDING',
'total': self.calculate_total(items)
}, partition_key=user_id)
return order_id
def publish_event(self, event_type: str, payload: Dict,
partition_key: str):
"""Publicar evento en el flujo de Kinesis"""
event = {
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
self.kinesis.put_record(
StreamName=self.stream_name,
Data=json.dumps(event, default=str),
PartitionKey=partition_key
)
class InventoryService:
"""Consume eventos de orden y actualiza inventario"""
def process_records(self, records):
for record in records:
data = json.loads(record['kinesis']['data'])
if data['eventType'] == 'ORDER_CREATED':
self.reserve_inventory(data['payload'])
elif data['eventType'] == 'ORDER_CANCELLED':
self.release_inventory(data['payload'])
def reserve_inventory(self, order_data):
# Actualizar base de datos de inventario
for item in order_data['items']:
# Implementación aquí
pass
Estrategia de migración de monolito a microservicios
Fase 1: Patrón de figa estranguladora
Comience redirigiendo eventos específicos a través de Kinesis mientras mantiene el monolito:
- Identifique contextos acotados en su monolito
- Cree flujos de Kinesis para eventos entre contextos
- Extraiga gradualmente servicios que consuman estos flujos
- Mantenga la compatibilidad hacia atrás con el monolito
Fase 2: Procesamiento paralelo
Ejecute ambos sistemas antiguos y nuevos en paralelo:
def dual_write_pattern(legacy_db, kinesis_stream, data):
"""Escribir en ambos sistema antiguo y flujo de eventos"""
try:
# Escribir en nuevo sistema primero
publish_to_kinesis(kinesis_stream, data)
# Luego actualizar sistema antiguo
legacy_db.update(data)
except Exception as e:
# Implementar lógica de compensación
rollback_kinesis_event(kinesis_stream, data)
raise
Fase 3: Migración completa
Una vez establecida la confianza, enrute todo el tráfico a través de la arquitectura orientada a eventos.
Estrategias de optimización de costos
1. Usar modo on-demand para cargas de trabajo variables
El modo on-demand (introducido en 2023) se escala automáticamente según el tráfico:
# Crear flujo con modo on-demand
aws kinesis create-stream \
--stream-name my-stream \
--stream-mode-details StreamMode=ON_DEMAND
2. Implementar agregación de datos
Reduzca las unidades de PUT agrupando registros:
from aws_kinesis_agg.aggregator import RecordAggregator
def batch_put_records(kinesis_client, stream_name, records):
"""Agrupar registros para reducir costos"""
aggregator = RecordAggregator()
for record in records:
aggregator.add_user_record(
partition_key=record['partition_key'],
data=record['data']
)
# Enviar registro agrupado
kinesis_client.put_record(
StreamName=stream_name,
Data=aggregator.serialize(),
PartitionKey=records[0]['partition_key']
)
3. Optimizar retención de datos
La retención predeterminada es de 24 horas. Solo extiéndala si es necesario:
# Establecer retención a 7 días
aws kinesis increase-stream-retention-period \
--stream-name my-stream \
--retention-period-hours 168
Mejores prácticas de seguridad
1. Encriptación en reposo y en tránsito
# Crear flujo encriptado
kinesis.create_stream(
StreamName='secure-stream',
ShardCount=1,
StreamModeDetails={'StreamMode': 'PROVISIONED'}
)
# Habilitar encriptación
kinesis.start_stream_encryption(
StreamName='secure-stream',
EncryptionType='KMS',
KeyId='alias/aws/kinesis'
)
2. Políticas IAM para el menor privilegio
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
}
]
}
3. Puntos finales de VPC
Mantenga el tráfico dentro de la red de AWS. Para gestionar la infraestructura de AWS como código, considere usar Terraform - consulte nuestro cheat sheet de Terraform:
aws ec2 create-vpc-endpoint \
--vpc-id vpc-12345678 \
--service-name com.amazonaws.us-east-1.kinesis-streams \
--route-table-ids rtb-12345678
Observabilidad y depuración
Rastreo distribuido con X-Ray
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
patch_all()
@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
xray_recorder.put_annotation('eventType', event_data['type'])
xray_recorder.put_metadata('payload', event_data['payload'])
kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event_data),
PartitionKey=event_data['id']
)
Consultas de CloudWatch Logs Insights
-- Encontrar tiempos de procesamiento lentos
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20
-- Seguir tasas de errores
fields @timestamp, eventType
| filter error = true
| stats count() by eventType
Patrones avanzados
Patrón Saga para transacciones distribuidas
Implemente transacciones de larga duración entre microservicios:
class OrderSaga:
def __init__(self, kinesis_client, stream_name):
self.kinesis = kinesis_client
self.stream_name = stream_name
self.saga_id = str(uuid.uuid4())
def execute(self, order_data):
"""Ejecutar saga con lógica de compensación"""
try:
# Paso 1: Reservar inventario
self.publish_command('RESERVE_INVENTORY', order_data)
# Paso 2: Procesar pago
self.publish_command('PROCESS_PAYMENT', order_data)
# Paso 3: Enviar orden
self.publish_command('SHIP_ORDER', order_data)
except SagaException as e:
# Compensar en orden inverso
self.compensate(e.failed_step)
def compensate(self, failed_step):
"""Ejecutar transacciones de compensación"""
compensation_steps = {
'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
}
for step in compensation_steps.get(failed_step, []):
self.publish_command(step, {'sagaId': self.saga_id})
Estrategias de pruebas
Desarrollo local con LocalStack
# Iniciar LocalStack con Kinesis
docker run -d \
-p 4566:4566 \
-e SERVICES=kinesis \
localstack/localstack
# Crear flujo de prueba
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
--stream-name test-stream \
--shard-count 1
Pruebas de integración
import pytest
from moto import mock_kinesis
@mock_kinesis
def test_event_publishing():
"""Prueba de publicación de eventos con Kinesis simulado"""
kinesis = boto3.client('kinesis', region_name='us-east-1')
kinesis.create_stream(StreamName='test-stream', ShardCount=1)
service = OrderProcessingService('test-stream')
order_id = service.create_order('user123', [
{'productId': 'prod1', 'quantity': 2}
])
assert order_id is not None
Ajuste de rendimiento
Optimizar tamaño de lote
def optimize_batch_processing(records, batch_size=500):
"""Procesar registros en lotes optimizados"""
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
process_batch(batch)
Usar pooling de conexiones
from botocore.config import Config
config = Config(
max_pool_connections=50,
retries={'max_attempts': 3, 'mode': 'adaptive'}
)
kinesis = boto3.client('kinesis', config=config)
Enlaces útiles
Recursos de AWS Kinesis:
- Documentación de AWS Kinesis
- Guía del desarrollador de AWS Kinesis Data Streams
- Biblioteca de cliente de Kinesis (KCL)
- Calculadora de precios de AWS Kinesis
- Cuotas y límites de Kinesis Data Streams
- Blog de arquitectura de AWS - Arquitecturas orientadas a eventos
- Ejemplos de AWS Samples - Kinesis
Artículos relacionados:
- Comparación de costos de alojamiento entre Rabbitmq en Eks vs Sqs
- Cheat sheet de TypeScript: conceptos básicos y mejores prácticas
- Cheat sheet de Python
- Cheat sheet de Terraform - comandos útiles y ejemplos
Conclusión
AWS Kinesis proporciona una base sólida para construir arquitecturas de microservicios escalables orientadas a eventos. Siguiendo estos patrones y mejores prácticas, puedes crear sistemas que sean resistentes, escalables y mantenibles. Comienza pequeño con un solo flujo de eventos, valida tu arquitectura y expande gradualmente a patrones más complejos a medida que crece tu sistema.
La clave del éxito es comprender tus requisitos de flujo de datos, elegir el servicio adecuado de Kinesis para tu caso de uso y implementar monitoreo y manejo de errores adecuados desde el principio.