Entwicklung von serverlosen Systemen mit AWS SAM, AWS SQS und Python PowerTools

Beispiel für die Implementierung

Inhaltsverzeichnis

Hier liste ich die Schritte auf, die erforderlich sind, um ein Paar von Lambda-Funktionen in AWS SAM zu implementieren:

  1. REST-API-Lambda: Nachricht an SQS senden
  2. SQS-Lambda-Trigger: Nachricht von SQS verarbeiten

Wenn Sie neugierig sind, was AWS SAM ist und wie Sie beginnen können, überprüfen Sie dieses Dokument: Layered Lambdas mit AWS SAM und Python

line of post office boxes

Vorabkonfiguration

Implementieren Sie eine REST-API mit AWS SAM und Python, wie beispielsweise in der Beschreibung: Layered Lambdas mit AWS SAM und Python

REST-API-Lambda: Nachricht an SQS senden

Beschreiben Sie die Warteschlange in template.yaml

    MySQSQueue:
        Type: String
        Default: "TheQueue"
    MySQSQueueArn:
        Type: String
        Default: "arn:aws:sqs:ap-southeast-2:123somenumbers123:TheQueue"

Fügen Sie in template.yaml die API-Funktion hinzu, die die Nachricht an AWS SQS sendet.

      PostMessageFunction:
        Type: AWS::Serverless::Function
        Properties:
            Handler: app.lambda_handler
            CodeUri: test
            MemorySize: 1024
            Description: PostMessageFunction function
            Runtime: python3.12
            Architectures:
                - x86_64
            Tracing: Active
            Events:
                PostProjectEstimatePath:
                    Type: Api
                    Properties:
                        Path: /q/{i}
                        Method: POST
                        RestApiId: !Ref MyApi
            Layers:
                - !Ref ApiSharedLayer
            Environment:
                Variables:
                    POWERTOOLS_SERVICE_NAME: PowertoolsHelloWorld
                    POWERTOOLS_METRICS_NAMESPACE: Powertools
                    LOG_LEVEL: INFO
            Tags:
                LambdaPowertools: python
            Policies:
                - SQSSendMessagePolicy:
                    QueueName: !Ref MySQSQueue

Implementieren Sie die Nachrichtenklasse

from pydantic import BaseModel, Field
from typing import List, Optional

class MySqsRequest(BaseModel):
    I: Optional[int] = Field(default=None)

Implementieren Sie den Python-Code zum Senden der Nachricht an die Warteschlange in app.py

@app.post("/q/<i>")
@tracer.capture_method
def post_q(i: int) -> dict:
    try:
        metrics.add_metric(name="PostMessageFunction", unit=MetricUnit.Count, value=1)

        queue_url = 'your queue url here'
        request = MySqsRequest(I=i)
        message = request.model_dump_json()
        send_sqs_message(queue_url, message)

        return { 'response_code': 200 }

    except ValueError as ve:
        msg = str(ve)
        logger.warning(f"Error processing prequest: {msg}")
        return {
            'response_code': 400,
            'message': msg
        }
    except Exception as e:
        logger.error(f"Error processing  request: {str(e)}")
        return {
            'response_code': 500,
            'message': 'Internal server error occurred while processing request'
        }

# ----------------------------------------------------------------------------------------------
def get_queue_url() -> str:
    queue_name = get_queue_name()
    sqs_client = boto3.client("sqs")
    response = sqs_client.get_queue_url(
        QueueName=queue_name,
    )
    return response["QueueUrl"]

# ----------------------------------------------------------------------------------------------
def send_sqs_message(queue_url, message):
    sqs_client = boto3.client("sqs")
    sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(message),
    )

# ----------------------------------------------------------------------------------------------
@logger.inject_lambda_context(correlation_id_path=correlation_paths.API_GATEWAY_REST)
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    return app.resolve(event, context)

SQS-Lambda-Trigger: Nachricht von SQS verarbeiten

Fügen Sie die Handler-Beschreibung in template.yaml ein

      PostCompleteFunction:
        Type: AWS::Serverless::Function
        Properties:
            Handler: app-sqs.lambda_handler
            CodeUri: test
            MemorySize: 1024
            Description: PostCompleteFunction function
            Runtime: python3.12
            Architectures:
                - x86_64
            Tracing: Active
            Events:
                SQSQueueEstimateEvent:
                    Type: SQS
                    Properties:
                        Queue: !Ref MySQSQueueArn
            Layers:
                - !Ref ApiSharedLayer
            Environment:
                Variables:
                    POWERTOOLS_SERVICE_NAME: PowertoolsHelloWorld
                    POWERTOOLS_METRICS_NAMESPACE: Powertools
                    LOG_LEVEL: INFO
            Tags:
                LambdaPowertools: python
            Policies:
                - SQSPollerPolicy:
                    QueueName: !Ref MySQSQueue

und den Python-Code in app-sqs.py:

tracer = Tracer()
logger = Logger()
metrics = Metrics(namespace="Powertools")

# ----------------------------------------------------------------------------------------------
processor = BatchProcessor(event_type=EventType.SQS)

# ----------------------------------------------------------------------------------------------
@tracer.capture_method
def record_handler(record: SQSRecord):
    payload: str = record.json_body
    req = TypeAdapter(MySqsRequest).validate_json(payload)
    logger.info(f"I: {req.I}")

# ----------------------------------------------------------------------------------------------
@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context,
    )

Bereitstellung

sam validate && \
sam build --use-container && \
sam deploy

Beispielaufruf

Konfigurieren Sie die $token- und $api-Umgebungsvariablen und führen Sie einen Aufruf aus:

curl -v -H "Authorization: Bearer $token" -X POST ${api}/q/6 -d @test-curl/q_src.json | jq .

Sie sollten etwas wie Folgendes sehen:

{
  "response_code": 200
}

Protokollüberprüfung:

sam logs -n PostCompleteFunction --tail

Sie sollten etwas sehen, das in etwa so aussieht:

2025/01/16/[$LATEST]5168b560c9eb4ca4871e0ed3e7f0c20d 2025-01-16T02:15:13.842000 {
  "level": "INFO",
  "location": "record_handler:585",
  "message": "I: 6",
  "timestamp": "2025-01-16 02:15:13,842+0000",
  "service": "PowertoolsHelloWorld",
  "cold_start": true,
  "function_name": "asdasdads-PostCompleteFunction-qwe123",
  "function_memory_size": "1024",
  "function_arn": "arn:aws:lambda:ap-southeast-2:123123123:function:asdasdads-PostCompleteFunction-qwe123",
  "function_request_id": "f7f469da-5f33-5087-8613-09d832dcedd7",
  "xray_trace_id": "1-67886baf-8fefc4f2eb65405758c4d008"
}

Toll!

Ich hoffe, Sie haben dieses Beispiel für die Implementierung von asynchroner Verarbeitung mit der AWS-Serverless-Plattform nützlich gefunden. Ich wünsche Ihnen einen schönen Tag!