Создание микросервисов на основе событий с использованием AWS Kinesis
Архитектура на основе событий с использованием AWS Kinesis для масштабируемости
AWS Kinesis стал основой для создания современных микросервисных архитектур, работающих на основе событий, обеспечивая обработку данных в реальном времени с минимальными операционными затратами.

Понимание микросервисных архитектур на основе событий
Архитектура на основе событий (EDA) - это шаблон проектирования, при котором сервисы обмениваются событиями, а не используют прямые синхронные вызовы. Этот подход предлагает несколько преимуществ:
- Слабая связанность: Сервисы не должны знать о существовании друг друга
- Масштабируемость: Каждый сервис масштабируется независимо в зависимости от нагрузки
- Устойчивость: Сбои в одном сервисе не распространяются на другие
- Гибкость: Новые сервисы могут быть добавлены без изменения существующих
AWS Kinesis предоставляет основу для реализации EDA, выступая в роли распределенного, надежного потока событий, который развязывает производителей и потребителей.
Обзор AWS Kinesis
AWS предлагает несколько сервисов Kinesis, каждый из которых предназначен для конкретных сценариев использования. При оценке решений для потоковой передачи данных вы также можете захотеть сравнить RabbitMQ на EKS и SQS для различных шаблонов обмена сообщениями и последствий для стоимости.
Kinesis Data Streams
Основной сервис потоковой передачи данных, который захватывает, хранит и обрабатывает записи данных в реальном времени. Data Streams идеален для:
- Пользовательских приложений для обработки данных в реальном времени
- Создания конвейеров данных с задержкой менее секунды
- Обработки миллионов событий в секунду
- Реализации шаблонов событийного источника
Kinesis Data Firehose
Полностью управляемый сервис, который доставляет потоковые данные в такие места назначения, как S3, Redshift, Elasticsearch или HTTP-концы. Лучше всего подходит для:
- Простых ETL-конвейеров
- Агрегации и архивирования логов
- Аналитики в почти реальном времени (минимальная задержка 60 секунд)
- Сценариев, где не требуется пользовательская логика обработки
Kinesis Data Analytics
Обрабатывает и анализирует потоковые данные с использованием SQL или Apache Flink. Сценарии использования включают:
- Панелей управления в реальном времени
- Потоковой ETL
- Обнаружения аномалий в реальном времени
- Непрерывного генерации метрик
Архитектурные шаблоны с Kinesis
1. Шаблон событийного источника
Событийный источник хранит все изменения состояния приложения как последовательность событий. Kinesis идеально подходит для этого. Если вам нужна повторная проверка основ Python, ознакомьтесь с нашим Шпаргалкой по Python:
import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
def publish_event(stream_name, event_type, payload):
"""Опубликовать событие в потоке Kinesis"""
event = {
'eventId': generate_unique_id(),
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event),
PartitionKey=payload.get('userId', 'default')
)
return response['SequenceNumber']
# Пример: событие регистрации пользователя
publish_event(
stream_name='user-events',
event_type='USER_REGISTERED',
payload={
'userId': '12345',
'email': 'user@example.com',
'registrationDate': '2025-10-30'
}
)
2. CQRS (Разделение ответственности команд и запросов)
Разделяйте операции чтения и записи, используя Kinesis в качестве шины событий:
package main
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)
type OrderCommand struct {
CommandType string `json:"commandType"`
OrderID string `json:"orderId"`
UserID string `json:"userId"`
Items []OrderItem `json:"items"`
}
func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
data, err := json.Marshal(command)
if err != nil {
return err
}
_, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String("order-commands"),
Data: data,
PartitionKey: aws.String(command.OrderID),
})
return err
}
3. Шаблон веерного распространения с Lambda
Обрабатывайте события из одного потока с несколькими функциями Lambda. Для реализаций на TypeScript с более строгой проверкой типов обратитесь к нашей Шпаргалке по TypeScript:
// Lambda-потребитель для уведомлений по электронной почте
exports.handler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await sendOrderConfirmationEmail(payload);
}
}
};
// Другая Lambda для обновлений инвентаря
exports.inventoryHandler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await updateInventory(payload.items);
}
}
};
Лучшие практики для продакшена
1. Выбор правильного количества шардов
Рассчитывайте требования к шардам на основе:
- Входящий трафик: 1 МБ/сек или 1,000 записей/сек на шард
- Исходящий трафик: 2 МБ/сек на шард (стандартные потребители) или 2 МБ/сек на потребителя с улучшенным веерным распространением
def calculate_shards(records_per_second, avg_record_size_kb):
"""Рассчитать необходимое количество шардов"""
# Входящая пропускная способность
ingress_shards = max(
records_per_second / 1000,
(records_per_second * avg_record_size_kb) / 1024
)
return int(ingress_shards) + 1 # Добавить буфер
2. Реализация правильной обработки ошибок
from botocore.exceptions import ClientError
import time
def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
max_retries=3):
"""Отправить запись с повторными попытками с экспоненциальным затуханием"""
for attempt in range(max_retries):
try:
response = kinesis_client.put_record(
StreamName=stream_name,
Data=data,
PartitionKey=partition_key
)
return response
except ClientError as e:
if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Экспоненциальное затухание
continue
raise
3. Используйте улучшенное веерное распространение для нескольких потребителей
Улучшенное веерное распространение предоставляет выделенную пропускную способность для каждого потребителя:
# Зарегистрировать потребителя с улучшенным веерным распространением
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
--consumer-name my-consumer-app
4. Мониторинг ключевых метрик
Основные метрики CloudWatch для отслеживания:
IncomingRecords: Количество успешно отправленных записейIncomingBytes: Объем входящих данных в байтахGetRecords.IteratorAgeMilliseconds: Насколько потребители отстаютWriteProvisionedThroughputExceeded: События дросселированияReadProvisionedThroughputExceeded: Дросселирование потребителей
5. Реализация правильной стратегии ключа разделения
import hashlib
def get_partition_key(user_id, shard_count=10):
"""Сгенерировать ключ разделения с равномерным распределением"""
# Использовать согласованное хеширование для равномерного распределения
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_id = hash_value % shard_count
return f"{user_id}#{shard_id}"
Пример реальной реализации
Вот полный пример микросервисной архитектуры обработки заказов:
import boto3
import json
from decimal import Decimal
from typing import Dict, List
class OrderProcessingService:
def __init__(self, stream_name: str):
self.kinesis = boto3.client('kinesis')
self.stream_name = stream_name
def create_order(self, user_id: str, items: List[Dict]) -> str:
"""Создать заказ и опубликовать события"""
order_id = self.generate_order_id()
# Опубликовать событие создания заказа
self.publish_event('ORDER_CREATED', {
'orderId': order_id,
'userId': user_id,
'items': items,
'status': 'PENDING',
'total': self.calculate_total(items)
}, partition_key=user_id)
return order_id
def publish_event(self, event_type: str, payload: Dict,
partition_key: str):
"""Опубликовать событие в потоке Kinesis"""
event = {
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
self.kinesis.put_record(
StreamName=self.stream_name,
Data=json.dumps(event, default=str),
PartitionKey=partition_key
)
class InventoryService:
"""Потребляет события заказов и обновляет инвентарь"""
def process_records(self, records):
for record in records:
data = json.loads(record['kinesis']['data'])
if data['eventType'] == 'ORDER_CREATED':
self.reserve_inventory(data['payload'])
elif data['eventType'] == 'ORDER_CANCELLED':
self.release_inventory(data['payload'])
def reserve_inventory(self, order_data):
# Обновить базу данных инвентаря
for item in order_data['items']:
# Реализация здесь
pass
Стратегия миграции от монолита к микросервисам
Фаза 1: Паттерн “Странгулятор”
Начните с маршрутизации определенных событий через Kinesis, сохраняя монолит:
- Определите ограниченные контексты в вашем монолите
- Создайте потоки Kinesis для межконтекстных событий
- Постепенно извлекайте сервисы, которые потребляют из этих потоков
- Поддерживайте обратную совместимость с монолитом
Фаза 2: Параллельная обработка
Запускайте старую и новую системы параллельно:
def dual_write_pattern(legacy_db, kinesis_stream, data):
"""Запись как в старую систему, так и в поток событий"""
try:
# Сначала запись в новую систему
publish_to_kinesis(kinesis_stream, data)
# Затем обновление старой системы
legacy_db.update(data)
except Exception as e:
# Реализация компенсирующей логики
rollback_kinesis_event(kinesis_stream, data)
raise
Фаза 3: Полная миграция
После установления уверенности, маршрутизируйте весь трафик через архитектуру, основанную на событиях.
Стратегии оптимизации затрат
1. Использование режима On-Demand для переменных нагрузок
Режим on-demand (введен в 2023 году) автоматически масштабируется в зависимости от трафика:
# Создание потока в режиме on-demand
aws kinesis create-stream \
--stream-name my-stream \
--stream-mode-details StreamMode=ON_DEMAND
2. Реализация агрегации данных
Снизьте количество единиц PUT за счет пакетирования записей:
from aws_kinesis_agg.aggregator import RecordAggregator
def batch_put_records(kinesis_client, stream_name, records):
"""Агрегация записей для снижения затрат"""
aggregator = RecordAggregator()
for record in records:
aggregator.add_user_record(
partition_key=record['partition_key'],
data=record['data']
)
# Отправка агрегированной записи
kinesis_client.put_record(
StreamName=stream_name,
Data=aggregator.serialize(),
PartitionKey=records[0]['partition_key']
)
3. Оптимизация периода хранения
По умолчанию период хранения составляет 24 часа. Увеличивайте его только при необходимости:
# Установка периода хранения на 7 дней
aws kinesis increase-stream-retention-period \
--stream-name my-stream \
--retention-period-hours 168
Лучшие практики безопасности
1. Шифрование в состоянии покоя и при передаче
# Создание зашифрованного потока
kinesis.create_stream(
StreamName='secure-stream',
ShardCount=1,
StreamModeDetails={'StreamMode': 'PROVISIONED'}
)
# Включение шифрования
kinesis.start_stream_encryption(
StreamName='secure-stream',
EncryptionType='KMS',
KeyId='alias/aws/kinesis'
)
2. Политики IAM для минимальных привилегий
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
}
]
}
3. VPC Endpoints
Поддерживайте трафик внутри сети AWS. Для управления инфраструктурой AWS как кодом рассмотрите использование Terraform - см. наш Terraform cheatsheet:
aws ec2 create-vpc-endpoint \
--vpc-id vpc-12345678 \
--service-name com.amazonaws.us-east-1.kinesis-streams \
--route-table-ids rtb-12345678
Наблюдаемость и отладка
Распределенное трассирование с X-Ray
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
patch_all()
@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
xray_recorder.put_annotation('eventType', event_data['type'])
xray_recorder.put_metadata('payload', event_data['payload'])
kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event_data),
PartitionKey=event_data['id']
)
Запросы CloudWatch Logs Insights
-- Поиск медленных времен обработки
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20
-- Отслеживание уровня ошибок
fields @timestamp, eventType
| filter error = true
| stats count() by eventType
Продвинутые паттерны
Паттерн Saga для распределенных транзакций
Реализуйте длительные транзакции между микросервисами:
class OrderSaga:
def __init__(self, kinesis_client, stream_name):
self.kinesis = kinesis_client
self.stream_name = stream_name
self.saga_id = str(uuid.uuid4())
def execute(self, order_data):
"""Выполнение saga с компенсирующей логикой"""
try:
# Шаг 1: Резервирование инвентаря
self.publish_command('RESERVE_INVENTORY', order_data)
# Шаг 2: Обработка платежа
self.publish_command('PROCESS_PAYMENT', order_data)
# Шаг 3: Отправка заказа
self.publish_command('SHIP_ORDER', order_data)
except SagaException as e:
# Компенсация в обратном порядке
self.compensate(e.failed_step)
def compensate(self, failed_step):
"""Выполнение компенсирующих транзакций"""
compensation_steps = {
'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
}
for step in compensation_steps.get(failed_step, []):
self.publish_command(step, {'sagaId': self.saga_id})
Стратегии тестирования
Локальная разработка с LocalStack
# Запуск LocalStack с Kinesis
docker run -d \
-p 4566:4566 \
-e SERVICES=kinesis \
localstack/localstack
# Создание тестового потока
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
--stream-name test-stream \
--shard-count 1
Интеграционное тестирование
import pytest
from moto import mock_kinesis
@mock_kinesis
def test_event_publishing():
"""Тестирование публикации событий с имитацией Kinesis"""
kinesis = boto3.client('kinesis', region_name='us-east-1')
kinesis.create_stream(StreamName='test-stream', ShardCount=1)
service = OrderProcessingService('test-stream')
order_id = service.create_order('user123', [
{'productId': 'prod1', 'quantity': 2}
])
assert order_id is not None
Настройка производительности
Оптимизация размера пакета
def optimize_batch_processing(records, batch_size=500):
"""Обработка записей в оптимизированных пакетах"""
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
process_batch(batch)
Использование пула соединений
from botocore.config import Config
config = Config(
max_pool_connections=50,
retries={'max_attempts': 3, 'mode': 'adaptive'}
)
kinesis = boto3.client('kinesis', config=config)
Полезные ссылки
Ресурсы AWS Kinesis:
- Документация AWS Kinesis
- Руководство разработчика AWS Kinesis Data Streams
- Kinesis Client Library (KCL)
- Калькулятор стоимости AWS Kinesis
- Квоты и ограничения Kinesis Data Streams
- AWS Architecture Blog - Архитектуры на основе событий
- AWS Samples - Примеры Kinesis
Связанные статьи:
- Сравнение стоимости хостинга Rabbitmq на Eks и Sqs
- TypeScript Cheatsheet: Основные концепции и лучшие практики
- Python Cheatsheet
- Terraform cheatsheet - полезные команды и примеры
Заключение
AWS Kinesis предоставляет надежную основу для создания масштабируемых архитектур микросервисов на основе событий. Следуя этим паттернам и лучшим практикам, вы можете создать системы, которые являются устойчивыми, масштабируемыми и поддерживаемыми. Начните с одного потока событий, проверьте свою архитектуру и постепенно переходите к более сложным паттернам по мере роста системы.
Ключ к успеху - понимание требований к потоку данных, выбор правильного сервиса Kinesis для вашего случая использования и реализация правильного мониторинга и обработки ошибок с самого начала.