使用 AWS Kinesis 构建事件驱动的微服务
基于 AWS Kinesis 的事件驱动架构实现可扩展性
AWS Kinesis 已成为构建现代事件驱动微服务架构的基石,它能够以最小的操作开销实现大规模的实时数据处理。

理解事件驱动微服务架构
事件驱动架构(EDA)是一种设计模式,其中服务通过事件进行通信,而不是直接的同步调用。这种方法具有以下优势:
- 松耦合:服务不需要知道彼此的存在
- 可扩展性:每个服务根据其工作负载独立扩展
- 弹性:一个服务的故障不会影响到其他服务
- 灵活性:可以在不修改现有服务的情况下添加新服务
AWS Kinesis 通过充当一个分布式、持久的事件流,将生产者与消费者解耦,为实现 EDA 提供了基础。
AWS Kinesis 概述
AWS 提供了多种 Kinesis 服务,每种服务都针对特定的使用场景进行了设计。在评估流处理解决方案时,您可能还想考虑 比较 RabbitMQ on EKS 与 SQS,以了解不同消息模式和成本影响。
Kinesis 数据流
核心的流处理服务,用于捕获、存储和实时处理数据记录。数据流非常适合以下场景:
- 自定义的实时处理应用
- 构建具有亚秒级延迟的数据管道
- 每秒处理数百万个事件
- 实现事件溯源模式
Kinesis 数据火 hose
一个完全托管的服务,用于将流数据传输到 S3、Redshift、Elasticsearch 或 HTTP 端点等目的地。最适合以下场景:
- 简单的 ETL 管道
- 日志聚合和归档
- 几乎实时分析(最低延迟为 60 秒)
- 不需要自定义处理逻辑的场景
Kinesis 数据分析
使用 SQL 或 Apache Flink 处理和分析流数据。使用场景包括:
- 实时仪表板
- 流式 ETL
- 实时异常检测
- 持续指标生成
使用 Kinesis 的架构模式
1. 事件溯源模式
事件溯源将所有对应用状态的更改记录为事件序列。Kinesis 非常适合这种模式。如果您需要复习 Python 基础知识,请查看我们的 Python 快速参考:
import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
def publish_event(stream_name, event_type, payload):
"""将事件发布到 Kinesis 流"""
event = {
'eventId': generate_unique_id(),
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event),
PartitionKey=payload.get('userId', 'default')
)
return response['SequenceNumber']
# 示例:用户注册事件
publish_event(
stream_name='user-events',
event_type='USER_REGISTERED',
payload={
'userId': '12345',
'email': 'user@example.com',
'registrationDate': '2025-10-30'
}
)
2. CQRS(命令查询职责分离)
使用 Kinesis 作为事件总线,分离读写操作:
package main
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)
type OrderCommand struct {
CommandType string `json:"commandType"`
OrderID string `json:"orderId"`
UserID string `json:"userId"`
Items []OrderItem `json:"items"`
}
func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
data, err := json.Marshal(command)
if err != nil {
return err
}
_, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String("order-commands"),
Data: data,
PartitionKey: aws.String(command.OrderID),
})
return err
}
3. 使用 Lambda 的扇出模式
使用多个 Lambda 函数处理来自单一流的数据。对于具有更强类型安全性的 TypeScript 实现,请参考我们的 TypeScript 快速参考:
// 用于电子邮件通知的 Lambda 消费者
exports.handler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await sendOrderConfirmationEmail(payload);
}
}
};
// 用于库存更新的另一个 Lambda
exports.inventoryHandler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await updateInventory(payload.items);
}
}
};
生产最佳实践
1. 选择合适的分片数量
根据以下因素计算您的分片需求:
- 入站流量:每个分片每秒 1 MB 或 1,000 条记录
- 出站流量:每个分片每秒 2 MB(标准消费者)或每个消费者每秒 2 MB(增强扇出)
def calculate_shards(records_per_second, avg_record_size_kb):
"""计算所需分片数量"""
# 入站容量
ingress_shards = max(
records_per_second / 1000,
(records_per_second * avg_record_size_kb) / 1024
)
return int(ingress_shards) + 1 # 添加缓冲
2. 实现适当的错误处理
from botocore.exceptions import ClientError
import time
def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
max_retries=3):
"""带指数退避重试的记录写入"""
for attempt in range(max_retries):
try:
response = kinesis_client.put_record(
StreamName=stream_name,
Data=data,
PartitionKey=partition_key
)
return response
except ClientError as e:
if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # 指数退避
continue
raise
3. 使用增强扇出为多个消费者提供服务
增强扇出为每个消费者提供专用吞吐量:
# 注册一个带有增强扇出的消费者
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
--consumer-name my-consumer-app
4. 监控关键指标
需要跟踪的重要 CloudWatch 指标包括:
IncomingRecords:成功写入的记录数IncomingBytes:入站数据的字节数量GetRecords.IteratorAgeMilliseconds:消费者落后的时间WriteProvisionedThroughputExceeded:节流事件ReadProvisionedThroughputExceeded:消费者节流
5. 实现适当的分区键策略
import hashlib
def get_partition_key(user_id, shard_count=10):
"""生成具有均匀分布的分区键"""
# 使用一致性哈希实现均匀分布
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_id = hash_value % shard_count
return f"{user_id}#{shard_id}"
实际应用示例
以下是订单处理微服务架构的完整示例:
import boto3
import json
from decimal import Decimal
from typing import Dict, List
class OrderProcessingService:
def __init__(self, stream_name: str):
self.kinesis = boto3.client('kinesis')
self.stream_name = stream_name
def create_order(self, user_id: str, items: List[Dict]) -> str:
"""创建订单并发布事件"""
order_id = self.generate_order_id()
# 发布订单创建事件
self.publish_event('ORDER_CREATED', {
'orderId': order_id,
'userId': user_id,
'items': items,
'status': 'PENDING',
'total': self.calculate_total(items)
}, partition_key=user_id)
return order_id
def publish_event(self, event_type: str, payload: Dict,
partition_key: str):
"""将事件发布到 Kinesis 流"""
event = {
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
self.kinesis.put_record(
StreamName=self.stream_name,
Data=json.dumps(event, default=str),
PartitionKey=partition_key
)
class InventoryService:
"""消费订单事件并更新库存"""
def process_records(self, records):
for record in records:
data = json.loads(record['kinesis']['data'])
if data['eventType'] == 'ORDER_CREATED':
self.reserve_inventory(data['payload'])
elif data['eventType'] == 'ORDER_CANCELLED':
self.release_inventory(data['payload'])
def reserve_inventory(self, order_data):
# 更新库存数据库
for item in order_data['items']:
# 实现细节
pass
从单体应用迁移到微服务的策略
第一阶段:绞杀者模式
首先通过 Kinesis 路由特定事件,同时保留单体应用:
- 识别单体应用中的有界上下文
- 为跨上下文事件创建 Kinesis 流
- 逐步提取消费这些流的服务
- 与单体应用保持向后兼容性
第二阶段:并行处理
同时运行旧系统和新系统:
def dual_write_pattern(legacy_db, kinesis_stream, data):
"""同时写入旧系统和事件流"""
try:
# 首先写入新系统
publish_to_kinesis(kinesis_stream, data)
# 然后更新旧系统
legacy_db.update(data)
except Exception as e:
# 实现补偿逻辑
rollback_kinesis_event(kinesis_stream, data)
raise
第三阶段:全面迁移
一旦建立信心,将所有流量路由到事件驱动架构。
成本优化策略
1. 对于可变工作负载使用按需模式
按需模式(2023 年引入)根据流量自动扩展:
# 创建使用按需模式的流
aws kinesis create-stream \
--stream-name my-stream \
--stream-mode-details StreamMode=ON_DEMAND
2. 实施数据聚合
通过批量记录减少 PUT 负载单位:
from aws_kinesis_agg.aggregator import RecordAggregator
def batch_put_records(kinesis_client, stream_name, records):
"""聚合记录以减少成本"""
aggregator = RecordAggregator()
for record in records:
aggregator.add_user_record(
partition_key=record['partition_key'],
data=record['data']
)
# 发送聚合记录
kinesis_client.put_record(
StreamName=stream_name,
Data=aggregator.serialize(),
PartitionKey=records[0]['partition_key']
)
3. 优化数据保留
默认保留时间为 24 小时。只有在必要时才延长:
# 设置保留时间为 7 天
aws kinesis increase-stream-retention-period \
--stream-name my-stream \
--retention-period-hours 168
安全最佳实践
1. 静态和传输中的加密
# 创建加密流
kinesis.create_stream(
StreamName='secure-stream',
ShardCount=1,
StreamModeDetails={'StreamMode': 'PROVISIONED'}
)
# 启用加密
kinesis.start_stream_encryption(
StreamName='secure-stream',
EncryptionType='KMS',
KeyId='alias/aws/kinesis'
)
2. 最小权限的 IAM 策略
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
}
]
}
3. VPC 端点
保持流量在 AWS 网络内。对于管理 AWS 基础设施的代码,请考虑使用 Terraform - 请参阅我们的 Terraform 快速参考:
aws ec2 create-vpc-endpoint \
--vpc-id vpc-12345678 \
--service-name com.amazonaws.us-east-1.kinesis-streams \
--route-table-ids rtb-12345678
可观测性和调试
使用 X-Ray 的分布式追踪
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
patch_all()
@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
xray_recorder.put_annotation('eventType', event_data['type'])
xray_recorder.put_metadata('payload', event_data['payload'])
kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event_data),
PartitionKey=event_data['id']
)
CloudWatch Logs Insights 查询
-- 查找处理时间慢的记录
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20
-- 跟踪错误率
fields @timestamp, eventType
| filter error = true
| stats count() by eventType
高级模式
分布式事务的 Saga 模式
跨微服务实现长期事务:
class OrderSaga:
def __init__(self, kinesis_client, stream_name):
self.kinesis = kinesis_client
self.stream_name = stream_name
self.saga_id = str(uuid.uuid4())
def execute(self, order_data):
"""执行 Saga 并补偿逻辑"""
try:
# 步骤 1:预留库存
self.publish_command('RESERVE_INVENTORY', order_data)
# 步骤 2:处理支付
self.publish_command('PROCESS_PAYMENT', order_data)
# 步骤 3:发货
self.publish_command('SHIP_ORDER', order_data)
except SagaException as e:
# 反向顺序补偿
self.compensate(e.failed_step)
def compensate(self, failed_step):
"""执行补偿事务"""
compensation_steps = {
'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
}
for step in compensation_steps.get(failed_step, []):
self.publish_command(step, {'sagaId': self.saga_id})
测试策略
使用 LocalStack 进行本地开发
# 启动带有 Kinesis 的 LocalStack
docker run -d \
-p 4566:4566 \
-e SERVICES=kinesis \
localstack/localstack
# 创建测试流
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
--stream-name test-stream \
--shard-count 1
集成测试
import pytest
from moto import mock_kinesis
@mock_kinesis
def test_event_publishing():
"""测试事件发布(使用模拟的 Kinesis)"""
kinesis = boto3.client('kinesis', region_name='us-east-1')
kinesis.create_stream(StreamName='test-stream', ShardCount=1)
service = OrderProcessingService('test-stream')
order_id = service.create_order('user123', [
{'productId': 'prod1', 'quantity': 2}
])
assert order_id is not None
性能调优
优化批量大小
def optimize_batch_processing(records, batch_size=500):
"""以优化的批量处理记录"""
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
process_batch(batch)
使用连接池
from botocore.config import Config
config = Config(
max_pool_connections=50,
retries={'max_attempts': 3, 'mode': 'adaptive'}
)
kinesis = boto3.client('kinesis', config=config)
有用的链接
AWS Kinesis 资源:
- AWS Kinesis 文档
- AWS Kinesis 数据流开发人员指南
- Kinesis 客户端库(KCL)
- AWS Kinesis 定价计算器
- Kinesis 数据流配额和限制
- AWS 架构博客 - 事件驱动架构
- AWS 示例 - Kinesis 示例
相关文章:
结论
AWS Kinesis 为构建可扩展的事件驱动微服务架构提供了坚实的基础。通过遵循这些模式和最佳实践,您可以创建出具有弹性和可维护性的系统。从小规模开始,使用单个事件流验证您的架构,然后随着系统的发展逐步扩展到更复杂的模式。
成功的关键在于理解您的数据流需求,为您的使用场景选择合适的 Kinesis 服务,并从一开始就实施适当的监控和错误处理。