Создание микросервисов на основе событий с использованием AWS Kinesis

Архитектура на основе событий с использованием AWS Kinesis для масштабируемости

Содержимое страницы

AWS Kinesis стал основой для создания современных микросервисных архитектур, работающих на основе событий, обеспечивая обработку данных в реальном времени с минимальными операционными затратами.

amazon-kinesis

Понимание микросервисных архитектур на основе событий

Архитектура на основе событий (EDA) - это шаблон проектирования, при котором сервисы обмениваются событиями, а не используют прямые синхронные вызовы. Этот подход предлагает несколько преимуществ:

  • Слабая связанность: Сервисы не должны знать о существовании друг друга
  • Масштабируемость: Каждый сервис масштабируется независимо в зависимости от нагрузки
  • Устойчивость: Сбои в одном сервисе не распространяются на другие
  • Гибкость: Новые сервисы могут быть добавлены без изменения существующих

AWS Kinesis предоставляет основу для реализации EDA, выступая в роли распределенного, надежного потока событий, который развязывает производителей и потребителей.

Обзор AWS Kinesis

AWS предлагает несколько сервисов Kinesis, каждый из которых предназначен для конкретных сценариев использования. При оценке решений для потоковой передачи данных вы также можете захотеть сравнить RabbitMQ на EKS и SQS для различных шаблонов обмена сообщениями и последствий для стоимости.

Kinesis Data Streams

Основной сервис потоковой передачи данных, который захватывает, хранит и обрабатывает записи данных в реальном времени. Data Streams идеален для:

  • Пользовательских приложений для обработки данных в реальном времени
  • Создания конвейеров данных с задержкой менее секунды
  • Обработки миллионов событий в секунду
  • Реализации шаблонов событийного источника

Kinesis Data Firehose

Полностью управляемый сервис, который доставляет потоковые данные в такие места назначения, как S3, Redshift, Elasticsearch или HTTP-концы. Лучше всего подходит для:

  • Простых ETL-конвейеров
  • Агрегации и архивирования логов
  • Аналитики в почти реальном времени (минимальная задержка 60 секунд)
  • Сценариев, где не требуется пользовательская логика обработки

Kinesis Data Analytics

Обрабатывает и анализирует потоковые данные с использованием SQL или Apache Flink. Сценарии использования включают:

  • Панелей управления в реальном времени
  • Потоковой ETL
  • Обнаружения аномалий в реальном времени
  • Непрерывного генерации метрик

Архитектурные шаблоны с Kinesis

1. Шаблон событийного источника

Событийный источник хранит все изменения состояния приложения как последовательность событий. Kinesis идеально подходит для этого. Если вам нужна повторная проверка основ Python, ознакомьтесь с нашим Шпаргалкой по Python:

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Опубликовать событие в потоке Kinesis"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }

    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )

    return response['SequenceNumber']

# Пример: событие регистрации пользователя
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. CQRS (Разделение ответственности команд и запросов)

Разделяйте операции чтения и записи, используя Kinesis в качестве шины событий:

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }

    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })

    return err
}

3. Шаблон веерного распространения с Lambda

Обрабатывайте события из одного потока с несколькими функциями Lambda. Для реализаций на TypeScript с более строгой проверкой типов обратитесь к нашей Шпаргалке по TypeScript:

// Lambda-потребитель для уведомлений по электронной почте
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );

        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// Другая Lambda для обновлений инвентаря
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );

        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};

Лучшие практики для продакшена

1. Выбор правильного количества шардов

Рассчитывайте требования к шардам на основе:

  • Входящий трафик: 1 МБ/сек или 1,000 записей/сек на шард
  • Исходящий трафик: 2 МБ/сек на шард (стандартные потребители) или 2 МБ/сек на потребителя с улучшенным веерным распространением
def calculate_shards(records_per_second, avg_record_size_kb):
    """Рассчитать необходимое количество шардов"""
    # Входящая пропускная способность
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )

    return int(ingress_shards) + 1  # Добавить буфер

2. Реализация правильной обработки ошибок

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
                          max_retries=3):
    """Отправить запись с повторными попытками с экспоненциальным затуханием"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Экспоненциальное затухание
                    continue
            raise

3. Используйте улучшенное веерное распространение для нескольких потребителей

Улучшенное веерное распространение предоставляет выделенную пропускную способность для каждого потребителя:

# Зарегистрировать потребителя с улучшенным веерным распространением
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. Мониторинг ключевых метрик

Основные метрики CloudWatch для отслеживания:

  • IncomingRecords: Количество успешно отправленных записей
  • IncomingBytes: Объем входящих данных в байтах
  • GetRecords.IteratorAgeMilliseconds: Насколько потребители отстают
  • WriteProvisionedThroughputExceeded: События дросселирования
  • ReadProvisionedThroughputExceeded: Дросселирование потребителей

5. Реализация правильной стратегии ключа разделения

import hashlib

def get_partition_key(user_id, shard_count=10):
    """Сгенерировать ключ разделения с равномерным распределением"""
    # Использовать согласованное хеширование для равномерного распределения
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

Пример реальной реализации

Вот полный пример микросервисной архитектуры обработки заказов:

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name

    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """Создать заказ и опубликовать события"""
        order_id = self.generate_order_id()

        # Опубликовать событие создания заказа
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)

        return order_id

    def publish_event(self, event_type: str, payload: Dict,
                     partition_key: str):
        """Опубликовать событие в потоке Kinesis"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }

        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """Потребляет события заказов и обновляет инвентарь"""

    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])

            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])

    def reserve_inventory(self, order_data):
        # Обновить базу данных инвентаря
        for item in order_data['items']:
            # Реализация здесь
            pass

