Membangun Mikroservis Berbasis Acara dengan AWS Kinesis

Arsitektur berbasis acara dengan AWS Kinesis untuk skalabilitas

Konten Halaman

AWS Kinesis telah menjadi fondasi utama dalam membangun arsitektur mikroservis berbasis acara modern, memungkinkan pemrosesan data real-time dalam skala besar dengan minimal beban operasional.

amazon-kinesis

Memahami Arsitektur Mikroservis Berbasis Acara

Arsitektur berbasis acara (EDA) adalah pola desain di mana layanan berkomunikasi melalui acara daripada panggilan sinkron langsung. Pendekatan ini menawarkan beberapa keuntungan:

  • Keterkopelan longgar: Layanan tidak perlu mengetahui keberadaan layanan lain
  • Skalabilitas: Setiap layanan dapat berskala secara independen berdasarkan beban kerjanya
  • Resiliensi: Kegagalan di satu layanan tidak akan berdampak pada layanan lain
  • Fleksibilitas: Layanan baru dapat ditambahkan tanpa memodifikasi layanan yang sudah ada

AWS Kinesis menyediakan fondasi untuk menerapkan EDA dengan bertindak sebagai aliran acara terdistribusi dan tahan lama yang memisahkan produsen dari konsumen.

Ringkasan AWS Kinesis

AWS menyediakan beberapa layanan Kinesis, masing-masing dirancang untuk kasus penggunaan tertentu. Saat mengevaluasi solusi streaming, Anda mungkin juga ingin mempertimbangkan membandingkan RabbitMQ di EKS vs SQS untuk pola pesan dan implikasi biaya yang berbeda.

Kinesis Data Streams

Layanan streaming inti yang menangkap, menyimpan, dan memproses catatan data secara real-time. Data Streams sangat ideal untuk:

  • Aplikasi pemrosesan real-time kustom
  • Membangun pipeline data dengan latensi sub-detik
  • Memproses jutaan acara per detik
  • Menerapkan pola event sourcing

Kinesis Data Firehose

Layanan terkelola penuh yang mengirimkan data streaming ke tujuan seperti S3, Redshift, Elasticsearch, atau endpoint HTTP. Terbaik untuk:

  • Pipeline ETL sederhana
  • Agregasi dan arsip log
  • Analisis hampir real-time (latensi minimum 60 detik)
  • Situasi di mana Anda tidak membutuhkan logika pemrosesan kustom

Kinesis Data Analytics

Memproses dan menganalisis data streaming menggunakan SQL atau Apache Flink. Kasus penggunaan termasuk:

  • Dashboard real-time
  • ETL streaming
  • Deteksi anomali real-time
  • Generasi metrik kontinu

Pola Arsitektur dengan Kinesis

1. Pola Event Sourcing

Event sourcing menyimpan semua perubahan terhadap keadaan aplikasi sebagai urutan acara. Kinesis sangat cocok untuk ini. Jika Anda membutuhkan refresher tentang dasar-dasar Python, lihat Python Cheatsheet kami:

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Publish an event to Kinesis stream"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }
    
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )
    
    return response['SequenceNumber']

# Contoh: Acara pendaftaran pengguna
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. CQRS (Command Query Responsibility Segregation)

Pisahkan operasi baca dan tulis menggunakan Kinesis sebagai bus acara:

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }
    
    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })
    
    return err
}

3. Pola Fan-Out dengan Lambda

Proses acara dari satu aliran dengan beberapa fungsi Lambda. Untuk implementasi TypeScript dengan keamanan tipe yang lebih kuat, lihat TypeScript Cheatsheet kami:

// Konsumen Lambda untuk notifikasi email
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    }
};

// Lambda lain untuk pembaruan inventaris
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );
        
        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    }
};

Praktik Terbaik untuk Produksi

1. Memilih Jumlah Shard yang Tepat

Hitung kebutuhan shard Anda berdasarkan:

  • Masuk: 1 MB/detik atau 1.000 catatan/detik per shard
  • Keluar: 2 MB/detik per shard (konsumen standar) atau 2 MB/detik per konsumen dengan fan-out ditingkatkan
def calculate_shards(records_per_second, avg_record_size_kb):
    """Calculate required number of shards"""
    # Kapasitas masuk
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )
    
    return int(ingress_shards) + 1  # Tambahkan buffer

2. Implementasikan Penanganan Kesalahan yang Tepat

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key, 
                          max_retries=3):
    """Put record with exponential backoff retry"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Backoff eksponensial
                    continue
            raise

3. Gunakan Fan-Out Ditingkatkan untuk Konsumen Banyak

Fan-out ditingkatkan menyediakan throughput khusus untuk setiap konsumen:

# Daftarkan konsumen dengan fan-out ditingkatkan
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. Pantau Metrik Penting

Metrik CloudWatch penting yang perlu dipantau:

  • IncomingRecords: Jumlah catatan yang berhasil dimasukkan
  • IncomingBytes: Volume byte data yang masuk
  • GetRecords.IteratorAgeMilliseconds: Seberapa jauh konsumen tertinggal
  • WriteProvisionedThroughputExceeded: Kejadian throttling
  • ReadProvisionedThroughputExceeded: Throttling konsumen

5. Implementasikan Strategi Key Partition yang Tepat

import hashlib

def get_partition_key(user_id, shard_count=10):
    """Generate partition key with even distribution"""
    # Gunakan hashing konsisten untuk distribusi yang merata
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

Contoh Implementasi Nyata

Berikut adalah contoh lengkap dari arsitektur mikroservis pemrosesan pesanan:

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name
    
    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """Buat pesanan dan publikasikan acara"""
        order_id = self.generate_order_id()
        
        # Publikasikan acara pesanan dibuat
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)
        
        return order_id
    
    def publish_event(self, event_type: str, payload: Dict, 
                     partition_key: str):
        """Publikasikan acara ke aliran Kinesis"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }
        
        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """Konsumsi acara pesanan dan perbarui inventaris"""
    
    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])
            
            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])
    
    def reserve_inventory(self, order_data):
        # Perbarui database inventaris
        for item in order_data['items']:
            # Implementasi di sini
            pass

