Construindo Microsserviços Orientados a Eventos com AWS Kinesis
Arquitetura orientada a eventos com AWS Kinesis para escala
AWS Kinesis tornou-se um pilar para a construção de arquiteturas modernas de microserviços orientadas a eventos, permitindo o processamento de dados em tempo real em grande escala com mínimo sobrecusto operacional.

Entendendo a Arquitetura de Microserviços Orientada a Eventos
A arquitetura orientada a eventos (EDA) é um padrão de design onde os serviços se comunicam por meio de eventos em vez de chamadas síncronas diretas. Este abordagem oferece vários benefícios:
- Desacoplamento: Os serviços não precisam saber sobre a existência uns dos outros
- Escalabilidade: Cada serviço escala independentemente com base em sua carga de trabalho
- Resiliência: Falhas em um serviço não se propagam para outros
- Flexibilidade: Novos serviços podem ser adicionados sem modificar os existentes
A AWS Kinesis fornece a estrutura para implementar EDA, atuando como um fluxo de eventos distribuído e durável que desacopla produtores de consumidores.
Visão Geral da AWS Kinesis
A AWS oferece vários serviços Kinesis, cada um projetado para casos de uso específicos. Ao avaliar soluções de streaming, você também pode querer considerar comparar RabbitMQ no EKS vs SQS para diferentes padrões de mensageria e implicações de custo.
Kinesis Data Streams
O serviço de streaming central que captura, armazena e processa registros de dados em tempo real. Data Streams é ideal para:
- Aplicações de processamento personalizado em tempo real
- Construção de pipelines de dados com latência subsegundo
- Processamento de milhões de eventos por segundo
- Implementação de padrões de sourcing de eventos
Kinesis Data Firehose
Um serviço totalmente gerenciado que entrega dados de streaming para destinos como S3, Redshift, Elasticsearch ou endpoints HTTP. Ideal para:
- Pipelines simples de ETL
- Agregação e arquivamento de logs
- Análise quase em tempo real (latência mínima de 60 segundos)
- Cenários onde você não precisa de lógica de processamento personalizada
Kinesis Data Analytics
Processa e analisa dados de streaming usando SQL ou Apache Flink. Casos de uso incluem:
- Dashboards em tempo real
- ETL de streaming
- Detecção de anomalias em tempo real
- Geração contínua de métricas
Padrões Arquitetônicos com Kinesis
1. Padrão de Sourcing de Eventos
O sourcing de eventos armazena todas as alterações no estado da aplicação como uma sequência de eventos. O Kinesis é perfeito para isso. Se você precisar de um reforço sobre fundamentos de Python, consulte nossa Folha de Dicas de Python:
import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
def publish_event(stream_name, event_type, payload):
"""Publicar um evento no fluxo Kinesis"""
event = {
'eventId': generate_unique_id(),
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event),
PartitionKey=payload.get('userId', 'default')
)
return response['SequenceNumber']
# Exemplo: evento de registro de usuário
publish_event(
stream_name='user-events',
event_type='USER_REGISTERED',
payload={
'userId': '12345',
'email': 'user@example.com',
'registrationDate': '2025-10-30'
}
)
2. CQRS (Separação de Responsabilidades de Comando e Consulta)
Separe operações de leitura e escrita usando o Kinesis como o barramento de eventos:
package main
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)
type OrderCommand struct {
CommandType string `json:"commandType"`
OrderID string `json:"orderId"`
UserID string `json:"userId"`
Items []OrderItem `json:"items"`
}
func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
data, err := json.Marshal(command)
if err != nil {
return err
}
_, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String("order-commands"),
Data: data,
PartitionKey: aws.String(command.OrderID),
})
return err
}
3. Padrão de Saída em Fã com Lambda
Processar eventos de um único fluxo com múltiplas funções Lambda. Para implementações com TypeScript com maior segurança de tipos, consulte nossa Folha de Dicas de TypeScript:
// Consumidor Lambda para notificações por e-mail
exports.handler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await sendOrderConfirmationEmail(payload);
}
}
};
// Outra Lambda para atualizações de estoque
exports.inventoryHandler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await updateInventory(payload.items);
}
}
};
Boas Práticas para Produção
1. Escolher o Número Correto de Shards
Calcule suas necessidades de shard com base em:
- Entrada: 1 MB/seg ou 1.000 registros/seg por shard
- Saída: 2 MB/seg por shard (consumidores padrão) ou 2 MB/seg por consumidor com saída em fã aprimorada
def calculate_shards(records_per_second, avg_record_size_kb):
"""Calcular o número necessário de shards"""
# Capacidade de entrada
ingress_shards = max(
records_per_second / 1000,
(records_per_second * avg_record_size_kb) / 1024
)
return int(ingress_shards) + 1 # Adicionar buffer
2. Implementar Tratamento Adequado de Erros
from botocore.exceptions import ClientError
import time
def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
max_retries=3):
"""Inserir registro com recuperação exponencial"""
for attempt in range(max_retries):
try:
response = kinesis_client.put_record(
StreamName=stream_name,
Data=data,
PartitionKey=partition_key
)
return response
except ClientError as e:
if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Recuperação exponencial
continue
raise
3. Usar Saída em Fã Aprimorada para Múltiplos Consumidores
A saída em fã aprimorada fornece throughput dedicado para cada consumidor:
# Registrar um consumidor com saída em fã aprimorada
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
--consumer-name my-consumer-app
4. Monitorar Métricas Chave
Métricas essenciais do CloudWatch para acompanhar:
IncomingRecords: Número de registros inseridos com sucessoIncomingBytes: Volume de bytes de dados entrantesGetRecords.IteratorAgeMilliseconds: Quão atrasados estão os consumidoresWriteProvisionedThroughputExceeded: Eventos de limitaçãoReadProvisionedThroughputExceeded: Limitação dos consumidores
5. Implementar Estratégia Adequada de Chave de Partição
import hashlib
def get_partition_key(user_id, shard_count=10):
"""Gerar chave de partição com distribuição equilibrada"""
# Usar hashing consistente para distribuição equilibrada
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_id = hash_value % shard_count
return f"{user_id}#{shard_id}"
Exemplo de Implementação Real
Aqui está um exemplo completo de uma arquitetura de microserviços de processamento de pedidos:
import boto3
import json
from decimal import Decimal
from typing import Dict, List
class OrderProcessingService:
def __init__(self, stream_name: str):
self.kinesis = boto3.client('kinesis')
self.stream_name = stream_name
def create_order(self, user_id: str, items: List[Dict]) -> str:
"""Criar pedido e publicar eventos"""
order_id = self.generate_order_id()
# Publicar evento de pedido criado
self.publish_event('ORDER_CREATED', {
'orderId': order_id,
'userId': user_id,
'items': items,
'status': 'PENDING',
'total': self.calculate_total(items)
}, partition_key=user_id)
return order_id
def publish_event(self, event_type: str, payload: Dict,
partition_key: str):
"""Publicar evento no fluxo Kinesis"""
event = {
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
self.kinesis.put_record(
StreamName=self.stream_name,
Data=json.dumps(event, default=str),
PartitionKey=partition_key
)
class InventoryService:
"""Consome eventos de pedidos e atualiza o estoque"""
def process_records(self, records):
for record in records:
data = json.loads(record['kinesis']['data'])
if data['eventType'] == 'ORDER_CREATED':
self.reserve_inventory(data['payload'])
elif data['eventType'] == 'ORDER_CANCELLED':
self.release_inventory(data['payload'])
def reserve_inventory(self, order_data):
# Atualizar banco de dados de estoque
for item in order_data['items']:
# Implementação aqui
pass
Estratégia de Migração de Monolito para Microserviços
Fase 1: Padrão de Figura de Estrangulamento
Comece direcionando eventos específicos através do Kinesis enquanto mantém o monolito:
- Identifique contextos limitados no seu monolito
- Crie fluxos Kinesis para eventos entre contextos
- Extraia gradualmente serviços que consumam desses fluxos
- Mantenha compatibilidade para trás com o monolito
Fase 2: Processamento Paralelo
Execute ambos os sistemas antigo e novo em paralelo:
def dual_write_pattern(legacy_db, kinesis_stream, data):
"""Escrever para o sistema legado e para o fluxo de eventos"""
try:
# Escrever para o novo sistema primeiro
publish_to_kinesis(kinesis_stream, data)
# Em seguida, atualize o sistema legado
legacy_db.update(data)
except Exception as e:
# Implementar lógica de compensação
rollback_kinesis_event(kinesis_stream, data)
raise
Fase 3: Migração Completa
Uma vez estabelecida a confiança, direcione todo o tráfego pela arquitetura orientada a eventos.
Estratégias de Otimização de Custo
1. Use o Modo On-Demand para Cargas de Trabalho Variáveis
O modo on-demand (introduzido em 2023) escala automaticamente com base no tráfego:
# Criar fluxo com modo on-demand
aws kinesis create-stream \
--stream-name my-stream \
--stream-mode-details StreamMode=ON_DEMAND
2. Implemente Agregação de Dados
Reduza as unidades de PUT agrupando registros:
from aws_kinesis_agg.aggregator import RecordAggregator
def batch_put_records(kinesis_client, stream_name, records):
"""Agrupar registros para reduzir custos"""
aggregator = RecordAggregator()
for record in records:
aggregator.add_user_record(
partition_key=record['partition_key'],
data=record['data']
)
# Enviar registro agrupado
kinesis_client.put_record(
StreamName=stream_name,
Data=aggregator.serialize(),
PartitionKey=records[0]['partition_key']
)
3. Otimize a Retenção de Dados
A retenção padrão é de 24 horas. Apenas estenda-a se necessário:
# Definir retenção para 7 dias
aws kinesis increase-stream-retention-period \
--stream-name my-stream \
--retention-period-hours 168
Boas Práticas de Segurança
1. Criptografia em Repouso e em Trânsito
# Criar fluxo criptografado
kinesis.create_stream(
StreamName='secure-stream',
ShardCount=1,
StreamModeDetails={'StreamMode': 'PROVISIONED'}
)
# Habilitar criptografia
kinesis.start_stream_encryption(
StreamName='secure-stream',
EncryptionType='KMS',
KeyId='alias/aws/kinesis'
)
2. Políticas IAM para Menor Privilégio
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
}
]
}
3. Pontos de Extensão VPC
Mantenha o tráfego dentro da rede AWS. Para gerenciar a infraestrutura AWS como código, considere usar o Terraform - veja nossa Folha de Dicas de Terraform:
aws ec2 create-vpc-endpoint \
--vpc-id vpc-12345678 \
--service-name com.amazonaws.us-east-1.kinesis-streams \
--route-table-ids rtb-12345678
Observabilidade e Depuração
Rastreamento Distribuído com X-Ray
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
patch_all()
@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
xray_recorder.put_annotation('eventType', event_data['type'])
xray_recorder.put_metadata('payload', event_data['payload'])
kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event_data),
PartitionKey=event_data['id']
)
Consultas de Logs do CloudWatch Insights
-- Encontrar tempos de processamento lentos
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20
-- Rastrear taxas de erro
fields @timestamp, eventType
| filter error = true
| stats count() by eventType
Padrões Avançados
Padrão Saga para Transações Distribuídas
Implemente transações de longa duração entre microserviços:
class OrderSaga:
def __init__(self, kinesis_client, stream_name):
self.kinesis = kinesis_client
self.stream_name = stream_name
self.saga_id = str(uuid.uuid4())
def execute(self, order_data):
"""Executar saga com lógica de compensação"""
try:
# Etapa 1: Reservar estoque
self.publish_command('RESERVE_INVENTORY', order_data)
# Etapa 2: Processar pagamento
self.publish_command('PROCESS_PAYMENT', order_data)
# Etapa 3: Enviar pedido
self.publish_command('SHIP_ORDER', order_data)
except SagaException as e:
# Compensar na ordem inversa
self.compensate(e.failed_step)
def compensate(self, failed_step):
"""Executar transações de compensação"""
compensation_steps = {
'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
}
for step in compensation_steps.get(failed_step, []):
self.publish_command(step, {'sagaId': self.saga_id})
Estratégias de Teste
Desenvolvimento Local com LocalStack
# Iniciar LocalStack com Kinesis
docker run -d \
-p 4566:4566 \
-e SERVICES=kinesis \
localstack/localstack
# Criar fluxo de teste
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
--stream-name test-stream \
--shard-count 1
Testes de Integração
import pytest
from moto import mock_kinesis
@mock_kinesis
def test_event_publishing():
"""Testar publicação de eventos com Kinesis simulado"""
kinesis = boto3.client('kinesis', region_name='us-east-1')
kinesis.create_stream(StreamName='test-stream', ShardCount=1)
service = OrderProcessingService('test-stream')
order_id = service.create_order('user123', [
{'productId': 'prod1', 'quantity': 2}
])
assert order_id is not None
Ajuste de Desempenho
Otimizar Tamanho do Lote
def optimize_batch_processing(records, batch_size=500):
"""Processar registros em lotes otimizados"""
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
process_batch(batch)
Usar Pool de Conexões
from botocore.config import Config
config = Config(
max_pool_connections=50,
retries={'max_attempts': 3, 'mode': 'adaptive'}
)
kinesis = boto3.client('kinesis', config=config)
Links Úteis
Recursos da AWS Kinesis:
- Documentação da AWS Kinesis
- Guia do Desenvolvedor da AWS Kinesis Data Streams
- Biblioteca do Cliente Kinesis (KCL)
- Calculadora de Preços da AWS Kinesis
- Limites e Quotas da AWS Kinesis Data Streams
- Blog de Arquitetura da AWS - Arquiteturas Orientadas a Eventos
- Exemplos da AWS - Exemplos de Kinesis
Artigos Relacionados:
- Comparação de custos de hospedagem entre Rabbitmq no Eks vs Sqs
- Folha de Dicas de TypeScript: Conceitos Principais & Boas Práticas
- Folha de Dicas de Python
- Folha de Dicas de Terraform - Comandos Úteis e Exemplos
Conclusão
A AWS Kinesis fornece uma base sólida para construir arquiteturas de microserviços orientadas a eventos escaláveis. Ao seguir esses padrões e boas práticas, você pode criar sistemas resistentes, escaláveis e mantíveis. Comece pequeno com um único fluxo de eventos, valide sua arquitetura e expanda gradualmente para padrões mais complexos conforme seu sistema cresce.
A chave para o sucesso é compreender os requisitos de fluxo de dados, escolher o serviço Kinesis adequado para seu caso de uso e implementar monitoramento e tratamento de erros adequados desde o início.