AWS Kinesis के साथ इवेंट-ड्राइवन माइक्रोसर्विसेज बनाना

AWS Kinesis के साथ इवेंट-ड्राइवन आर्किटेक्चर के लिए स्केल

Page content

AWS Kinesis ने आधुनिक इवेंट-ड्राइवन माइक्रोसर्विसेस आर्किटेक्चर बनाने के लिए एक महत्वपूर्ण आधार बन गया है, जो कम ऑपरेशनल ओवरहेड के साथ रियल-टाइम डेटा प्रोसेसिंग को स्केल करने की अनुमति देता है।

amazon-kinesis

इवेंट-ड्राइवन माइक्रोसर्विसेस आर्किटेक्चर को समझना

इवेंट-ड्राइवन आर्किटेक्चर (EDA) एक डिजाइन पैटर्न है जहां सेवाएं डायरेक्ट सिंक्रोनस कॉल्स के बजाय इवेंट्स के माध्यम से संचार करती हैं। इस दृष्टिकोण के कई फायदे हैं:

  • लूज़ कपलिंग: सेवाओं को एक दूसरे के अस्तित्व के बारे में जानने की आवश्यकता नहीं होती
  • स्केलेबिलिटी: प्रत्येक सेवा अपने वर्कलोड के आधार पर स्वतंत्र रूप से स्केल होती है
  • रिज़िलिएंस: एक सेवा में विफलता अन्य सेवाओं में कास्केड नहीं होती
  • फ्लेक्सिबिलिटी: नई सेवाओं को जोड़ा जा सकता है बिना मौजूदा सेवाओं में परिवर्तन किए

AWS Kinesis EDA लागू करने के लिए एक आधार प्रदान करता है, जो एक वितरित, ड्यूरेबल इवेंट स्ट्रीम के रूप में कार्य करता है जो प्रोड्यूसर्स को कन्स्यूमर्स से डिकपल करता है।

AWS Kinesis ओवरव्यू

AWS कई Kinesis सेवाएं प्रदान करता है, प्रत्येक विशिष्ट उपयोग के मामलों के लिए डिज़ाइन की गई हैं। स्ट्रीमिंग समाधानों का मूल्यांकन करते समय, आप RabbitMQ on EKS vs SQS की तुलना भी करना चाह सकते हैं, जो विभिन्न मेसेजिंग पैटर्न्स और लागत प्रभावों के लिए है।

Kinesis Data Streams

कोर स्ट्रीमिंग सेवा जो डेटा रिकॉर्ड्स को रियल-टाइम में कैप्चर, स्टोर, और प्रोसेस करती है। डेटा स्ट्रीम्स आदर्श है:

  • कस्टम रियल-टाइम प्रोसेसिंग एप्लिकेशन्स के लिए
  • सब-सेकंड लेटेंसी के साथ डेटा पाइपलाइन्स बनाने के लिए
  • प्रति सेकंड मिलियन इवेंट्स को प्रोसेस करने के लिए
  • इवेंट सोर्सिंग पैटर्न्स लागू करने के लिए

Kinesis Data Firehose

एक पूर्ण रूप से प्रबंधित सेवा जो स्ट्रीमिंग डेटा को S3, Redshift, Elasticsearch, या HTTP एंडपॉइंट्स जैसे डेस्टिनेशन्स तक पहुंचाती है। सबसे अच्छा है:

  • सरल ETL पाइपलाइन्स के लिए
  • लॉग एग्रीगेशन और आर्काइवल के लिए
  • नियर-रियल-टाइम एनालिटिक्स (60-सेकंड मिनिमम लेटेंसी) के लिए
  • उन स्थितियों के लिए जहां आपको कस्टम प्रोसेसिंग लॉजिक की आवश्यकता नहीं होती

Kinesis Data Analytics

SQL या Apache Flink का उपयोग करके स्ट्रीमिंग डेटा को प्रोसेस और विश्लेषण करता है। उपयोग के मामले शामिल हैं:

  • रियल-टाइम डैशबोर्ड्स
  • स्ट्रीमिंग ETL
  • रियल-टाइम एनोमली डिटेक्शन
  • कंटीन्यूअस मेट्रिक जनरेशन

