使用 AWS Kinesis 构建事件驱动的微服务

基于 AWS Kinesis 的事件驱动架构实现可扩展性

目录

AWS Kinesis 已成为构建现代事件驱动微服务架构的基石,它能够以最小的操作开销实现大规模的实时数据处理。

amazon-kinesis

理解事件驱动微服务架构

事件驱动架构(EDA)是一种设计模式,其中服务通过事件进行通信,而不是直接的同步调用。这种方法具有以下优势:

  • 松耦合:服务不需要知道彼此的存在
  • 可扩展性:每个服务根据其工作负载独立扩展
  • 弹性:一个服务的故障不会影响到其他服务
  • 灵活性:可以在不修改现有服务的情况下添加新服务

AWS Kinesis 通过充当一个分布式、持久的事件流,将生产者与消费者解耦,为实现 EDA 提供了基础。

AWS Kinesis 概述

AWS 提供了多种 Kinesis 服务,每种服务都针对特定的使用场景进行了设计。在评估流处理解决方案时,您可能还想考虑 比较 RabbitMQ on EKS 与 SQS,以了解不同消息模式和成本影响。

Kinesis 数据流

核心的流处理服务,用于捕获、存储和实时处理数据记录。数据流非常适合以下场景:

  • 自定义的实时处理应用
  • 构建具有亚秒级延迟的数据管道
  • 每秒处理数百万个事件
  • 实现事件溯源模式

Kinesis 数据火 hose

一个完全托管的服务,用于将流数据传输到 S3、Redshift、Elasticsearch 或 HTTP 端点等目的地。最适合以下场景:

  • 简单的 ETL 管道
  • 日志聚合和归档
  • 几乎实时分析(最低延迟为 60 秒)
  • 不需要自定义处理逻辑的场景

Kinesis 数据分析

使用 SQL 或 Apache Flink 处理和分析流数据。使用场景包括:

  • 实时仪表板
  • 流式 ETL
  • 实时异常检测
  • 持续指标生成

使用 Kinesis 的架构模式

1. 事件溯源模式

事件溯源将所有对应用状态的更改记录为事件序列。Kinesis 非常适合这种模式。如果您需要复习 Python 基础知识,请查看我们的 Python 快速参考

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """将事件发布到 Kinesis 流"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }
    
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )
    
    return response['SequenceNumber']

# 示例:用户注册事件
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. CQRS(命令查询职责分离)

使用 Kinesis 作为事件总线,分离读写操作:

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }
    
    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })
    
    return err
}

3. 使用 Lambda 的扇出模式

使用多个 Lambda 函数处理来自单一流的数据。对于具有更强类型安全性的 TypeScript 实现,请参考我们的 TypeScript 快速参考

// 用于电子邮件通知的 Lambda 消费者
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// 用于库存更新的另一个 Lambda
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};

生产最佳实践

1. 选择合适的分片数量

根据以下因素计算您的分片需求:

  • 入站流量:每个分片每秒 1 MB 或 1,000 条记录
  • 出站流量:每个分片每秒 2 MB(标准消费者)或每个消费者每秒 2 MB(增强扇出)
def calculate_shards(records_per_second, avg_record_size_kb):
    """计算所需分片数量"""
    # 入站容量
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )
    
    return int(ingress_shards) + 1  # 添加缓冲

2. 实现适当的错误处理

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key, 
                          max_retries=3):
    """带指数退避重试的记录写入"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # 指数退避
                    continue
            raise

3. 使用增强扇出为多个消费者提供服务

增强扇出为每个消费者提供专用吞吐量:

# 注册一个带有增强扇出的消费者
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. 监控关键指标

需要跟踪的重要 CloudWatch 指标包括:

  • IncomingRecords:成功写入的记录数
  • IncomingBytes:入站数据的字节数量
  • GetRecords.IteratorAgeMilliseconds:消费者落后的时间
  • WriteProvisionedThroughputExceeded:节流事件
  • ReadProvisionedThroughputExceeded:消费者节流

5. 实现适当的分区键策略

import hashlib