Стратегия миграции от монолита к микросервисам

Фаза 1: Паттерн “Странгулятор”

Начните с маршрутизации определенных событий через Kinesis, сохраняя монолит:

  1. Определите ограниченные контексты в вашем монолите
  2. Создайте потоки Kinesis для межконтекстных событий
  3. Постепенно извлекайте сервисы, которые потребляют из этих потоков
  4. Поддерживайте обратную совместимость с монолитом

Фаза 2: Параллельная обработка

Запускайте старую и новую системы параллельно:

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """Запись как в старую систему, так и в поток событий"""
    try:
        # Сначала запись в новую систему
        publish_to_kinesis(kinesis_stream, data)

        # Затем обновление старой системы
        legacy_db.update(data)
    except Exception as e:
        # Реализация компенсирующей логики
        rollback_kinesis_event(kinesis_stream, data)
        raise

Фаза 3: Полная миграция

После установления уверенности, маршрутизируйте весь трафик через архитектуру, основанную на событиях.

Стратегии оптимизации затрат

1. Использование режима On-Demand для переменных нагрузок

Режим on-demand (введен в 2023 году) автоматически масштабируется в зависимости от трафика:

# Создание потока в режиме on-demand
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. Реализация агрегации данных

Снизьте количество единиц PUT за счет пакетирования записей:

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """Агрегация записей для снижения затрат"""
    aggregator = RecordAggregator()

    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )

    # Отправка агрегированной записи
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. Оптимизация периода хранения

По умолчанию период хранения составляет 24 часа. Увеличивайте его только при необходимости:

# Установка периода хранения на 7 дней
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

Лучшие практики безопасности

1. Шифрование в состоянии покоя и при передаче

# Создание зашифрованного потока
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# Включение шифрования
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. Политики IAM для минимальных привилегий

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. VPC Endpoints

Поддерживайте трафик внутри сети AWS. Для управления инфраструктурой AWS как кодом рассмотрите использование Terraform - см. наш Terraform cheatsheet:

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

Наблюдаемость и отладка

Распределенное трассирование с X-Ray

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])

    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

Запросы CloudWatch Logs Insights

-- Поиск медленных времен обработки
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- Отслеживание уровня ошибок
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

Продвинутые паттерны

Паттерн Saga для распределенных транзакций

Реализуйте длительные транзакции между микросервисами:

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())

    def execute(self, order_data):
        """Выполнение saga с компенсирующей логикой"""
        try:
            # Шаг 1: Резервирование инвентаря
            self.publish_command('RESERVE_INVENTORY', order_data)

            # Шаг 2: Обработка платежа
            self.publish_command('PROCESS_PAYMENT', order_data)

            # Шаг 3: Отправка заказа
            self.publish_command('SHIP_ORDER', order_data)

        except SagaException as e:
            # Компенсация в обратном порядке
            self.compensate(e.failed_step)

    def compensate(self, failed_step):
        """Выполнение компенсирующих транзакций"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }

        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

Стратегии тестирования

Локальная разработка с LocalStack

# Запуск LocalStack с Kinesis
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# Создание тестового потока
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

Интеграционное тестирование

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """Тестирование публикации событий с имитацией Kinesis"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)

    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])

    assert order_id is not None

Настройка производительности

Оптимизация размера пакета

def optimize_batch_processing(records, batch_size=500):
    """Обработка записей в оптимизированных пакетах"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

Использование пула соединений

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

Полезные ссылки

Ресурсы AWS Kinesis:

Связанные статьи:

Заключение

AWS Kinesis предоставляет надежную основу для создания масштабируемых архитектур микросервисов на основе событий. Следуя этим паттернам и лучшим практикам, вы можете создать системы, которые являются устойчивыми, масштабируемыми и поддерживаемыми. Начните с одного потока событий, проверьте свою архитектуру и постепенно переходите к более сложным паттернам по мере роста системы.

Ключ к успеху - понимание требований к потоку данных, выбор правильного сервиса Kinesis для вашего случая использования и реализация правильного мониторинга и обработки ошибок с самого начала.