Kinesis के साथ आर्किटेक्चरल पैटर्न्स

1. इवेंट सोर्सिंग पैटर्न

इवेंट सोर्सिंग एप्लिकेशन स्टेट में सभी परिवर्तनों को एक इवेंट्स की श्रृंखला के रूप में स्टोर करता है। Kinesis इसके लिए आदर्श है। यदि आपको Python फंडामेंटल्स पर एक रिफ्रेशर की आवश्यकता है, तो हमारे Python Cheatsheet देखें:

import boto3
import json
from datetime import datetime

kinesis = boto3.client('kinesis', region_name='us-east-1')

def publish_event(stream_name, event_type, payload):
    """Publish an event to Kinesis stream"""
    event = {
        'eventId': generate_unique_id(),
        'eventType': event_type,
        'timestamp': datetime.utcnow().isoformat(),
        'payload': payload
    }

    response = kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event),
        PartitionKey=payload.get('userId', 'default')
    )

    return response['SequenceNumber']

# Example: User registration event
publish_event(
    stream_name='user-events',
    event_type='USER_REGISTERED',
    payload={
        'userId': '12345',
        'email': 'user@example.com',
        'registrationDate': '2025-10-30'
    }
)

2. CQRS (Command Query Responsibility Segregation)

Kinesis को इवेंट बस के रूप में उपयोग करके रीड और राइट ऑपरेशन्स को अलग करें:

package main

import (
    "encoding/json"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/service/kinesis"
)

type OrderCommand struct {
    CommandType string      `json:"commandType"`
    OrderID     string      `json:"orderId"`
    UserID      string      `json:"userId"`
    Items       []OrderItem `json:"items"`
}

func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
    data, err := json.Marshal(command)
    if err != nil {
        return err
    }

    _, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
        StreamName:   aws.String("order-commands"),
        Data:         data,
        PartitionKey: aws.String(command.OrderID),
    })

    return err
}

3. Fan-Out Pattern with Lambda

एक ही स्ट्रीम से इवेंट्स को कई Lambda फंक्शन्स के साथ प्रोसेस करें। टाइपसेफ्टी के साथ मजबूत टाइपसेफ्टी के लिए टाइपस्क्रिप्ट इम्प्लीमेंटेशन्स के लिए, हमारे TypeScript Cheatsheet देखें:

// Lambda consumer for email notifications
exports.handler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );

        if (payload.eventType === 'ORDER_PLACED') {
            await sendOrderConfirmationEmail(payload);
        }
    };
};

// Another Lambda for inventory updates
exports.inventoryHandler = async (event) => {
    for (const record of event.Records) {
        const payload = JSON.parse(
            Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
        );

        if (payload.eventType === 'ORDER_PLACED') {
            await updateInventory(payload.items);
        }
    };
};

प्रोडक्शन के लिए बेस्ट प्रैक्टिसेस

1. सही शार्ड काउंट चुनना

अपने शार्ड आवश्यकताओं का गणना करें:

  • इनग्रेस: प्रति शार्ड 1 MB/sec या 1,000 रिकॉर्ड्स/sec
  • ईग्रेस: प्रति शार्ड 2 MB/sec (स्टैंडर्ड कन्स्यूमर्स) या प्रति कन्स्यूमर 2 MB/sec के साथ एन्हांस्ड फैन-आउट
def calculate_shards(records_per_second, avg_record_size_kb):
    """Calculate required number of shards"""
    # Ingress capacity
    ingress_shards = max(
        records_per_second / 1000,
        (records_per_second * avg_record_size_kb) / 1024
    )

    return int(ingress_shards) + 1  # Add buffer

2. उचित एरर हैंडलिंग लागू करें

from botocore.exceptions import ClientError
import time

def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
                          max_retries=3):
    """Put record with exponential backoff retry"""
    for attempt in range(max_retries):
        try:
            response = kinesis_client.put_record(
                StreamName=stream_name,
                Data=data,
                PartitionKey=partition_key
            )
            return response
        except ClientError as e:
            if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Exponential backoff
                    continue
            raise

