Membangun Mikroservis Berbasis Acara dengan AWS Kinesis
Arsitektur berbasis acara dengan AWS Kinesis untuk skalabilitas
AWS Kinesis telah menjadi fondasi utama dalam membangun arsitektur mikroservis berbasis acara modern, memungkinkan pemrosesan data real-time dalam skala besar dengan minimal beban operasional.

Memahami Arsitektur Mikroservis Berbasis Acara
Arsitektur berbasis acara (EDA) adalah pola desain di mana layanan berkomunikasi melalui acara daripada panggilan sinkron langsung. Pendekatan ini menawarkan beberapa keuntungan:
- Keterkopelan longgar: Layanan tidak perlu mengetahui keberadaan layanan lain
- Skalabilitas: Setiap layanan dapat berskala secara independen berdasarkan beban kerjanya
- Resiliensi: Kegagalan di satu layanan tidak akan berdampak pada layanan lain
- Fleksibilitas: Layanan baru dapat ditambahkan tanpa memodifikasi layanan yang sudah ada
AWS Kinesis menyediakan fondasi untuk menerapkan EDA dengan bertindak sebagai aliran acara terdistribusi dan tahan lama yang memisahkan produsen dari konsumen.
Ringkasan AWS Kinesis
AWS menyediakan beberapa layanan Kinesis, masing-masing dirancang untuk kasus penggunaan tertentu. Saat mengevaluasi solusi streaming, Anda mungkin juga ingin mempertimbangkan membandingkan RabbitMQ di EKS vs SQS untuk pola pesan dan implikasi biaya yang berbeda.
Kinesis Data Streams
Layanan streaming inti yang menangkap, menyimpan, dan memproses catatan data secara real-time. Data Streams sangat ideal untuk:
- Aplikasi pemrosesan real-time kustom
- Membangun pipeline data dengan latensi sub-detik
- Memproses jutaan acara per detik
- Menerapkan pola event sourcing
Kinesis Data Firehose
Layanan terkelola penuh yang mengirimkan data streaming ke tujuan seperti S3, Redshift, Elasticsearch, atau endpoint HTTP. Terbaik untuk:
- Pipeline ETL sederhana
- Agregasi dan arsip log
- Analisis hampir real-time (latensi minimum 60 detik)
- Situasi di mana Anda tidak membutuhkan logika pemrosesan kustom
Kinesis Data Analytics
Memproses dan menganalisis data streaming menggunakan SQL atau Apache Flink. Kasus penggunaan termasuk:
- Dashboard real-time
- ETL streaming
- Deteksi anomali real-time
- Generasi metrik kontinu
Pola Arsitektur dengan Kinesis
1. Pola Event Sourcing
Event sourcing menyimpan semua perubahan terhadap keadaan aplikasi sebagai urutan acara. Kinesis sangat cocok untuk ini. Jika Anda membutuhkan refresher tentang dasar-dasar Python, lihat Python Cheatsheet kami:
import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
def publish_event(stream_name, event_type, payload):
"""Publish an event to Kinesis stream"""
event = {
'eventId': generate_unique_id(),
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event),
PartitionKey=payload.get('userId', 'default')
)
return response['SequenceNumber']
# Contoh: Acara pendaftaran pengguna
publish_event(
stream_name='user-events',
event_type='USER_REGISTERED',
payload={
'userId': '12345',
'email': 'user@example.com',
'registrationDate': '2025-10-30'
}
)
2. CQRS (Command Query Responsibility Segregation)
Pisahkan operasi baca dan tulis menggunakan Kinesis sebagai bus acara:
package main
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)
type OrderCommand struct {
CommandType string `json:"commandType"`
OrderID string `json:"orderId"`
UserID string `json:"userId"`
Items []OrderItem `json:"items"`
}
func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
data, err := json.Marshal(command)
if err != nil {
return err
}
_, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String("order-commands"),
Data: data,
PartitionKey: aws.String(command.OrderID),
})
return err
}
3. Pola Fan-Out dengan Lambda
Proses acara dari satu aliran dengan beberapa fungsi Lambda. Untuk implementasi TypeScript dengan keamanan tipe yang lebih kuat, lihat TypeScript Cheatsheet kami:
// Konsumen Lambda untuk notifikasi email
exports.handler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await sendOrderConfirmationEmail(payload);
}
}
};
// Lambda lain untuk pembaruan inventaris
exports.inventoryHandler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await updateInventory(payload.items);
}
}
};
Praktik Terbaik untuk Produksi
1. Memilih Jumlah Shard yang Tepat
Hitung kebutuhan shard Anda berdasarkan:
- Masuk: 1 MB/detik atau 1.000 catatan/detik per shard
- Keluar: 2 MB/detik per shard (konsumen standar) atau 2 MB/detik per konsumen dengan fan-out ditingkatkan
def calculate_shards(records_per_second, avg_record_size_kb):
"""Calculate required number of shards"""
# Kapasitas masuk
ingress_shards = max(
records_per_second / 1000,
(records_per_second * avg_record_size_kb) / 1024
)
return int(ingress_shards) + 1 # Tambahkan buffer
2. Implementasikan Penanganan Kesalahan yang Tepat
from botocore.exceptions import ClientError
import time
def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
max_retries=3):
"""Put record with exponential backoff retry"""
for attempt in range(max_retries):
try:
response = kinesis_client.put_record(
StreamName=stream_name,
Data=data,
PartitionKey=partition_key
)
return response
except ClientError as e:
if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Backoff eksponensial
continue
raise
3. Gunakan Fan-Out Ditingkatkan untuk Konsumen Banyak
Fan-out ditingkatkan menyediakan throughput khusus untuk setiap konsumen:
# Daftarkan konsumen dengan fan-out ditingkatkan
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
--consumer-name my-consumer-app
4. Pantau Metrik Penting
Metrik CloudWatch penting yang perlu dipantau:
IncomingRecords: Jumlah catatan yang berhasil dimasukkanIncomingBytes: Volume byte data yang masukGetRecords.IteratorAgeMilliseconds: Seberapa jauh konsumen tertinggalWriteProvisionedThroughputExceeded: Kejadian throttlingReadProvisionedThroughputExceeded: Throttling konsumen
5. Implementasikan Strategi Key Partition yang Tepat
import hashlib
def get_partition_key(user_id, shard_count=10):
"""Generate partition key with even distribution"""
# Gunakan hashing konsisten untuk distribusi yang merata
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_id = hash_value % shard_count
return f"{user_id}#{shard_id}"
Contoh Implementasi Nyata
Berikut adalah contoh lengkap dari arsitektur mikroservis pemrosesan pesanan:
import boto3
import json
from decimal import Decimal
from typing import Dict, List
class OrderProcessingService:
def __init__(self, stream_name: str):
self.kinesis = boto3.client('kinesis')
self.stream_name = stream_name
def create_order(self, user_id: str, items: List[Dict]) -> str:
"""Buat pesanan dan publikasikan acara"""
order_id = self.generate_order_id()
# Publikasikan acara pesanan dibuat
self.publish_event('ORDER_CREATED', {
'orderId': order_id,
'userId': user_id,
'items': items,
'status': 'PENDING',
'total': self.calculate_total(items)
}, partition_key=user_id)
return order_id
def publish_event(self, event_type: str, payload: Dict,
partition_key: str):
"""Publikasikan acara ke aliran Kinesis"""
event = {
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
self.kinesis.put_record(
StreamName=self.stream_name,
Data=json.dumps(event, default=str),
PartitionKey=partition_key
)
class InventoryService:
"""Konsumsi acara pesanan dan perbarui inventaris"""
def process_records(self, records):
for record in records:
data = json.loads(record['kinesis']['data'])
if data['eventType'] == 'ORDER_CREATED':
self.reserve_inventory(data['payload'])
elif data['eventType'] == 'ORDER_CANCELLED':
self.release_inventory(data['payload'])
def reserve_inventory(self, order_data):
# Perbarui database inventaris
for item in order_data['items']:
# Implementasi di sini
pass
Strategi Migrasi dari Monolit ke Mikroservis
Fase 1: Pola Strangler Fig
Mulailah dengan mengarahkan acara tertentu melalui Kinesis sambil mempertahankan monolit:
- Identifikasi konteks terbatas dalam monolit Anda
- Buat aliran Kinesis untuk acara lintas konteks
- Ekstrak secara bertahap layanan yang mengonsumsi dari aliran ini
- Pertahankan kompatibilitas mundur dengan monolit
Fase 2: Pemrosesan Paralel
Jalankan sistem lama dan baru secara paralel:
def dual_write_pattern(legacy_db, kinesis_stream, data):
"""Tulis ke sistem lama dan aliran acara"""
try:
# Tulis ke sistem baru terlebih dahulu
publish_to_kinesis(kinesis_stream, data)
# Lalu perbarui sistem lama
legacy_db.update(data)
except Exception as e:
# Implementasikan logika kompensasi
rollback_kinesis_event(kinesis_stream, data)
raise
Fase 3: Migrasi Penuh
Setelah kepercayaan terbangun, arahkan semua lalu lintas melalui arsitektur berbasis acara.
Strategi Optimisasi Biaya
1. Gunakan Mode On-Demand untuk Beban Kerja Berubah
Mode on-demand (diperkenalkan pada 2023) secara otomatis berskala berdasarkan lalu lintas:
# Buat aliran dengan mode on-demand
aws kinesis create-stream \
--stream-name my-stream \
--stream-mode-details StreamMode=ON_DEMAND
2. Implementasikan Agregasi Data
Kurangi unit PUT dengan menggabungkan catatan:
from aws_kinesis_agg.aggregator import RecordAggregator
def batch_put_records(kinesis_client, stream_name, records):
"""Agregasikan catatan untuk mengurangi biaya"""
aggregator = RecordAggregator()
for record in records:
aggregator.add_user_record(
partition_key=record['partition_key'],
data=record['data']
)
# Kirimkan catatan agregasi
kinesis_client.put_record(
StreamName=stream_name,
Data=aggregator.serialize(),
PartitionKey=records[0]['partition_key']
)
3. Optimalkan Retensi Data
Retensi default adalah 24 jam. Hanya tingkatkan jika diperlukan:
# Setel retensi menjadi 7 hari
aws kinesis increase-stream-retention-period \
--stream-name my-stream \
--retention-period-hours 168
Praktik Keamanan Terbaik
1. Enkripsi di Tempat dan dalam Transit
# Buat aliran terenkripsi
kinesis.create_stream(
StreamName='secure-stream',
ShardCount=1,
StreamModeDetails={'StreamMode': 'PROVISIONED'}
)
# Aktifkan enkripsi
kinesis.start_stream_encryption(
StreamName='secure-stream',
EncryptionType='KMS',
KeyId='alias/aws/kinesis'
)
2. Kebijakan IAM untuk Prinsip Minimal
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
}
]
}
3. Titik Akhir VPC
Jaga lalu lintas tetap dalam jaringan AWS. Untuk mengelola infrastruktur AWS sebagai kode, pertimbangkan menggunakan Terraform - lihat Terraform cheatsheet kami:
aws ec2 create-vpc-endpoint \
--vpc-id vpc-12345678 \
--service-name com.amazonaws.us-east-1.kinesis-streams \
--route-table-ids rtb-12345678
Observabilitas dan Debugging
Pelacakan Terdistribusi dengan X-Ray
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
patch_all()
@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
xray_recorder.put_annotation('eventType', event_data['type'])
xray_recorder.put_metadata('payload', event_data['payload'])
kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event_data),
PartitionKey=event_data['id']
)
Query Insights CloudWatch Logs
-- Cari waktu pemrosesan yang lambat
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20
-- Lacak tingkat kesalahan
fields @timestamp, eventType
| filter error = true
| stats count() by eventType
Pola Lanjutan
Pola Saga untuk Transaksi Terdistribusi
Implementasikan transaksi jangka panjang lintas mikroservis:
class OrderSaga:
def __init__(self, kinesis_client, stream_name):
self.kinesis = kinesis_client
self.stream_name = stream_name
self.saga_id = str(uuid.uuid4())
def execute(self, order_data):
"""Eksekusi saga dengan logika kompensasi"""
try:
# Langkah 1: Cadangkan inventaris
self.publish_command('RESERVE_INVENTORY', order_data)
# Langkah 2: Proses pembayaran
self.publish_command('PROCESS_PAYMENT', order_data)
# Langkah 3: Kirim pesanan
self.publish_command('SHIP_ORDER', order_data)
except SagaException as e:
# Kompensasi dalam urutan terbalik
self.compensate(e.failed_step)
def compensate(self, failed_step):
"""Eksekusi transaksi kompensasi"""
compensation_steps = {
'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
}
for step in compensation_steps.get(failed_step, []):
self.publish_command(step, {'sagaId': self.saga_id})
Strategi Pengujian
Pengembangan Lokal dengan LocalStack
# Mulai LocalStack dengan Kinesis
docker run -d \
-p 4566:4566 \
-e SERVICES=kinesis \
localstack/localstack
# Buat aliran pengujian
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
--stream-name test-stream \
--shard-count 1
Pengujian Integrasi
import pytest
from moto import mock_kinesis
@mock_kinesis
def test_event_publishing():
"""Uji penerbitan acara dengan Kinesis yang dimodifikasi"""
kinesis = boto3.client('kinesis', region_name='us-east-1')
kinesis.create_stream(StreamName='test-stream', ShardCount=1)
service = OrderProcessingService('test-stream')
order_id = service.create_order('user123', [
{'productId': 'prod1', 'quantity': 2}
])
assert order_id is not None
Penyempurnaan Kinerja
Optimalkan Ukuran Batch
def optimize_batch_processing(records, batch_size=500):
"""Proses catatan dalam batch yang dioptimalkan"""
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
process_batch(batch)
Gunakan Pooling Koneksi
from botocore.config import Config
config = Config(
max_pool_connections=50,
retries={'max_attempts': 3, 'mode': 'adaptive'}
)
kinesis = boto3.client('kinesis', config=config)
Tautan Berguna
Sumber Daya AWS Kinesis:
- Dokumentasi AWS Kinesis
- Panduan Pengembang AWS Kinesis Data Streams
- Kinesis Client Library (KCL)
- Kalkulator Harga AWS Kinesis
- Kuota dan Batas Kinesis Data Streams
- Blog Arsitektur AWS - Arsitektur Berbasis Acara
- Contoh Kinesis AWS
Artikel Terkait:
- Perbandingan Biaya Hosting Rabbitmq di Eks vs Sqs
- TypeScript Cheatsheet: Konsep Inti & Praktik Terbaik
- Python Cheatsheet
- Terraform Cheatsheet - Perintah Berguna dan Contoh
Kesimpulan
AWS Kinesis menyediakan fondasi yang kuat untuk membangun arsitektur mikroservis berbasis acara yang skalabel. Dengan mengikuti pola dan praktik terbaik ini, Anda dapat menciptakan sistem yang tangguh, skalabel, dan dapat dipelihara. Mulailah dengan satu aliran acara, validasi arsitektur Anda, dan secara bertahap perluas ke pola yang lebih kompleks seiring pertumbuhan sistem Anda.
Kunci keberhasilan adalah memahami kebutuhan aliran data Anda, memilih layanan Kinesis yang tepat untuk kasus penggunaan Anda, dan menerapkan pemantauan dan penanganan kesalahan yang tepat sejak awal.