Serverless systeemontwikkeling met behulp van AWS SAM, AWS SQS en Python PowerTools

Voorbeeld van implementatie

Inhoud

Hier geef ik een overzicht van de stappen die nodig zijn om een paar lambda functies in AWS SAM te implementeren:

  1. REST API Lambda: bericht verzenden naar SQS
  2. SQS Lambda Trigger: bericht verwerken vanaf SQS

Als je nieuwsgierig bent wat AWS SAM is en hoe je ermee kunt beginnen - bekijk dan deze documentatie: Laaggestructureerde Lambdas met AWS SAM en Python

rij van postbus dozen

Vooraf

Implementeer een REST-API met AWS SAM en Python, bijvoorbeeld zoals beschreven in Laaggestructureerde Lambdas met AWS SAM en Python

REST API Lambda: bericht verzenden naar SQS

Beschrijf de wachtrij in template.yaml

    MySQSQueue:
        Type: String
        Default: "TheQueue"
    MySQSQueueArn:
        Type: String
        Default: "arn:aws:sqs:ap-southeast-2:123somenumbers123:TheQueue"

Voeg in template.yaml de API functie toe die het bericht naar AWS SQS zal sturen.

      PostMessageFunction:
        Type: AWS::Serverless::Function
        Properties:
            Handler: app.lambda_handler
            CodeUri: test
            MemorySize: 1024
            Description: PostMessageFunction functie
            Runtime: python3.12
            Architectures:
                - x86_64
            Tracing: Active
            Events:
                PostProjectEstimatePath:
                    Type: Api
                    Properties:
                        Path: /q/{i}
                        Method: POST
                        RestApiId: !Ref MyApi
            Layers:
                - !Ref ApiSharedLayer
            Environment:
                Variables:
                    POWERTOOLS_SERVICE_NAME: PowertoolsHelloWorld
                    POWERTOOLS_METRICS_NAMESPACE: Powertools
                    LOG_LEVEL: INFO
            Tags:
                LambdaPowertools: python
            Policies:
                - SQSSendMessagePolicy:
                    QueueName: !Ref MySQSQueue

Implementeer de berichtklasse

from pydantic import BaseModel, Field
from typing import List, Optional

class MySqsRequest(BaseModel):
    I: Optional[int] = Field(default=None)

Implementeer de Python code om het bericht naar de wachtrij te sturen in app.py

@app.post("/q/<i>")
@tracer.capture_method
def post_q(i: int) -> dict:
    try:
        metrics.add_metric(name="PostMessageFunction", unit=MetricUnit.Count, value=1)

        queue_url = 'your queue url here'
        request = MySqsRequest(I=i)
        message = request.model_dump_json()
        send_sqs_message(queue_url, message)

        return { 'response_code': 200 }
    
    except ValueError as ve:
        msg = str(ve)
        logger.warning(f"Fout bij het verwerken van de aanvraag: {msg}")
        return {
            'response_code': 400,
            'message': msg
        }
    except Exception as e:
        logger.error(f"Fout bij het verwerken van de aanvraag: {str(e)}")
        return {
            'response_code': 500,
            'message': 'Interne serverfout is opgetreden tijdens het verwerken van de aanvraag'
        }


# ----------------------------------------------------------------------------------------------
def get_queue_url() -> str:
    queue_name = get_queue_name()
    sqs_client = boto3.client("sqs")
    response = sqs_client.get_queue_url(
        QueueName=queue_name,
    )
    return response["QueueUrl"]

# ----------------------------------------------------------------------------------------------
def send_sqs_message(queue_url, message):
    sqs_client = boto3.client("sqs")
    sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(message),
    )


# ----------------------------------------------------------------------------------------------
@logger.inject_lambda_context(correlation_id_path=correlation_paths.API_GATEWAY_REST)
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    return app.resolve(event, context)

SQS Lambda Trigger: bericht verwerken vanaf SQS

Voeg een handler beschrijving toe in template.yaml

      PostCompleteFunction:
        Type: AWS::Serverless::Function
        Properties:
            Handler: app-sqs.lambda_handler
            CodeUri: test
            MemorySize: 1024
            Description: PostCompleteFunction functie
            Runtime: python3.12
            Architectures:
                - x86_64
            Tracing: Active
            Events:
                SQSQueueEstimateEvent:
                    Type: SQS
                    Properties:
                        Queue: !Ref MySQSQueueArn
            Layers:
                - !Ref ApiSharedLayer
            Environment:
                Variables:
                    POWERTOOLS_SERVICE_NAME: PowertoolsHelloWorld
                    POWERTOOLS_METRICS_NAMESPACE: Powertools
                    LOG_LEVEL: INFO
            Tags:
                LambdaPowertools: python
            Policies:
                - SQSPollerPolicy:
                    QueueName: !Ref MySQSQueue

en de Python code in app-sqs.py:

tracer = Tracer()
logger = Logger()
metrics = Metrics(namespace="Powertools")

# ----------------------------------------------------------------------------------------------
processor = BatchProcessor(event_type=EventType.SQS)  

# ----------------------------------------------------------------------------------------------
@tracer.capture_method
def record_handler(record: SQSRecord):  
    payload: str = record.json_body 
    req = TypeAdapter(MySqsRequest).validate_json(payload)
    logger.info(f"I: {req.I}")


# ----------------------------------------------------------------------------------------------
@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(  
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context,
    )

Implementatie

sam validate && \
sam build --use-container && \
sam deploy

Voorbeeld aanroep

Stel $token en $api omgevingsvariabelen in en voer een aanroep uit:

curl -v -H "Authorization: Bearer $token" -X POST ${api}/q/6 -d @test-curl/q_src.json | jq . 

je zou iets moeten zien zoals

{
  "response_code": 200
}

Logcheck:

sam logs -n PostCompleteFunction --tail

je zou iets moeten zien zoals

2025/01/16/[$LATEST]5168b560c9eb4ca4871e0ed3e7f0c20d 2025-01-16T02:15:13.842000 {
  "level": "INFO",
  "location": "record_handler:585",
  "message": "I: 6",
  "timestamp": "2025-01-16 02:15:13,842+0000",
  "service": "PowertoolsHelloWorld",
  "cold_start": true,
  "function_name": "asdasdads-PostCompleteFunction-qwe123",
  "function_memory_size": "1024",
  "function_arn": "arn:aws:lambda:ap-southeast-2:123123123:function:asdasdads-PostCompleteFunction-qwe123",
  "function_request_id": "f7f469da-5f33-5087-8613-09d832dcedd7",
  "xray_trace_id": "1-67886baf-8fefc4f2eb65405758c4d008"
}

Prima!

Ik hoop dat je deze voorbeeldimplementatie van asynchrone verwerking met de AWS serverless platform nuttig hebt gevonden. Heb een fijn dagje!