3. कई कन्स्यूमर्स के लिए एन्हांस्ड फैन-आउट का उपयोग करें

एन्हांस्ड फैन-आउट प्रत्येक कन्स्यूमर के लिए डेडिकेटेड थ्रूपुट प्रदान करता है:

# Register a consumer with enhanced fan-out
aws kinesis register-stream-consumer \
    --stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
    --consumer-name my-consumer-app

4. महत्वपूर्ण मेट्रिक्स का निगरानी करें

ट्रैक करने के लिए आवश्यक CloudWatch मेट्रिक्स:

  • IncomingRecords: सफलतापूर्वक रखे गए रिकॉर्ड्स की संख्या
  • IncomingBytes: इनकमिंग डेटा का बाइट वॉल्यूम
  • GetRecords.IteratorAgeMilliseconds: कन्स्यूमर्स कितने पीछे हैं
  • WriteProvisionedThroughputExceeded: थ्रॉटलिंग इवेंट्स
  • ReadProvisionedThroughputExceeded: कन्स्यूमर थ्रॉटलिंग

5. उचित पार्टिशन की स्ट्रैटेजी लागू करें

import hashlib

def get_partition_key(user_id, shard_count=10):
    """Generate partition key with even distribution"""
    # Use consistent hashing for even distribution
    hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
    shard_id = hash_value % shard_count
    return f"{user_id}#{shard_id}"

रियल-वर्ल्ड इम्प्लीमेंटेशन उदाहरण

यह एक ऑर्डर प्रोसेसिंग माइक्रोसर्विसेस आर्किटेक्चर का एक पूर्ण उदाहरण है:

import boto3
import json
from decimal import Decimal
from typing import Dict, List

class OrderProcessingService:
    def __init__(self, stream_name: str):
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name

    def create_order(self, user_id: str, items: List[Dict]) -> str:
        """Create order and publish events"""
        order_id = self.generate_order_id()

        # Publish order created event
        self.publish_event('ORDER_CREATED', {
            'orderId': order_id,
            'userId': user_id,
            'items': items,
            'status': 'PENDING',
            'total': self.calculate_total(items)
        }, partition_key=user_id)

        return order_id

    def publish_event(self, event_type: str, payload: Dict,
                     partition_key: str):
        """Publish event to Kinesis stream"""
        event = {
            'eventType': event_type,
            'timestamp': datetime.utcnow().isoformat(),
            'payload': payload
        }

        self.kinesis.put_record(
            StreamName=self.stream_name,
            Data=json.dumps(event, default=str),
            PartitionKey=partition_key
        )

class InventoryService:
    """Consumes order events and updates inventory"""

    def process_records(self, records):
        for record in records:
            data = json.loads(record['kinesis']['data'])

            if data['eventType'] == 'ORDER_CREATED':
                self.reserve_inventory(data['payload'])
            elif data['eventType'] == 'ORDER_CANCELLED':
                self.release_inventory(data['payload'])

    def reserve_inventory(self, order_data):
        # Update inventory database
        for item in order_data['items']:
            # Implementation here
            pass

मोनोलिथ से माइक्रोसर्विसेस में माइग्रेशन रणनीति

चरण 1: स्ट्रैंगलर फिग पैटर्न

Kinesis के माध्यम से विशिष्ट घटनाओं को रूटिंग करना शुरू करें जबकि मोनोलिथ को बनाए रखें:

  1. अपने मोनोलिथ में बाउंडेड कॉन्टेक्स्ट्स की पहचान करें
  2. क्रॉस-कॉन्टेक्स्ट घटनाओं के लिए Kinesis स्ट्रीम्स बनाएं
  3. इन स्ट्रीम्स से डेटा कन्स्यूम करने वाले सेवाओं को धीरे-धीरे निकालें
  4. मोनोलिथ के साथ पिछले संगतता बनाए रखें

चरण 2: समानांतर प्रोसेसिंग