def get_partition_key(user_id, shard_count=10):
    """生成具有均匀分布的分区键"""
    # 使用一致性哈希实现均匀分布
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

实际应用示例

以下是订单处理微服务架构的完整示例:

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
    
    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """创建订单并发布事件"""
        order_id = self.generate_order_id()
        
        # 发布订单创建事件
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)
        
        return order_id
    
    def publish_event(self, event_type: str, payload: Dict, 
                     partition_key: str):
        """将事件发布到 Kinesis 流"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }
        
        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """消费订单事件并更新库存"""
    
    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])
            
            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])
    
    def reserve_inventory(self, order_data):
        # 更新库存数据库
        for item in order_data['items']:
            # 实现细节
            pass

从单体应用迁移到微服务的策略

第一阶段:绞杀者模式

首先通过 Kinesis 路由特定事件,同时保留单体应用:

  1. 识别单体应用中的有界上下文
  2. 为跨上下文事件创建 Kinesis 流
  3. 逐步提取消费这些流的服务
  4. 与单体应用保持向后兼容性

第二阶段:并行处理

同时运行旧系统和新系统:

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """同时写入旧系统和事件流"""
    try:
        # 首先写入新系统
        publish_to_kinesis(kinesis_stream, data)
        
        # 然后更新旧系统
        legacy_db.update(data)
    except Exception as e:
        # 实现补偿逻辑
        rollback_kinesis_event(kinesis_stream, data)
        raise

第三阶段:全面迁移

一旦建立信心,将所有流量路由到事件驱动架构。

成本优化策略

1. 对于可变工作负载使用按需模式

按需模式(2023 年引入)根据流量自动扩展:

# 创建使用按需模式的流
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. 实施数据聚合

通过批量记录减少 PUT 负载单位:

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """聚合记录以减少成本"""
    aggregator = RecordAggregator()
    
    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )
    
    # 发送聚合记录
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. 优化数据保留

默认保留时间为 24 小时。只有在必要时才延长:

# 设置保留时间为 7 天
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

安全最佳实践

1. 静态和传输中的加密

# 创建加密流
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# 启用加密
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. 最小权限的 IAM 策略

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. VPC 端点

保持流量在 AWS 网络内。对于管理 AWS 基础设施的代码,请考虑使用 Terraform - 请参阅我们的 Terraform 快速参考

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

可观测性和调试

使用 X-Ray 的分布式追踪

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])
    
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

CloudWatch Logs Insights 查询

-- 查找处理时间慢的记录
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- 跟踪错误率
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

高级模式

分布式事务的 Saga 模式

跨微服务实现长期事务:

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())
    
    def execute(self, order_data):
        """执行 Saga 并补偿逻辑"""
        try:
            # 步骤 1:预留库存
            self.publish_command('RESERVE_INVENTORY', order_data)
            
            # 步骤 2:处理支付
            self.publish_command('PROCESS_PAYMENT', order_data)
            
            # 步骤 3:发货
            self.publish_command('SHIP_ORDER', order_data)
            
        except SagaException as e:
            # 反向顺序补偿
            self.compensate(e.failed_step)
    
    def compensate(self, failed_step):
        """执行补偿事务"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }
        
        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

测试策略

使用 LocalStack 进行本地开发

# 启动带有 Kinesis 的 LocalStack
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# 创建测试流
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

集成测试

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """测试事件发布(使用模拟的 Kinesis)"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)
    
    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])
    
    assert order_id is not None

性能调优

优化批量大小

def optimize_batch_processing(records, batch_size=500):
    """以优化的批量处理记录"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

使用连接池

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

有用的链接

AWS Kinesis 资源:

相关文章:

结论

AWS Kinesis 为构建可扩展的事件驱动微服务架构提供了坚实的基础。通过遵循这些模式和最佳实践,您可以创建出具有弹性和可维护性的系统。从小规模开始,使用单个事件流验证您的架构,然后随着系统的发展逐步扩展到更复杂的模式。

成功的关键在于理解您的数据流需求,为您的使用场景选择合适的 Kinesis 服务,并从一开始就实施适当的监控和错误处理。