Strategi Migrasi dari Monolit ke Mikroservis

Fase 1: Pola Strangler Fig

Mulailah dengan mengarahkan acara tertentu melalui Kinesis sambil mempertahankan monolit:

  1. Identifikasi konteks terbatas dalam monolit Anda
  2. Buat aliran Kinesis untuk acara lintas konteks
  3. Ekstrak secara bertahap layanan yang mengonsumsi dari aliran ini
  4. Pertahankan kompatibilitas mundur dengan monolit

Fase 2: Pemrosesan Paralel

Jalankan sistem lama dan baru secara paralel:

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """Tulis ke sistem lama dan aliran acara"""
    try:
        # Tulis ke sistem baru terlebih dahulu
        publish_to_kinesis(kinesis_stream, data)
        
        # Lalu perbarui sistem lama
        legacy_db.update(data)
    except Exception as e:
        # Implementasikan logika kompensasi
        rollback_kinesis_event(kinesis_stream, data)
        raise

Fase 3: Migrasi Penuh

Setelah kepercayaan terbangun, arahkan semua lalu lintas melalui arsitektur berbasis acara.

Strategi Optimisasi Biaya

1. Gunakan Mode On-Demand untuk Beban Kerja Berubah

Mode on-demand (diperkenalkan pada 2023) secara otomatis berskala berdasarkan lalu lintas:

# Buat aliran dengan mode on-demand
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. Implementasikan Agregasi Data

Kurangi unit PUT dengan menggabungkan catatan:

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """Agregasikan catatan untuk mengurangi biaya"""
    aggregator = RecordAggregator()
    
    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )
    
    # Kirimkan catatan agregasi
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. Optimalkan Retensi Data

Retensi default adalah 24 jam. Hanya tingkatkan jika diperlukan:

# Setel retensi menjadi 7 hari
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

Praktik Keamanan Terbaik

1. Enkripsi di Tempat dan dalam Transit

# Buat aliran terenkripsi
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# Aktifkan enkripsi
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. Kebijakan IAM untuk Prinsip Minimal

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. Titik Akhir VPC

Jaga lalu lintas tetap dalam jaringan AWS. Untuk mengelola infrastruktur AWS sebagai kode, pertimbangkan menggunakan Terraform - lihat Terraform cheatsheet kami:

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

Observabilitas dan Debugging

Pelacakan Terdistribusi dengan X-Ray

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])
    
    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

Query Insights CloudWatch Logs

-- Cari waktu pemrosesan yang lambat
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- Lacak tingkat kesalahan
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

Pola Lanjutan

Pola Saga untuk Transaksi Terdistribusi

Implementasikan transaksi jangka panjang lintas mikroservis:

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())
    
    def execute(self, order_data):
        """Eksekusi saga dengan logika kompensasi"""
        try:
            # Langkah 1: Cadangkan inventaris
            self.publish_command('RESERVE_INVENTORY', order_data)
            
            # Langkah 2: Proses pembayaran
            self.publish_command('PROCESS_PAYMENT', order_data)
            
            # Langkah 3: Kirim pesanan
            self.publish_command('SHIP_ORDER', order_data)
            
        except SagaException as e:
            # Kompensasi dalam urutan terbalik
            self.compensate(e.failed_step)
    
    def compensate(self, failed_step):
        """Eksekusi transaksi kompensasi"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }
        
        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

Strategi Pengujian

Pengembangan Lokal dengan LocalStack

# Mulai LocalStack dengan Kinesis
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# Buat aliran pengujian
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

Pengujian Integrasi

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """Uji penerbitan acara dengan Kinesis yang dimodifikasi"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)
    
    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])
    
    assert order_id is not None

Penyempurnaan Kinerja

Optimalkan Ukuran Batch

def optimize_batch_processing(records, batch_size=500):
    """Proses catatan dalam batch yang dioptimalkan"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

Gunakan Pooling Koneksi

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

Tautan Berguna

Sumber Daya AWS Kinesis:

Artikel Terkait:

Kesimpulan

AWS Kinesis menyediakan fondasi yang kuat untuk membangun arsitektur mikroservis berbasis acara yang skalabel. Dengan mengikuti pola dan praktik terbaik ini, Anda dapat menciptakan sistem yang tangguh, skalabel, dan dapat dipelihara. Mulailah dengan satu aliran acara, validasi arsitektur Anda, dan secara bertahap perluas ke pola yang lebih kompleks seiring pertumbuhan sistem Anda.

Kunci keberhasilan adalah memahami kebutuhan aliran data Anda, memilih layanan Kinesis yang tepat untuk kasus penggunaan Anda, dan menerapkan pemantauan dan penanganan kesalahan yang tepat sejak awal.