पुराने और नए सिस्टम को समानांतर में चलाएं:

def dual_write_pattern(legacy_db, kinesis_stream, data):
    """नए सिस्टम और घटना स्ट्रीम दोनों में लिखें"""
    try:
        # पहले नए सिस्टम में लिखें
        publish_to_kinesis(kinesis_stream, data)

        # फिर पुराने सिस्टम को अपडेट करें
        legacy_db.update(data)
    except Exception as e:
        # क्षतिपूर्ति तर्क लागू करें
        rollback_kinesis_event(kinesis_stream, data)
        raise

चरण 3: पूर्ण माइग्रेशन

एक बार जब विश्वास स्थापित हो जाता है, तो सभी ट्रैफिक को घटना-ड्राइवन आर्किटेक्चर के माध्यम से रूट करें।

लागत अनुकूलन रणनीतियाँ

1. परिवर्तनीय वर्कलोड्स के लिए ऑन-डिमांड मोड का उपयोग करें

ऑन-डिमांड मोड (2023 में पेश किया गया) ट्रैफिक के आधार पर स्वचालित रूप से स्केल करता है:

# ऑन-डिमांड मोड के साथ स्ट्रीम बनाएं
aws kinesis create-stream \
    --stream-name my-stream \
    --stream-mode-details StreamMode=ON_DEMAND

2. डेटा एग्रीगेशन लागू करें

रेकॉर्ड्स को बैच करने से PUT पेलोड यूनिट्स कम करें:

from aws_kinesis_agg.aggregator import RecordAggregator

def batch_put_records(kinesis_client, stream_name, records):
    """लागत कम करने के लिए रेकॉर्ड्स को एग्रीगेट करें"""
    aggregator = RecordAggregator()

    for record in records:
        aggregator.add_user_record(
            partition_key=record['partition_key'],
            data=record['data']
        )

    # एग्रीगेटेड रेकॉर्ड भेजें
    kinesis_client.put_record(
        StreamName=stream_name,
        Data=aggregator.serialize(),
        PartitionKey=records[0]['partition_key']
    )

3. डेटा रिटेंशन को अनुकूलित करें

डिफ़ॉल्ट रिटेंशन 24 घंटे है। केवल आवश्यकता पड़ने पर ही इसे बढ़ाएं:

# रिटेंशन को 7 दिनों तक सेट करें
aws kinesis increase-stream-retention-period \
    --stream-name my-stream \
    --retention-period-hours 168

सुरक्षा सर्वोत्तम प्रथाएँ

1. रेस्ट और ट्रांजिट में एन्क्रिप्शन

# एन्क्रिप्टेड स्ट्रीम बनाएं
kinesis.create_stream(
    StreamName='secure-stream',
    ShardCount=1,
    StreamModeDetails={'StreamMode': 'PROVISIONED'}
)

# एन्क्रिप्शन सक्षम करें
kinesis.start_stream_encryption(
    StreamName='secure-stream',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'
)

2. न्यूनतम विशेषाधिकार के लिए IAM पॉलिसियाँ

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
    }
  ]
}

3. VPC एंडपॉइंट्स

ट्रैफिक को AWS नेटवर्क के भीतर रखें। AWS इन्फ्रास्ट्रक्चर को कोड के रूप में प्रबंधित करने के लिए, Terraform का उपयोग करने पर विचार करें - हमारी Terraform चीटशीट देखें:

aws ec2 create-vpc-endpoint \
    --vpc-id vpc-12345678 \
    --service-name com.amazonaws.us-east-1.kinesis-streams \
    --route-table-ids rtb-12345678

अवलोकन और डिबगिंग

X-Ray के साथ वितरित ट्रेसिंग

from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

patch_all()

@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
    xray_recorder.put_annotation('eventType', event_data['type'])
    xray_recorder.put_metadata('payload', event_data['payload'])

    kinesis.put_record(
        StreamName=stream_name,
        Data=json.dumps(event_data),
        PartitionKey=event_data['id']
    )

CloudWatch Logs Insights क्वेरी

