AWS SAM, AWS SQS और Python PowerTools का उपयोग करके सर्वरलेस सिस्टम्स विकास

उदाहरण कार्यान्वयन

Page content

यहाँ AWS SAM में लैम्ब्डा फंक्शन जोड़ने के लिए आवश्यक चरणों की सूची दी गई है:

  1. REST API लैम्ब्डा: SQS को संदेश भेजें
  2. SQS लैम्ब्डा ट्रिगर: SQS से संदेश प्रोसेस करें

अगर आप curious हैं कि AWS SAM क्या है और कैसे शुरू करें - इस दस्तावेज़ की जांच करें: Layered Lambdas with AWS SAM and Python

पोस्ट ऑफिस के बॉक्स की पंक्ति

प्रीकॉन्फिग

AWS SAM और पाइथन के साथ REST-API लागू करें, जैसे कि Layered Lambdas with AWS SAM and Python में वर्णित है

REST API लैम्ब्डा: SQS को संदेश भेजें

template.yaml में क्यू का वर्णन करें

    MySQSQueue:
        Type: String
        Default: "TheQueue"
    MySQSQueueArn:
        Type: String
        Default: "arn:aws:sqs:ap-southeast-2:123somenumbers123:TheQueue"

template.yaml में उस API फंक्शन जोड़ें जो संदेश को AWS SQS को भेजेगा।

      PostMessageFunction:
        Type: AWS::Serverless::Function
        Properties:
            Handler: app.lambda_handler
            CodeUri: test
            MemorySize: 1024
            Description: PostMessageFunction function
            Runtime: python3.12
            Architectures:
                - x86_64
            Tracing: Active
            Events:
                PostProjectEstimatePath:
                    Type: Api
                    Properties:
                        Path: /q/{i}
                        Method: POST
                        RestApiId: !Ref MyApi
            Layers:
                - !Ref ApiSharedLayer
            Environment:
                Variables:
                    POWERTOOLS_SERVICE_NAME: PowertoolsHelloWorld
                    POWERTOOLS_METRICS_NAMESPACE: Powertools
                    LOG_LEVEL: INFO
            Tags:
                LambdaPowertools: python
            Policies:
                - SQSSendMessagePolicy:
                    QueueName: !Ref MySQSQueue

संदेश क्लास लागू करें

from pydantic import BaseModel, Field
from typing import List, Optional

class MySqsRequest(BaseModel):
    I: Optional[int] = Field(default=None)

app.py में क्यू को संदेश भेजने के लिए पाइथन कोड लागू करें

@app.post("/q/<i>")
@tracer.capture_method
def post_q(i: int) -> dict:
    try:
        metrics.add_metric(name="PostMessageFunction", unit=MetricUnit.Count, value=1)

        queue_url = 'your queue url here'
        request = MySqsRequest(I=i)
        message = request.model_dump_json()
        send_sqs_message(queue_url, message)

        return { 'response_code': 200 }

    except ValueError as ve:
        msg = str(ve)
        logger.warning(f"Error processing prequest: {msg}")
        return {
            'response_code': 400,
            'message': msg
        }
    except Exception as e:
        logger.error(f"Error processing  request: {str(e)}")
        return {
            'response_code': 500,
            'message': 'Internal server error occurred while processing request'
        }

# ----------------------------------------------------------------------------------------------
def get_queue_url() -> str:
    queue_name = get_queue_name()
    sqs_client = boto3.client("sqs")
    response = sqs_client.get_queue_url(
        QueueName=queue_name,
    )
    return response["QueueUrl"]

# ----------------------------------------------------------------------------------------------
def send_sqs_message(queue_url, message):
    sqs_client = boto3.client("sqs")
    sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(message),
    )

# ----------------------------------------------------------------------------------------------
@logger.inject_lambda_context(correlation_id_path=correlation_paths.API_GATEWAY_REST)
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    return app.resolve(event, context)

SQS लैम्ब्डा ट्रिगर: SQS से संदेश प्रोसेस करें

template.yaml में हैंडलर वर्णन जोड़ें

      PostCompleteFunction:
        Type: AWS::Serverless::Function
        Properties:
            Handler: app-sqs.lambda_handler
            CodeUri: test
            MemorySize: 1024
            Description: PostCompleteFunction function
            Runtime: python3.12
            Architectures:
                - x86_64
            Tracing: Active
            Events:
                SQSQueueEstimateEvent:
                    Type: SQS
                    Properties:
                        Queue: !Ref MySQSQueueArn
            Layers:
                - !Ref ApiSharedLayer
            Environment:
                Variables:
                    POWERTOOLS_SERVICE_NAME: PowertoolsHelloWorld
                    POWERTOOLS_METRICS_NAMESPACE: Powertools
                    LOG_LEVEL: INFO
            Tags:
                LambdaPowertools: python
            Policies:
                - SQSPollerPolicy:
                    QueueName: !Ref MySQSQueue

और app-sqs.py में पाइथन कोड:

tracer = Tracer()
logger = Logger()
metrics = Metrics(namespace="Powertools")

# ----------------------------------------------------------------------------------------------
processor = BatchProcessor(event_type=EventType.SQS)

# ----------------------------------------------------------------------------------------------
@tracer.capture_method
def record_handler(record: SQSRecord):
    payload: str = record.json_body
    req = TypeAdapter(MySqsRequest).validate_json(payload)
    logger.info(f"I: {req.I}")

# ----------------------------------------------------------------------------------------------
@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context,
    )

डिप्लॉय

sam validate && \
sam build --use-container && \
sam deploy

नमूना कॉल

$token और $api एन्वायरनमेंट वेरिएबल कॉन्फ़िगर करें फिर कॉल करें:

curl -v -H "Authorization: Bearer $token" -X POST ${api}/q/6 -d @test-curl/q_src.json | jq .

आपको कुछ ऐसा देखना चाहिए

{
  "response_code": 200
}

लॉगचेक:

sam logs -n PostCompleteFunction --tail

आपको कुछ ऐसा देखना चाहिए

2025/01/16/[$LATEST]5168b560c9eb4ca4871e0ed3e7f0c20d 2025-01-16T02:15:13.842000 {
  "level": "INFO",
  "location": "record_handler:585",
  "message": "I: 6",
  "timestamp": "2025-01-16 02:15:13,842+0000",
  "service": "PowertoolsHelloWorld",
  "cold_start": true,
  "function_name": "asdasdads-PostCompleteFunction-qwe123",
  "function_memory_size": "1024",
  "function_arn": "arn:aws:lambda:ap-southeast-2:123123123:function:asdasdads-PostCompleteFunction-qwe123",
  "function_request_id": "f7f469da-5f33-5087-8613-09d832dcedd7",
  "xray_trace_id": "1-67886baf-8fefc4f2eb65405758c4d008"
}

बहुत अच्छा!

उम्मीद है कि आपको AWS सर्वरलेस प्लेटफॉर्म के साथ असिंक्रोनस प्रोसेसिंग का यह उदाहरण उपयोगी लगा है। शुभ दिन हो!

उपयोगी लिंक