AWS Kinesis के साथ इवेंट-ड्राइवन माइक्रोसर्विसेज बनाना
AWS Kinesis के साथ इवेंट-ड्राइवन आर्किटेक्चर के लिए स्केल
AWS Kinesis ने आधुनिक इवेंट-ड्राइवन माइक्रोसर्विसेस आर्किटेक्चर बनाने के लिए एक महत्वपूर्ण आधार बन गया है, जो कम ऑपरेशनल ओवरहेड के साथ रियल-टाइम डेटा प्रोसेसिंग को स्केल करने की अनुमति देता है।

इवेंट-ड्राइवन माइक्रोसर्विसेस आर्किटेक्चर को समझना
इवेंट-ड्राइवन आर्किटेक्चर (EDA) एक डिजाइन पैटर्न है जहां सेवाएं डायरेक्ट सिंक्रोनस कॉल्स के बजाय इवेंट्स के माध्यम से संचार करती हैं। इस दृष्टिकोण के कई फायदे हैं:
- लूज़ कपलिंग: सेवाओं को एक दूसरे के अस्तित्व के बारे में जानने की आवश्यकता नहीं होती
- स्केलेबिलिटी: प्रत्येक सेवा अपने वर्कलोड के आधार पर स्वतंत्र रूप से स्केल होती है
- रिज़िलिएंस: एक सेवा में विफलता अन्य सेवाओं में कास्केड नहीं होती
- फ्लेक्सिबिलिटी: नई सेवाओं को जोड़ा जा सकता है बिना मौजूदा सेवाओं में परिवर्तन किए
AWS Kinesis EDA लागू करने के लिए एक आधार प्रदान करता है, जो एक वितरित, ड्यूरेबल इवेंट स्ट्रीम के रूप में कार्य करता है जो प्रोड्यूसर्स को कन्स्यूमर्स से डिकपल करता है।
AWS Kinesis ओवरव्यू
AWS कई Kinesis सेवाएं प्रदान करता है, प्रत्येक विशिष्ट उपयोग के मामलों के लिए डिज़ाइन की गई हैं। स्ट्रीमिंग समाधानों का मूल्यांकन करते समय, आप RabbitMQ on EKS vs SQS की तुलना भी करना चाह सकते हैं, जो विभिन्न मेसेजिंग पैटर्न्स और लागत प्रभावों के लिए है।
Kinesis Data Streams
कोर स्ट्रीमिंग सेवा जो डेटा रिकॉर्ड्स को रियल-टाइम में कैप्चर, स्टोर, और प्रोसेस करती है। डेटा स्ट्रीम्स आदर्श है:
- कस्टम रियल-टाइम प्रोसेसिंग एप्लिकेशन्स के लिए
- सब-सेकंड लेटेंसी के साथ डेटा पाइपलाइन्स बनाने के लिए
- प्रति सेकंड मिलियन इवेंट्स को प्रोसेस करने के लिए
- इवेंट सोर्सिंग पैटर्न्स लागू करने के लिए
Kinesis Data Firehose
एक पूर्ण रूप से प्रबंधित सेवा जो स्ट्रीमिंग डेटा को S3, Redshift, Elasticsearch, या HTTP एंडपॉइंट्स जैसे डेस्टिनेशन्स तक पहुंचाती है। सबसे अच्छा है:
- सरल ETL पाइपलाइन्स के लिए
- लॉग एग्रीगेशन और आर्काइवल के लिए
- नियर-रियल-टाइम एनालिटिक्स (60-सेकंड मिनिमम लेटेंसी) के लिए
- उन स्थितियों के लिए जहां आपको कस्टम प्रोसेसिंग लॉजिक की आवश्यकता नहीं होती
Kinesis Data Analytics
SQL या Apache Flink का उपयोग करके स्ट्रीमिंग डेटा को प्रोसेस और विश्लेषण करता है। उपयोग के मामले शामिल हैं:
- रियल-टाइम डैशबोर्ड्स
- स्ट्रीमिंग ETL
- रियल-टाइम एनोमली डिटेक्शन
- कंटीन्यूअस मेट्रिक जनरेशन
Kinesis के साथ आर्किटेक्चरल पैटर्न्स
1. इवेंट सोर्सिंग पैटर्न
इवेंट सोर्सिंग एप्लिकेशन स्टेट में सभी परिवर्तनों को एक इवेंट्स की श्रृंखला के रूप में स्टोर करता है। Kinesis इसके लिए आदर्श है। यदि आपको Python फंडामेंटल्स पर एक रिफ्रेशर की आवश्यकता है, तो हमारे Python Cheatsheet देखें:
import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
def publish_event(stream_name, event_type, payload):
"""Publish an event to Kinesis stream"""
event = {
'eventId': generate_unique_id(),
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event),
PartitionKey=payload.get('userId', 'default')
)
return response['SequenceNumber']
# Example: User registration event
publish_event(
stream_name='user-events',
event_type='USER_REGISTERED',
payload={
'userId': '12345',
'email': 'user@example.com',
'registrationDate': '2025-10-30'
}
)
2. CQRS (Command Query Responsibility Segregation)
Kinesis को इवेंट बस के रूप में उपयोग करके रीड और राइट ऑपरेशन्स को अलग करें:
package main
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)
type OrderCommand struct {
CommandType string `json:"commandType"`
OrderID string `json:"orderId"`
UserID string `json:"userId"`
Items []OrderItem `json:"items"`
}
func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
data, err := json.Marshal(command)
if err != nil {
return err
}
_, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String("order-commands"),
Data: data,
PartitionKey: aws.String(command.OrderID),
})
return err
}
3. Fan-Out Pattern with Lambda
एक ही स्ट्रीम से इवेंट्स को कई Lambda फंक्शन्स के साथ प्रोसेस करें। टाइपसेफ्टी के साथ मजबूत टाइपसेफ्टी के लिए टाइपस्क्रिप्ट इम्प्लीमेंटेशन्स के लिए, हमारे TypeScript Cheatsheet देखें:
// Lambda consumer for email notifications
exports.handler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await sendOrderConfirmationEmail(payload);
}
};
};
// Another Lambda for inventory updates
exports.inventoryHandler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await updateInventory(payload.items);
}
};
};
प्रोडक्शन के लिए बेस्ट प्रैक्टिसेस
1. सही शार्ड काउंट चुनना
अपने शार्ड आवश्यकताओं का गणना करें:
- इनग्रेस: प्रति शार्ड 1 MB/sec या 1,000 रिकॉर्ड्स/sec
- ईग्रेस: प्रति शार्ड 2 MB/sec (स्टैंडर्ड कन्स्यूमर्स) या प्रति कन्स्यूमर 2 MB/sec के साथ एन्हांस्ड फैन-आउट
def calculate_shards(records_per_second, avg_record_size_kb):
"""Calculate required number of shards"""
# Ingress capacity
ingress_shards = max(
records_per_second / 1000,
(records_per_second * avg_record_size_kb) / 1024
)
return int(ingress_shards) + 1 # Add buffer
2. उचित एरर हैंडलिंग लागू करें
from botocore.exceptions import ClientError
import time
def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
max_retries=3):
"""Put record with exponential backoff retry"""
for attempt in range(max_retries):
try:
response = kinesis_client.put_record(
StreamName=stream_name,
Data=data,
PartitionKey=partition_key
)
return response
except ClientError as e:
if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # Exponential backoff
continue
raise
3. कई कन्स्यूमर्स के लिए एन्हांस्ड फैन-आउट का उपयोग करें
एन्हांस्ड फैन-आउट प्रत्येक कन्स्यूमर के लिए डेडिकेटेड थ्रूपुट प्रदान करता है:
# Register a consumer with enhanced fan-out
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
--consumer-name my-consumer-app
4. महत्वपूर्ण मेट्रिक्स का निगरानी करें
ट्रैक करने के लिए आवश्यक CloudWatch मेट्रिक्स:
IncomingRecords: सफलतापूर्वक रखे गए रिकॉर्ड्स की संख्याIncomingBytes: इनकमिंग डेटा का बाइट वॉल्यूमGetRecords.IteratorAgeMilliseconds: कन्स्यूमर्स कितने पीछे हैंWriteProvisionedThroughputExceeded: थ्रॉटलिंग इवेंट्सReadProvisionedThroughputExceeded: कन्स्यूमर थ्रॉटलिंग
5. उचित पार्टिशन की स्ट्रैटेजी लागू करें
import hashlib
def get_partition_key(user_id, shard_count=10):
"""Generate partition key with even distribution"""
# Use consistent hashing for even distribution
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_id = hash_value % shard_count
return f"{user_id}#{shard_id}"
रियल-वर्ल्ड इम्प्लीमेंटेशन उदाहरण
यह एक ऑर्डर प्रोसेसिंग माइक्रोसर्विसेस आर्किटेक्चर का एक पूर्ण उदाहरण है:
import boto3
import json
from decimal import Decimal
from typing import Dict, List
class OrderProcessingService:
def __init__(self, stream_name: str):
self.kinesis = boto3.client('kinesis')
self.stream_name = stream_name
def create_order(self, user_id: str, items: List[Dict]) -> str:
"""Create order and publish events"""
order_id = self.generate_order_id()
# Publish order created event
self.publish_event('ORDER_CREATED', {
'orderId': order_id,
'userId': user_id,
'items': items,
'status': 'PENDING',
'total': self.calculate_total(items)
}, partition_key=user_id)
return order_id
def publish_event(self, event_type: str, payload: Dict,
partition_key: str):
"""Publish event to Kinesis stream"""
event = {
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
self.kinesis.put_record(
StreamName=self.stream_name,
Data=json.dumps(event, default=str),
PartitionKey=partition_key
)
class InventoryService:
"""Consumes order events and updates inventory"""
def process_records(self, records):
for record in records:
data = json.loads(record['kinesis']['data'])
if data['eventType'] == 'ORDER_CREATED':
self.reserve_inventory(data['payload'])
elif data['eventType'] == 'ORDER_CANCELLED':
self.release_inventory(data['payload'])
def reserve_inventory(self, order_data):
# Update inventory database
for item in order_data['items']:
# Implementation here
pass
मोनोलिथ से माइक्रोसर्विसेस में माइग्रेशन रणनीति
चरण 1: स्ट्रैंगलर फिग पैटर्न
Kinesis के माध्यम से विशिष्ट घटनाओं को रूटिंग करना शुरू करें जबकि मोनोलिथ को बनाए रखें:
- अपने मोनोलिथ में बाउंडेड कॉन्टेक्स्ट्स की पहचान करें
- क्रॉस-कॉन्टेक्स्ट घटनाओं के लिए Kinesis स्ट्रीम्स बनाएं
- इन स्ट्रीम्स से डेटा कन्स्यूम करने वाले सेवाओं को धीरे-धीरे निकालें
- मोनोलिथ के साथ पिछले संगतता बनाए रखें
चरण 2: समानांतर प्रोसेसिंग
पुराने और नए सिस्टम को समानांतर में चलाएं:
def dual_write_pattern(legacy_db, kinesis_stream, data):
"""नए सिस्टम और घटना स्ट्रीम दोनों में लिखें"""
try:
# पहले नए सिस्टम में लिखें
publish_to_kinesis(kinesis_stream, data)
# फिर पुराने सिस्टम को अपडेट करें
legacy_db.update(data)
except Exception as e:
# क्षतिपूर्ति तर्क लागू करें
rollback_kinesis_event(kinesis_stream, data)
raise
चरण 3: पूर्ण माइग्रेशन
एक बार जब विश्वास स्थापित हो जाता है, तो सभी ट्रैफिक को घटना-ड्राइवन आर्किटेक्चर के माध्यम से रूट करें।
लागत अनुकूलन रणनीतियाँ
1. परिवर्तनीय वर्कलोड्स के लिए ऑन-डिमांड मोड का उपयोग करें
ऑन-डिमांड मोड (2023 में पेश किया गया) ट्रैफिक के आधार पर स्वचालित रूप से स्केल करता है:
# ऑन-डिमांड मोड के साथ स्ट्रीम बनाएं
aws kinesis create-stream \
--stream-name my-stream \
--stream-mode-details StreamMode=ON_DEMAND
2. डेटा एग्रीगेशन लागू करें
रेकॉर्ड्स को बैच करने से PUT पेलोड यूनिट्स कम करें:
from aws_kinesis_agg.aggregator import RecordAggregator
def batch_put_records(kinesis_client, stream_name, records):
"""लागत कम करने के लिए रेकॉर्ड्स को एग्रीगेट करें"""
aggregator = RecordAggregator()
for record in records:
aggregator.add_user_record(
partition_key=record['partition_key'],
data=record['data']
)
# एग्रीगेटेड रेकॉर्ड भेजें
kinesis_client.put_record(
StreamName=stream_name,
Data=aggregator.serialize(),
PartitionKey=records[0]['partition_key']
)
3. डेटा रिटेंशन को अनुकूलित करें
डिफ़ॉल्ट रिटेंशन 24 घंटे है। केवल आवश्यकता पड़ने पर ही इसे बढ़ाएं:
# रिटेंशन को 7 दिनों तक सेट करें
aws kinesis increase-stream-retention-period \
--stream-name my-stream \
--retention-period-hours 168
सुरक्षा सर्वोत्तम प्रथाएँ
1. रेस्ट और ट्रांजिट में एन्क्रिप्शन
# एन्क्रिप्टेड स्ट्रीम बनाएं
kinesis.create_stream(
StreamName='secure-stream',
ShardCount=1,
StreamModeDetails={'StreamMode': 'PROVISIONED'}
)
# एन्क्रिप्शन सक्षम करें
kinesis.start_stream_encryption(
StreamName='secure-stream',
EncryptionType='KMS',
KeyId='alias/aws/kinesis'
)
2. न्यूनतम विशेषाधिकार के लिए IAM पॉलिसियाँ
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
}
]
}
3. VPC एंडपॉइंट्स
ट्रैफिक को AWS नेटवर्क के भीतर रखें। AWS इन्फ्रास्ट्रक्चर को कोड के रूप में प्रबंधित करने के लिए, Terraform का उपयोग करने पर विचार करें - हमारी Terraform चीटशीट देखें:
aws ec2 create-vpc-endpoint \
--vpc-id vpc-12345678 \
--service-name com.amazonaws.us-east-1.kinesis-streams \
--route-table-ids rtb-12345678
अवलोकन और डिबगिंग
X-Ray के साथ वितरित ट्रेसिंग
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
patch_all()
@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
xray_recorder.put_annotation('eventType', event_data['type'])
xray_recorder.put_metadata('payload', event_data['payload'])
kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event_data),
PartitionKey=event_data['id']
)
CloudWatch Logs Insights क्वेरी
-- धीमी प्रोसेसिंग टाइम्स ढूंढें
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20
-- एरर रेट ट्रैक करें
fields @timestamp, eventType
| filter error = true
| stats count() by eventType
उन्नत पैटर्न
वितरित ट्रांजैक्शन्स के लिए सागा पैटर्न
माइक्रोसर्विसेस के माध्यम से लंबे समय तक चलने वाले ट्रांजैक्शन्स लागू करें:
class OrderSaga:
def __init__(self, kinesis_client, stream_name):
self.kinesis = kinesis_client
self.stream_name = stream_name
self.saga_id = str(uuid.uuid4())
def execute(self, order_data):
"""क्षतिपूर्ति तर्क के साथ सागा निष्पादित करें"""
try:
# चरण 1: इन्वेंटरी रिजर्व करें
self.publish_command('RESERVE_INVENTORY', order_data)
# चरण 2: भुगतान प्रोसेस करें
self.publish_command('PROCESS_PAYMENT', order_data)
# चरण 3: ऑर्डर शिप करें
self.publish_command('SHIP_ORDER', order_data)
except SagaException as e:
# विपरीत क्रम में क्षतिपूर्ति करें
self.compensate(e.failed_step)
def compensate(self, failed_step):
"""क्षतिपूर्ति ट्रांजैक्शन्स निष्पादित करें"""
compensation_steps = {
'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
}
for step in compensation_steps.get(failed_step, []):
self.publish_command(step, {'sagaId': self.saga_id})
परीक्षण रणनीतियाँ
LocalStack के साथ स्थानीय विकास
# Kinesis के साथ LocalStack शुरू करें
docker run -d \
-p 4566:4566 \
-e SERVICES=kinesis \
localstack/localstack
# परीक्षण स्ट्रीम बनाएं
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
--stream-name test-stream \
--shard-count 1
इंटीग्रेशन परीक्षण
import pytest
from moto import mock_kinesis
@mock_kinesis
def test_event_publishing():
"""मॉक्ड Kinesis के साथ घटना प्रकाशन परीक्षण करें"""
kinesis = boto3.client('kinesis', region_name='us-east-1')
kinesis.create_stream(StreamName='test-stream', ShardCount=1)
service = OrderProcessingService('test-stream')
order_id = service.create_order('user123', [
{'productId': 'prod1', 'quantity': 2}
])
assert order_id is not None
प्रदर्शन ट्यूनिंग
बैच साइज को अनुकूलित करें
def optimize_batch_processing(records, batch_size=500):
"""अनुकूलित बैच में रेकॉर्ड्स प्रोसेस करें"""
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
process_batch(batch)
कनेक्शन पूलिंग का उपयोग करें
from botocore.config import Config
config = Config(
max_pool_connections=50,
retries={'max_attempts': 3, 'mode': 'adaptive'}
)
kinesis = boto3.client('kinesis', config=config)
उपयोगी लिंक्स
AWS Kinesis संसाधन:
- AWS Kinesis दस्तावेज़ीकरण
- AWS Kinesis Data Streams Developer Guide
- Kinesis Client Library (KCL)
- AWS Kinesis प्राइसिंग कैलकुलेटर
- Kinesis Data Streams कोटे और सीमाएँ
- AWS Architecture Blog - Event-Driven Architectures
- AWS Samples - Kinesis Examples
संबंधित लेख:
- Rabbitmq on Eks vs Sqs होस्टिंग लागत तुलना
- TypeScript चीटशीट: कोर कॉन्सेप्ट्स और सर्वोत्तम प्रथाएँ
- Python चीटशीट
- Terraform चीटशीट - उपयोगी कमांड्स और उदाहरण
निष्कर्ष
AWS Kinesis स्केलेबल, घटना-ड्राइवन माइक्रोसर्विसेस आर्किटेक्चर बनाने के लिए एक मजबूत आधार प्रदान करता है। इन पैटर्न और सर्वोत्तम प्रथाओं का पालन करके, आप उन सिस्टम बना सकते हैं जो लचीले, स्केलेबल, और बनाए रखने योग्य हैं। एक एकल घटना स्ट्रीम से शुरू करें, अपने आर्किटेक्चर को वैलिडेट करें, और जैसे-जैसे आपका सिस्टम बढ़ता है, धीरे-धीरे अधिक जटिल पैटर्न की ओर बढ़ें।
सफलता की कुंजी है आपकी डेटा फ्लो आवश्यकताओं को समझना, अपने उपयोग के मामले के लिए सही Kinesis सेवा चुनना, और शुरुआत से ही उचित निगरानी और एरर हैंडलिंग लागू करना।