-- धीमी प्रोसेसिंग टाइम्स ढूंढें
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20

-- एरर रेट ट्रैक करें
fields @timestamp, eventType
| filter error = true
| stats count() by eventType

उन्नत पैटर्न

वितरित ट्रांजैक्शन्स के लिए सागा पैटर्न

माइक्रोसर्विसेस के माध्यम से लंबे समय तक चलने वाले ट्रांजैक्शन्स लागू करें:

class OrderSaga:
    def __init__(self, kinesis_client, stream_name):
        self.kinesis = kinesis_client
        self.stream_name = stream_name
        self.saga_id = str(uuid.uuid4())

    def execute(self, order_data):
        """क्षतिपूर्ति तर्क के साथ सागा निष्पादित करें"""
        try:
            # चरण 1: इन्वेंटरी रिजर्व करें
            self.publish_command('RESERVE_INVENTORY', order_data)

            # चरण 2: भुगतान प्रोसेस करें
            self.publish_command('PROCESS_PAYMENT', order_data)

            # चरण 3: ऑर्डर शिप करें
            self.publish_command('SHIP_ORDER', order_data)

        except SagaException as e:
            # विपरीत क्रम में क्षतिपूर्ति करें
            self.compensate(e.failed_step)

    def compensate(self, failed_step):
        """क्षतिपूर्ति ट्रांजैक्शन्स निष्पादित करें"""
        compensation_steps = {
            'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
            'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
        }

        for step in compensation_steps.get(failed_step, []):
            self.publish_command(step, {'sagaId': self.saga_id})

परीक्षण रणनीतियाँ

LocalStack के साथ स्थानीय विकास

# Kinesis के साथ LocalStack शुरू करें
docker run -d \
  -p 4566:4566 \
  -e SERVICES=kinesis \
  localstack/localstack

# परीक्षण स्ट्रीम बनाएं
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
    --stream-name test-stream \
    --shard-count 1

इंटीग्रेशन परीक्षण

import pytest
from moto import mock_kinesis

@mock_kinesis
def test_event_publishing():
    """मॉक्ड Kinesis के साथ घटना प्रकाशन परीक्षण करें"""
    kinesis = boto3.client('kinesis', region_name='us-east-1')
    kinesis.create_stream(StreamName='test-stream', ShardCount=1)

    service = OrderProcessingService('test-stream')
    order_id = service.create_order('user123', [
        {'productId': 'prod1', 'quantity': 2}
    ])

    assert order_id is not None

प्रदर्शन ट्यूनिंग

बैच साइज को अनुकूलित करें

def optimize_batch_processing(records, batch_size=500):
    """अनुकूलित बैच में रेकॉर्ड्स प्रोसेस करें"""
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        process_batch(batch)

कनेक्शन पूलिंग का उपयोग करें

from botocore.config import Config

config = Config(
    max_pool_connections=50,
    retries={'max_attempts': 3, 'mode': 'adaptive'}
)

kinesis = boto3.client('kinesis', config=config)

उपयोगी लिंक्स

AWS Kinesis संसाधन:

संबंधित लेख:

निष्कर्ष

AWS Kinesis स्केलेबल, घटना-ड्राइवन माइक्रोसर्विसेस आर्किटेक्चर बनाने के लिए एक मजबूत आधार प्रदान करता है। इन पैटर्न और सर्वोत्तम प्रथाओं का पालन करके, आप उन सिस्टम बना सकते हैं जो लचीले, स्केलेबल, और बनाए रखने योग्य हैं। एक एकल घटना स्ट्रीम से शुरू करें, अपने आर्किटेक्चर को वैलिडेट करें, और जैसे-जैसे आपका सिस्टम बढ़ता है, धीरे-धीरे अधिक जटिल पैटर्न की ओर बढ़ें।

सफलता की कुंजी है आपकी डेटा फ्लो आवश्यकताओं को समझना, अपने उपयोग के मामले के लिए सही Kinesis सेवा चुनना, और शुरुआत से ही उचित निगरानी और एरर हैंडलिंग लागू करना।