Building Event-Driven Microservices with AWS Kinesis

Event-driven architecture with AWS Kinesis for scale

AWS Kinesis has become a cornerstone for building modern event-driven microservices architectures, enabling real-time data processing at scale with minimal operational overhead.

Understanding Event-Driven Microservices Architecture

Event-driven architecture (EDA) is a design pattern where services communicate through events rather than direct synchronous calls. This approach offers several advantages:

  • Loose coupling: Services don’t need to know about each other’s existence
  • Scalability: Each service scales independently based on its workload
  • Resilience: Failures in one service don’t cascade to others
  • Flexibility: New services can be added without modifying existing ones

AWS Kinesis provides the backbone for implementing EDA by acting as a distributed, durable event stream that decouples producers from consumers.
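
To make the decoupling concrete, here is a minimal in-memory sketch of the idea, not of Kinesis itself. The `InMemoryStream` class and the consumer names are hypothetical; the point is only that producers append to a log while each consumer reads at its own pace without knowing about the others.

```python
from typing import Dict, List


class InMemoryStream:
    """Toy append-only log; each consumer tracks its own read position."""

    def __init__(self) -> None:
        self._log: List[dict] = []
        self._positions: Dict[str, int] = {}

    def publish(self, event: dict) -> int:
        """Append an event and return its position in the log."""
        self._log.append(event)
        return len(self._log) - 1

    def poll(self, consumer: str) -> List[dict]:
        """Return events this consumer has not seen yet and advance its position."""
        start = self._positions.get(consumer, 0)
        batch = self._log[start:]
        self._positions[consumer] = len(self._log)
        return batch


stream = InMemoryStream()
stream.publish({"eventType": "ORDER_PLACED", "orderId": "o-1"})
stream.publish({"eventType": "ORDER_PLACED", "orderId": "o-2"})

# Two independent consumers each receive every event.
email_batch = stream.poll("email-service")
inventory_batch = stream.poll("inventory-service")
```

Adding a third consumer requires no change to the producer or to the existing consumers, which is the property the bullet list above describes.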

AWS Kinesis Overview

AWS offers several Kinesis services, each designed for specific use cases. When evaluating streaming solutions, you might also want to consider comparing RabbitMQ on EKS vs SQS for different messaging patterns and cost implications.

Kinesis Data Streams

The core streaming service that captures, stores, and processes data records in real time. Data Streams is ideal for:

  • Custom real-time processing applications
  • Building data pipelines with sub-second latency
  • Processing millions of events per second
  • Implementing event sourcing patterns
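
As a rough illustration of a custom consumer, the sketch below reads one batch from a single shard using the low-level `get_shard_iterator` and `get_records` calls. The helper name `read_shard_once` is hypothetical, the payloads are assumed to be JSON, and `client` is expected to be a `boto3.client('kinesis')` instance. Production consumers more commonly rely on Lambda or the Kinesis Client Library, which handle checkpointing and resharding.

```python
import json
from typing import Any, Dict, List


def read_shard_once(client: Any, stream_name: str, shard_id: str,
                    limit: int = 100) -> List[Dict]:
    """Fetch one batch starting at the oldest retained record of a shard."""
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="TRIM_HORIZON",  # start at the oldest retained record
    )["ShardIterator"]

    response = client.get_records(ShardIterator=iterator, Limit=limit)

    # boto3 returns Data as raw bytes, so no base64 decoding is needed here.
    return [json.loads(record["Data"]) for record in response["Records"]]
```

A long-running reader would keep calling `get_records` with the `NextShardIterator` value from each response.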

Kinesis Data Firehose

A fully managed service that delivers streaming data to destinations like S3, Redshift, Elasticsearch, or HTTP endpoints. Best for:

  • Simple ETL pipelines
  • Log aggregation and archival
  • Near-real-time analytics (delivery is buffered, so latency depends on the configured buffer size and interval)
  • Scenarios where you don’t need custom processing logic
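
For comparison with Data Streams, a producer for Firehose can be sketched as follows. The helper names `to_firehose_record` and `deliver` are hypothetical, and `firehose_client` is assumed to be `boto3.client('firehose')`. Terminating each record with a newline is a common convention so that objects delivered to S3 can be read line by line.

```python
import json
from typing import Any, Dict


def to_firehose_record(event: Dict[str, Any]) -> Dict[str, bytes]:
    """Serialize an event as one newline-terminated JSON line."""
    line = json.dumps(event, separators=(",", ":")) + "\n"
    return {"Data": line.encode("utf-8")}


def deliver(firehose_client: Any, delivery_stream: str,
            event: Dict[str, Any]) -> str:
    """Send one event to a delivery stream and return the assigned record ID."""
    response = firehose_client.put_record(
        DeliveryStreamName=delivery_stream,
        Record=to_firehose_record(event),
    )
    return response["RecordId"]
```

Note that there is no partition key: Firehose handles buffering and delivery, so the producer only supplies the data.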

Kinesis Data Analytics

Processes and analyzes streaming data using SQL or Apache Flink. Use cases include:

  • Real-time dashboards
  • Streaming ETL
  • Real-time anomaly detection
  • Continuous metric generation

Architectural Patterns with Kinesis

1. Event Sourcing Pattern

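Event sourcing stores all changes to application state as a sequence of events. Kinesis suits this pattern because records are appended in order within a shard and retained for replay. The Python example below publishes a user registration event (if you need a refresher on Python fundamentals, check out our Python Cheatsheet):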

import boto3
import json
import uuid
from datetime import datetime, timezone

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Publish an event to Kinesis stream"""
    event = {
        'eventId': str(uuid.uuid4()),  # unique ID lets consumers de-duplicate
        'eventType': event_type,
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'payload': payload
    }
    
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        # Events for the same user land on the same shard, preserving their order
        PartitionKey=payload.get('userId', 'default')
    )
    
    return response['SequenceNumber']

# Example: User registration event
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)
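
The other half of event sourcing is rebuilding state by replaying the stored events in order. The following is a minimal sketch under stated assumptions: `USER_EMAIL_CHANGED` and `USER_DEACTIVATED` are hypothetical event types added for illustration, and the events are assumed to have already been read from the stream and decoded.

```python
from typing import Dict, Iterable


def apply_event(state: Dict[str, dict], event: dict) -> None:
    """Apply a single event to the in-memory user state."""
    payload = event["payload"]
    user_id = payload["userId"]

    if event["eventType"] == "USER_REGISTERED":
        state[user_id] = {"email": payload["email"], "active": True}
    elif event["eventType"] == "USER_EMAIL_CHANGED" and user_id in state:
        state[user_id]["email"] = payload["email"]
    elif event["eventType"] == "USER_DEACTIVATED" and user_id in state:
        state[user_id]["active"] = False
    # Unknown event types are ignored so new events do not break old readers.


def rebuild_state(events: Iterable[dict]) -> Dict[str, dict]:
    """Replay events in order to reconstruct current state from scratch."""
    state: Dict[str, dict] = {}
    for event in events:
        apply_event(state, event)
    return state


history = [
    {"eventType": "USER_REGISTERED",
     "payload": {"userId": "12345", "email": "user@example.com"}},
    {"eventType": "USER_EMAIL_CHANGED",
     "payload": {"userId": "12345", "email": "new@example.com"}},
]
current = rebuild_state(history)
```

Because state is derived purely from the event sequence, replaying the same events always yields the same result.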

2. CQRS (Command Query Responsibility Segregation)

Separate read and write operations using Kinesis as the event bus:

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

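// OrderItem is an illustrative line item; adjust the fields to your domain.
type OrderItem struct {
    ProductID string `json:"productId"`
    Quantity  int    `json:"quantity"`
}

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}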

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }
    
    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })
    
    return err
}

3. Fan-Out Pattern with Lambda

Process events from a single stream with multiple Lambda functions. For TypeScript implementations with stronger type safety, refer to our TypeScript Cheatsheet:

// Lambda consumer for email notifications
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// Another Lambda for inventory updates
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};
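
The same consumer shape can be sketched in Python, here with partial batch reporting so that one bad record does not force the whole batch to be retried. This assumes the event source mapping has `ReportBatchItemFailures` enabled; the `process` parameter is a hypothetical hook standing in for real business logic.

```python
import base64
import json


def decode_record(record: dict) -> dict:
    """Kinesis data arrives base64-encoded inside the Lambda event."""
    return json.loads(base64.b64decode(record["kinesis"]["data"]))


def handler(event, context=None, process=print):
    """Process records in order and report the first failure, if any."""
    failures = []
    for record in event["Records"]:
        try:
            process(decode_record(record))
        except Exception:
            # Lambda retries from this sequence number onward, so stop here
            # to keep per-shard ordering intact.
            failures.append(
                {"itemIdentifier": record["kinesis"]["sequenceNumber"]}
            )
            break
    return {"batchItemFailures": failures}
```

An empty `batchItemFailures` list tells Lambda the whole batch succeeded.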

Best Practices for Production

1. Choosing the Right Shard Count

Calculate your shard requirements based on:

  • Ingress: 1 MB/sec or 1,000 records/sec per shard
  • Egress: 2 MB/sec per shard, shared by all standard consumers, or 2 MB/sec per shard for each consumer with enhanced fan-out

def calculate_shards(records_per_second, avg_record_size_kb):
    """Calculate required number of shards"""
    # Ingress capacity
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )
    
    return int(ingress_shards) + 1  # Add buffer
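
Egress can also be the limiting factor when several standard consumers share a shard's 2 MB/sec read capacity. The sketch below extends the calculation with that term. The function name `required_shards` is hypothetical, it assumes every consumer reads the full stream, and it returns the bare minimum without a safety buffer.

```python
import math


def required_shards(records_per_second: float, avg_record_size_kb: float,
                    standard_consumers: int = 1) -> int:
    """Minimum shards needed to satisfy record, ingress, and egress limits."""
    ingress_mb_per_sec = records_per_second * avg_record_size_kb / 1024

    by_record_count = records_per_second / 1000          # 1,000 records/sec per shard
    by_ingress = ingress_mb_per_sec / 1                   # 1 MB/sec per shard
    by_egress = ingress_mb_per_sec * standard_consumers / 2  # 2 MB/sec shared per shard

    return max(1, math.ceil(max(by_record_count, by_ingress, by_egress)))


# 5,000 records/sec at 2 KB each is about 9.77 MB/sec of ingress.
one_consumer = required_shards(5000, 2)        # ingress-bound: 10 shards
three_consumers = required_shards(5000, 2, 3)  # egress-bound: 15 shards
```

With three standard consumers the read side needs about 14.65 shards' worth of capacity, which is where enhanced fan-out starts to pay off.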

2. Implement Proper Error Handling

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key, 
                          max_retries=3):
    """Put record with exponential backoff retry"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Exponential backoff
                    continue
            raise
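
Batch writes need a different kind of error handling: `put_records` can succeed as a call while individual records fail, which is reported through `FailedRecordCount` and a per-record `ErrorCode`. The following sketch retries only the failed entries. The function name and the `base_delay` parameter are hypothetical, and `client` is assumed to be `boto3.client('kinesis')`.

```python
import time
from typing import Any, Dict, List


def put_records_with_retry(client: Any, stream_name: str,
                           entries: List[Dict], max_attempts: int = 3,
                           base_delay: float = 0.1) -> List[Dict]:
    """Send entries with PutRecords; return the entries that still failed."""
    pending = list(entries)
    if not pending:
        return []

    for attempt in range(max_attempts):
        response = client.put_records(StreamName=stream_name, Records=pending)
        if response["FailedRecordCount"] == 0:
            return []

        # Results are positional: Records[i] is the outcome for pending[i].
        pending = [
            entry
            for entry, result in zip(pending, response["Records"])
            if "ErrorCode" in result
        ]
        if attempt < max_attempts - 1:
            time.sleep(base_delay * 2 ** attempt)  # exponential backoff

    return pending
```

Each entry is a dict with `Data` and `PartitionKey`. Retried records may arrive after records sent later, so consumers that depend on strict ordering need to account for that.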

3. Use Enhanced Fan-Out for Multiple Consumers

Enhanced fan-out provides dedicated throughput for each consumer:

# Register a consumer with enhanced fan-out
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789012:stream/my-stream \
    --consumer-name my-consumer-app

4. Monitor Key Metrics

Essential CloudWatch metrics to track:

  • IncomingRecords: Number of records successfully put
  • IncomingBytes: Byte volume of incoming data
  • GetRecords.IteratorAgeMilliseconds: How far behind consumers are
  • WriteProvisionedThroughputExceeded: Throttling events
  • ReadProvisionedThroughputExceeded: Consumer throttling
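
Of these metrics, iterator age is usually the first one worth alarming on. As a hedged sketch, the helper below builds the arguments for CloudWatch's `put_metric_alarm`. The function name, alarm name, and threshold values are assumptions to adapt, not recommendations.

```python
from typing import Any, Dict


def iterator_age_alarm(stream_name: str,
                       threshold_ms: int = 60_000) -> Dict[str, Any]:
    """Build put_metric_alarm arguments for consumers falling behind."""
    return {
        "AlarmName": f"{stream_name}-iterator-age-high",
        "Namespace": "AWS/Kinesis",
        "MetricName": "GetRecords.IteratorAgeMilliseconds",
        "Dimensions": [{"Name": "StreamName", "Value": stream_name}],
        "Statistic": "Maximum",
        "Period": 60,                 # evaluate one-minute datapoints
        "EvaluationPeriods": 5,       # require five consecutive breaches
        "Threshold": float(threshold_ms),
        "ComparisonOperator": "GreaterThanThreshold",
        "TreatMissingData": "notBreaching",
    }


params = iterator_age_alarm("orders")
```

The result would be applied with `boto3.client('cloudwatch').put_metric_alarm(**params)`.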

5. Implement Proper Partition Key Strategy

import hashlib

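def get_partition_key(user_id, shard_count=10):
    """Build a stable partition key from a user ID"""
    # Kinesis itself MD5-hashes the partition key to pick a shard, so the
    # suffix here only labels the key with a deterministic bucket number.
    # The key is stable per user, so each user's events stay in order.
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    bucket = hash_value % shard_count
    return f"{user_id}#{bucket}"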
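
To reason about key distribution, it helps to see how a partition key maps to a shard: Kinesis takes the MD5 hash of the key as a 128-bit integer and routes the record to the shard whose hash key range contains it. The sketch below assumes the hash space is split evenly across shards, which holds for a freshly created stream but not necessarily after resharding. All function names are hypothetical.

```python
import hashlib
from collections import Counter
from typing import Iterable

HASH_SPACE = 2 ** 128  # MD5 produces a 128-bit value


def hash_key(partition_key: str) -> int:
    """128-bit integer hash of a partition key."""
    return int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)


def shard_index(partition_key: str, shard_count: int) -> int:
    """Index of the shard whose (evenly split) hash range contains the key."""
    range_size = HASH_SPACE // shard_count
    return min(hash_key(partition_key) // range_size, shard_count - 1)


def distribution(keys: Iterable[str], shard_count: int) -> Counter:
    """Count how many keys land on each shard."""
    return Counter(shard_index(key, shard_count) for key in keys)


spread = distribution((f"user-{i}" for i in range(10_000)), shard_count=4)
```

Running `distribution` over a sample of real keys is a quick way to spot hot shards: high-cardinality keys spread out well, while a handful of very active keys will not.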

Real-World Implementation Example

Here’s a complete example of an order processing microservices architecture:

import base64
import boto3
import json
import uuid
from datetime import datetime, timezone
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
    
    def generate_order_id(self) -> str:
        """Illustrative ID scheme; substitute your own"""
        return str(uuid.uuid4())
    
    def calculate_total(self, items: List[Dict]) -> Decimal:
        """Sum price * quantity (assumes an optional 'price' field per item)"""
        return sum(
            (Decimal(str(item.get('price', 0))) * item.get('quantity', 1)
             for item in items),
            Decimal('0')
        )
    
    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """Create order and publish events"""
        order_id = self.generate_order_id()
        
        # Publish order created event
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)
        
        return order_id
    
    def publish_event(self, event_type: str, payload: Dict, 
                     partition_key: str):
        """Publish event to Kinesis stream"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'payload': payload
        }
        
        self.kinesis.put_record(
            StreamName=self.stream_name,
            # default=str serializes the Decimal total as a string
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """Consumes order events and updates inventory"""
    
    def process_records(self, records):
        for record in records:
            # Lambda delivers Kinesis data base64-encoded
            data = json.loads(base64.b64decode(record['kinesis']['data']))
            
            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])
    
    def reserve_inventory(self, order_data):
        # Update inventory database
        for item in order_data['items']:
            # Implementation here
            pass
    
    def release_inventory(self, order_data):
        # Return reserved stock to inventory
        for item in order_data['items']:
            # Implementation here
            pass

Migration Strategy from Monolith to Microservices

Phase 1: Strangler Fig Pattern

Start by routing specific events through Kinesis while keeping the monolith:

  1. Identify bounded contexts in your monolith
  2. Create Kinesis streams for cross-context events
  3. Gradually extract services that consume from these streams
  4. Maintain backward compatibility with the monolith

Phase 2: Parallel Processing

Run both old and new systems in parallel:

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """Write to both legacy system and event stream"""
    # Write to new system first; if this fails, nothing has been written yet
    publish_to_kinesis(kinesis_stream, data)
    
    try:
        # Then update legacy system
        legacy_db.update(data)
    except Exception:
        # Kinesis records cannot be deleted, so the "rollback" must publish
        # a compensating event that tells consumers to undo the change
        rollback_kinesis_event(kinesis_stream, data)
        raise

Phase 3: Full Migration

Once confidence is established, route all traffic through the event-driven architecture.

Cost Optimization Strategies

1. Use On-Demand Mode for Variable Workloads

On-demand mode (introduced in late 2021) automatically scales based on traffic:

# Create stream with on-demand mode
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. Implement Data Aggregation

Reduce PUT payload units by batching records:

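from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """Aggregate records to reduce costs"""
    aggregator = RecordAggregator()
    completed = []
    
    for record in records:
        # add_user_record returns a full aggregated record once the
        # buffer cannot take more data, otherwise None
        full = aggregator.add_user_record(
            record['partition_key'],
            record['data']
        )
        if full:
            completed.append(full)
    
    # Flush whatever is still buffered
    remainder = aggregator.clear_and_get()
    if remainder:
        completed.append(remainder)
    
    # Send each aggregated record; consumers must de-aggregate on read
    for agg_record in completed:
        pk, ehk, data = agg_record.get_contents()
        kwargs = {'StreamName': stream_name, 'Data': data, 'PartitionKey': pk}
        if ehk is not None:
            kwargs['ExplicitHashKey'] = ehk
        kinesis_client.put_record(**kwargs)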
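
The saving comes from how provisioned-mode PUT payload units are counted: each record is billed in 25 KB chunks, so a 500-byte record costs a full unit. The following back-of-the-envelope sketch compares the two approaches. It assumes 25 KB means 25 * 1024 bytes and ignores the small framing overhead that aggregation adds; the function names are hypothetical.

```python
import math
from typing import Iterable

PAYLOAD_UNIT_BYTES = 25 * 1024  # assumption: 25 KB chunk size in bytes


def individual_units(record_sizes: Iterable[int]) -> int:
    """PUT payload units when every record is sent on its own."""
    return sum(math.ceil(size / PAYLOAD_UNIT_BYTES) for size in record_sizes)


def aggregated_units(record_sizes: Iterable[int],
                     max_aggregate_bytes: int = 1_000_000) -> int:
    """PUT payload units when records are greedily packed into aggregates."""
    units = 0
    current = 0
    for size in record_sizes:
        if current and current + size > max_aggregate_bytes:
            units += math.ceil(current / PAYLOAD_UNIT_BYTES)
            current = 0
        current += size
    if current:
        units += math.ceil(current / PAYLOAD_UNIT_BYTES)
    return units


sizes = [500] * 1000                 # one thousand 500-byte records
separate = individual_units(sizes)   # 1000 units
packed = aggregated_units(sizes)     # 20 units
```

In this example aggregation cuts payload units by a factor of 50, at the cost of requiring consumers to de-aggregate.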

3. Optimize Data Retention

Default retention is 24 hours. Only extend it if necessary:

# Set retention to 7 days
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

Security Best Practices

1. Encryption at Rest and in Transit

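# Create the stream (encryption is enabled in a separate call)
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# Wait until the stream is ACTIVE before changing its encryption settings
kinesis.get_waiter('stream_exists').wait(StreamName='secure-stream')

# Enable server-side encryption at rest with the AWS managed KMS key.
# Data in transit is protected by the HTTPS API endpoints.
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)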

2. IAM Policies for Least Privilege

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/orders"
    }
  ]
}

3. VPC Endpoints

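Keep traffic within the AWS network. Kinesis Data Streams uses an interface endpoint (AWS PrivateLink), which attaches to subnets and security groups rather than route tables. For managing AWS infrastructure as code, consider using Terraform - see our Terraform cheatsheet:

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --vpc-endpoint-type Interface \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --subnet-ids subnet-12345678 \
    --security-group-ids sg-12345678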

Observability and Debugging

Distributed Tracing with X-Ray

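import json

import boto3
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

# Patch supported libraries (including boto3) before creating clients
patch_all()

kinesis = boto3.client('kinesis')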

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])
    
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

CloudWatch Logs Insights Queries

# Find slow processing times
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

# Track error rates
fields @timestamp, eventType
| filter error = true
| stats count() by eventType
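
These queries only work if the consumers emit `eventType`, `processingTime`, and `error` as fields, which Logs Insights can discover automatically from JSON log lines. A minimal sketch of such a logger follows. The wrapper name `process_with_logging` and the `log` parameter are hypothetical; in Lambda, anything printed to stdout ends up in CloudWatch Logs.

```python
import json
import time
from typing import Callable


def process_with_logging(event: dict, handler: Callable[[dict], None],
                         log: Callable[[str], None] = print) -> None:
    """Run handler(event) and emit one structured JSON log line about it."""
    start = time.perf_counter()
    error = False
    try:
        handler(event)
    except Exception:
        error = True
        raise  # let the caller decide how to retry
    finally:
        log(json.dumps({
            "eventType": event.get("eventType", "UNKNOWN"),
            "processingTime": round((time.perf_counter() - start) * 1000, 3),  # ms
            "error": error,
        }))
```

Wrapping every handler this way keeps the log schema consistent across services, so one query covers all of them.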

Advanced Patterns

Saga Pattern for Distributed Transactions

Implement long-running transactions across microservices:

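import json
import uuid

from botocore.exceptions import ClientError

class SagaException(Exception):
    """Raised when a saga step cannot be published"""
    def __init__(self, failed_step):
        super().__init__(f"Saga step failed: {failed_step}")
        self.failed_step = failed_step

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())
    
    def publish_command(self, command_type, data):
        """Publish a saga command, keyed by saga ID to keep steps ordered"""
        try:
            self.kinesis.put_record(
                StreamName=self.stream_name,
                Data=json.dumps({'commandType': command_type,
                                 'sagaId': self.saga_id, 'data': data}),
                PartitionKey=self.saga_id
            )
        except ClientError as e:
            raise SagaException(command_type) from e
    
    def execute(self, order_data):
        """Execute saga with compensation logic"""
        # Note: this only detects publish failures. Failures inside downstream
        # services arrive later as events and need their own handling.
        try:
            # Step 1: Reserve inventory
            self.publish_command('RESERVE_INVENTORY', order_data)
            
            # Step 2: Process payment
            self.publish_command('PROCESS_PAYMENT', order_data)
            
            # Step 3: Ship order
            self.publish_command('SHIP_ORDER', order_data)
            
        except SagaException as e:
            # Compensate in reverse order
            self.compensate(e.failed_step)
    
    def compensate(self, failed_step):
        """Execute compensation transactions"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }
        
        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})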

Testing Strategies

Local Development with LocalStack

# Start LocalStack with Kinesis
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# Create test stream
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

Integration Testing

import boto3
import pytest
from moto import mock_aws  # moto 5+; older releases expose mock_kinesis instead

@mock_aws
def test_event_publishing():
    """Test event publishing with mocked Kinesis"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)
    
    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])
    
    assert order_id is not None

Performance Tuning

Optimize Batch Size

def optimize_batch_processing(records, batch_size=500):
    """Process records in optimized batches"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)
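
When the batches are destined for `put_records`, the record count is not the only limit: requests are also capped by total payload size. The sketch below chunks by both. The defaults of 500 records and 5 MiB reflect commonly documented per-request limits but should be checked against the current service quotas; the function name is hypothetical.

```python
from typing import Dict, Iterable, Iterator, List


def chunk_for_put_records(entries: Iterable[Dict],
                          max_records: int = 500,
                          max_bytes: int = 5 * 1024 * 1024) -> Iterator[List[Dict]]:
    """Yield batches that respect both the record-count and byte limits."""
    batch: List[Dict] = []
    batch_bytes = 0

    for entry in entries:
        # The partition key counts toward the request size along with the data.
        size = len(entry["Data"]) + len(entry["PartitionKey"].encode("utf-8"))

        if batch and (len(batch) >= max_records or batch_bytes + size > max_bytes):
            yield batch
            batch, batch_bytes = [], 0

        batch.append(entry)
        batch_bytes += size

    if batch:
        yield batch


entries = [{"Data": b"x" * 10, "PartitionKey": f"k{i}"} for i in range(1200)]
batch_sizes = [len(b) for b in chunk_for_put_records(entries)]
```

Each yielded batch can be passed directly as the `Records` argument of `put_records`.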

Use Connection Pooling

import boto3
from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

Conclusion

AWS Kinesis provides a robust foundation for building scalable, event-driven microservices architectures. By following these patterns and best practices, you can create systems that are resilient, scalable, and maintainable. Start small with a single event stream, validate your architecture, and gradually expand to more complex patterns as your system grows.

The key to success is understanding your data flow requirements, choosing the right Kinesis service for your use case, and implementing proper monitoring and error handling from the start.