Building Event-Driven Microservices with AWS Kinesis
Event-driven architecture with AWS Kinesis for scale
AWS Kinesis has become a cornerstone for building modern event-driven microservices architectures, enabling real-time data processing at scale with minimal operational overhead.

Understanding Event-Driven Microservices Architecture
Event-driven architecture (EDA) is a design pattern where services communicate through events rather than direct synchronous calls. This approach offers several advantages:
- Loose coupling: Services don’t need to know about each other’s existence
- Scalability: Each service scales independently based on its workload
- Resilience: Failures in one service don’t cascade to others
- Flexibility: New services can be added without modifying existing ones
AWS Kinesis provides the backbone for implementing EDA by acting as a distributed, durable event stream that decouples producers from consumers.
AWS Kinesis Overview
AWS offers several Kinesis services, each designed for specific use cases. When evaluating streaming solutions, it is also worth comparing RabbitMQ on EKS vs SQS for different messaging patterns and cost implications.
Kinesis Data Streams
The core streaming service that captures, stores, and processes data records in real time. Data Streams is ideal for:
- Custom real-time processing applications
- Building data pipelines with sub-second latency
- Processing millions of events per second
- Implementing event sourcing patterns
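Getting started takes only a few lines of boto3. Here is a minimal provisioning sketch; the stream name and shard count are illustrative, not prescriptive:
import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Provision a small stream and wait until it is ACTIVE before writing to it
kinesis.create_stream(StreamName='user-events', ShardCount=2)
kinesis.get_waiter('stream_exists').wait(StreamName='user-events')

summary = kinesis.describe_stream_summary(StreamName='user-events')
print(summary['StreamDescriptionSummary']['StreamStatus'])  # ACTIVE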
Kinesis Data Firehose
A fully managed service that delivers streaming data to destinations like S3, Redshift, Elasticsearch, or HTTP endpoints. Best for:
- Simple ETL pipelines
- Log aggregation and archival
- Near-real-time analytics (60-second minimum latency)
- Scenarios where you don’t need custom processing logic
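Producers talk to Firehose through a delivery stream rather than shards. A minimal sketch of pushing a log line toward S3, assuming a pre-created delivery stream (the name 'log-archive' is illustrative):
import json
import boto3

firehose = boto3.client('firehose', region_name='us-east-1')

# 'log-archive' is an assumed, pre-created delivery stream pointing at S3
firehose.put_record(
    DeliveryStreamName='log-archive',
    Record={'Data': (json.dumps({'level': 'INFO', 'message': 'user signed up'}) + '\n').encode('utf-8')}
)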
Kinesis Data Analytics
Processes and analyzes streaming data using SQL or Apache Flink. Use cases include:
- Real-time dashboards
- Streaming ETL
- Real-time anomaly detection
- Continuous metric generation
Architectural Patterns with Kinesis
1. Event Sourcing Pattern
Event sourcing stores every change to application state as an ordered sequence of events, and Kinesis's durable, ordered shards make it a natural fit for this pattern. If you need a refresher on Python fundamentals, check out our Python Cheatsheet:
import boto3
import json
import uuid
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Publish an event to a Kinesis stream."""
    event = {
        'eventId': str(uuid.uuid4()),  # unique identifier, useful for idempotency checks
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )
    return response['SequenceNumber']

# Example: user registration event
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)
2. CQRS (Command Query Responsibility Segregation)
Separate read and write operations using Kinesis as the event bus:
package main

import (
    "encoding/json"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderItem struct {
    ProductID string `json:"productId"`
    Quantity  int    `json:"quantity"`
}

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

// ProcessCommand serializes a command and appends it to the order-commands stream.
func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }
    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })
    return err
}
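The write side above only appends commands; a separate consumer builds the query-optimized read model. Here is a minimal sketch of the read side in Python, assuming a Lambda trigger on the same stream, a CREATE_ORDER command type, and a hypothetical orders-read-model DynamoDB table (both names are assumptions for illustration):
import base64
import json
import boto3

# Hypothetical read-model table; the name is an assumption for illustration
read_model = boto3.resource('dynamodb').Table('orders-read-model')

def projection_handler(event, context):
    """Lambda consumer that projects order commands into a query-optimized table."""
    for record in event['Records']:
        # Kinesis delivers record data to Lambda base64-encoded
        command = json.loads(base64.b64decode(record['kinesis']['data']))
        if command['commandType'] == 'CREATE_ORDER':
            read_model.put_item(Item={
                'orderId': command['orderId'],
                'userId': command['userId'],
                'items': command['items'],
                'status': 'PENDING'
            })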
3. Fan-Out Pattern with Lambda
Process events from a single stream with multiple Lambda functions. For TypeScript implementations with stronger type safety, refer to our TypeScript Cheatsheet:
// Lambda consumer for email notifications
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// Another Lambda for inventory updates
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};
Best Practices for Production
1. Choosing the Right Shard Count
Calculate your shard requirements based on:
- Ingress: 1 MB/sec or 1,000 records/sec per shard
- Egress: 2 MB/sec per shard shared across all standard consumers, or a dedicated 2 MB/sec per shard for each consumer with enhanced fan-out
def calculate_shards(records_per_second, avg_record_size_kb):
    """Calculate required number of shards"""
    # Each shard accepts 1,000 records/sec or 1 MB/sec (1,024 KB/sec), whichever is hit first
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )
    return int(ingress_shards) + 1  # Add buffer for traffic spikes
2. Implement Proper Error Handling
from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
                          max_retries=3):
    """Put record with exponential backoff retry"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Exponential backoff
                    continue
            raise
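The same idea applies to the PutRecords batch API, which can partially succeed: only the entries that come back with an error code should be resubmitted. A minimal sketch under that assumption (the function name and error handling are illustrative):
import time

def put_records_with_retry(kinesis_client, stream_name, entries, max_retries=3):
    """Send a batch with PutRecords and retry only the entries that were throttled."""
    # entries: list of {'Data': bytes, 'PartitionKey': str}
    pending = entries
    for attempt in range(max_retries):
        response = kinesis_client.put_records(StreamName=stream_name, Records=pending)
        if response['FailedRecordCount'] == 0:
            return
        # Keep only the records that came back with an error code
        pending = [entry for entry, result in zip(pending, response['Records'])
                   if 'ErrorCode' in result]
        time.sleep(2 ** attempt)  # Exponential backoff between batch retries
    raise RuntimeError(f"{len(pending)} records still failing after {max_retries} attempts")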
3. Use Enhanced Fan-Out for Multiple Consumers
Enhanced fan-out provides dedicated throughput for each consumer:
# Register a consumer with enhanced fan-out
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app
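Once registered, an enhanced fan-out consumer receives records over a dedicated HTTP/2 push connection instead of polling GetRecords. A minimal sketch using boto3's subscribe_to_shard, assuming the consumer ARN returned by the registration call above (the ARN and shard ID shown are placeholders):
import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Placeholder ARN; use the ConsumerARN returned by register-stream-consumer
consumer_arn = 'arn:aws:kinesis:us-east-1:123456789:stream/my-stream/consumer/my-consumer-app:1'

response = kinesis.subscribe_to_shard(
    ConsumerARN=consumer_arn,
    ShardId='shardId-000000000000',
    StartingPosition={'Type': 'LATEST'}
)

# The subscription pushes events for up to five minutes, then must be renewed
for event in response['EventStream']:
    for record in event['SubscribeToShardEvent']['Records']:
        print(record['PartitionKey'], record['Data'])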
4. Monitor Key Metrics
Essential CloudWatch metrics to track:
- IncomingRecords: number of records successfully put
- IncomingBytes: byte volume of incoming data
- GetRecords.IteratorAgeMilliseconds: how far behind consumers are
- WriteProvisionedThroughputExceeded: throttled writes
- ReadProvisionedThroughputExceeded: throttled consumer reads
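Iterator age is usually the first metric worth alarming on, because it shows consumers falling behind before records expire. A minimal sketch that creates such an alarm with boto3; the alarm name, threshold, and SNS topic ARN are illustrative assumptions:
import boto3

cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')

cloudwatch.put_metric_alarm(
    AlarmName='user-events-iterator-age',  # illustrative name
    Namespace='AWS/Kinesis',
    MetricName='GetRecords.IteratorAgeMilliseconds',
    Dimensions=[{'Name': 'StreamName', 'Value': 'user-events'}],
    Statistic='Maximum',
    Period=60,                 # evaluate every minute
    EvaluationPeriods=5,
    Threshold=60000,           # alert if consumers lag by more than one minute
    ComparisonOperator='GreaterThanThreshold',
    AlarmActions=['arn:aws:sns:us-east-1:123456789:ops-alerts']  # placeholder topic
)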
5. Implement Proper Partition Key Strategy
import hashlib

def get_partition_key(user_id, shard_count=10):
    """Generate partition key with even distribution"""
    # Use consistent hashing for even distribution
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"
Real-World Implementation Example
Here’s a complete example of an order processing microservices architecture:
import base64
import json
import uuid
from datetime import datetime
from decimal import Decimal
from typing import Dict, List

import boto3

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name

    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """Create order and publish events"""
        order_id = self.generate_order_id()
        # Publish order created event
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)
        return order_id

    def publish_event(self, event_type: str, payload: Dict,
                      partition_key: str):
        """Publish event to Kinesis stream"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }
        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

    def generate_order_id(self) -> str:
        # Minimal stub: random, globally unique order identifier
        return str(uuid.uuid4())

    def calculate_total(self, items: List[Dict]) -> Decimal:
        # Minimal stub: sum price * quantity, defaulting missing prices to zero
        return sum(Decimal(str(item.get('price', 0))) * item.get('quantity', 1)
                   for item in items)

class InventoryService:
    """Consumes order events and updates inventory"""

    def process_records(self, records):
        for record in records:
            # Kinesis delivers record data to Lambda base64-encoded
            data = json.loads(base64.b64decode(record['kinesis']['data']))
            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])

    def reserve_inventory(self, order_data):
        # Update inventory database
        for item in order_data['items']:
            # Implementation here
            pass

    def release_inventory(self, order_data):
        # Implementation here
        pass
Migration Strategy from Monolith to Microservices
Phase 1: Strangler Fig Pattern
Start by routing specific events through Kinesis while keeping the monolith:
- Identify bounded contexts in your monolith
- Create Kinesis streams for cross-context events
- Gradually extract services that consume from these streams
- Maintain backward compatibility with the monolith
Phase 2: Parallel Processing
Run both old and new systems in parallel:
def dual_write_pattern(legacy_db, kinesis_stream, data):
    """Write to both legacy system and event stream"""
    try:
        # Write to new system first
        publish_to_kinesis(kinesis_stream, data)
        # Then update legacy system
        legacy_db.update(data)
    except Exception as e:
        # Implement compensation logic
        rollback_kinesis_event(kinesis_stream, data)
        raise
Phase 3: Full Migration
Once confidence is established, route all traffic through the event-driven architecture.
Cost Optimization Strategies
1. Use On-Demand Mode for Variable Workloads
On-demand capacity mode (introduced in late 2021) automatically scales shard capacity based on observed traffic:
# Create stream with on-demand mode
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND
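Existing provisioned streams can be switched as well. A minimal sketch using boto3's update_stream_mode, with a placeholder stream ARN:
import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Switch an existing stream to on-demand capacity (placeholder ARN)
kinesis.update_stream_mode(
    StreamARN='arn:aws:kinesis:us-east-1:123456789:stream/my-stream',
    StreamModeDetails={'StreamMode': 'ON_DEMAND'}
)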
2. Implement Data Aggregation
Reduce PUT payload units by batching records:
from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """Aggregate records into a single PUT payload to reduce costs"""
    aggregator = RecordAggregator()
    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )
    # Flush the aggregated record and send it as a single PutRecord call
    pk, _ehk, agg_data = aggregator.clear_and_get().get_contents()
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=agg_data,
        PartitionKey=pk
    )
3. Optimize Data Retention
Default retention is 24 hours. Only extend it if necessary:
# Set retention to 7 days
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168
Security Best Practices
1. Encryption at Rest and in Transit
# Create the stream
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# Enable server-side encryption with the AWS-managed KMS key
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)
2. IAM Policies for Least Privilege
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
        }
    ]
}
3. VPC Endpoints
Keep traffic within the AWS network. For managing AWS infrastructure as code, consider using Terraform - see our Terraform cheatsheet:
aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678
Observability and Debugging
Distributed Tracing with X-Ray
import json

import boto3
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

# Patch boto3 (and other supported libraries) so AWS calls appear as subsegments
patch_all()

kinesis = boto3.client('kinesis')

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )
CloudWatch Logs Insights Queries
# Find slow processing times
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

# Track error rates
fields @timestamp, eventType
| filter error = true
| stats count() by eventType
Advanced Patterns
Saga Pattern for Distributed Transactions
Implement long-running transactions across microservices:
import json
import uuid

class SagaException(Exception):
    """Raised by a saga step; carries the name of the step that failed."""
    def __init__(self, failed_step):
        super().__init__(failed_step)
        self.failed_step = failed_step

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())

    def execute(self, order_data):
        """Execute saga with compensation logic"""
        try:
            # Step 1: Reserve inventory
            self.publish_command('RESERVE_INVENTORY', order_data)
            # Step 2: Process payment
            self.publish_command('PROCESS_PAYMENT', order_data)
            # Step 3: Ship order
            self.publish_command('SHIP_ORDER', order_data)
        except SagaException as e:
            # Compensate in reverse order
            self.compensate(e.failed_step)

    def compensate(self, failed_step):
        """Execute compensation transactions"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }
        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

    def publish_command(self, command_type, payload):
        """Publish a saga command to the stream, tagged with the saga id."""
        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps({'commandType': command_type,
                             'sagaId': self.saga_id,
                             'payload': payload}, default=str),
            PartitionKey=self.saga_id
        )
Testing Strategies
Local Development with LocalStack
# Start LocalStack with Kinesis
docker run -d \
    -p 4566:4566 \
    -e SERVICES=kinesis \
    localstack/localstack

# Create test stream
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1
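Application code can target the same local endpoint by overriding endpoint_url when creating the client. A minimal sketch; the dummy credentials are fine because LocalStack does not validate them:
import boto3

# Point the client at LocalStack instead of AWS
kinesis = boto3.client(
    'kinesis',
    endpoint_url='http://localhost:4566',
    region_name='us-east-1',
    aws_access_key_id='test',        # LocalStack accepts any credentials
    aws_secret_access_key='test'
)

kinesis.put_record(
    StreamName='test-stream',
    Data=b'{"eventType": "TEST_EVENT"}',
    PartitionKey='local-test'
)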
Integration Testing
import boto3
import pytest
from moto import mock_kinesis

# OrderProcessingService is the class defined in the order processing example above

@mock_kinesis
def test_event_publishing():
    """Test event publishing with mocked Kinesis"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)
    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])
    assert order_id is not None
Performance Tuning
Optimize Batch Size
def optimize_batch_processing(records, batch_size=500):
    """Process records in optimized batches"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)
Use Connection Pooling
import boto3
from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)
kinesis = boto3.client('kinesis', config=config)
Useful Links
AWS Kinesis Resources:
- AWS Kinesis Documentation
- AWS Kinesis Data Streams Developer Guide
- Kinesis Client Library (KCL)
- AWS Kinesis Pricing Calculator
- Kinesis Data Streams Quotas and Limits
- AWS Architecture Blog - Event-Driven Architectures
- AWS Samples - Kinesis Examples
Related Articles:
- RabbitMQ on EKS vs SQS hosting cost comparison
- TypeScript Cheatsheet: Core Concepts & Best Practices
- Python Cheatsheet
- Terraform cheatsheet - useful commands and examples
Conclusion
AWS Kinesis provides a robust foundation for building scalable, event-driven microservices architectures. By following these patterns and best practices, you can create systems that are resilient, scalable, and maintainable. Start small with a single event stream, validate your architecture, and gradually expand to more complex patterns as your system grows.
The key to success is understanding your data flow requirements, choosing the right Kinesis service for your use case, and implementing proper monitoring and error handling from the start.