AWS SAM + AWS SQS + Python PowerTools

Exemple d'implémentation

Sommaire

Ici, je liste les étapes nécessaires pour implémenter un couple de fonctions lambda dans AWS SAM :

  1. REST API Lambda : envoyer un message à SQS
  2. Déclencheur Lambda SQS : traiter le message provenant de SQS

Si vous êtes curieux de savoir ce qu’est AWS SAM et comment commencer - consultez ce document : Fonctions Lambda en couches avec AWS SAM et Python

file d’attente de boîtes aux lettres

Préconfiguration

Implémentez une API REST avec AWS SAM et Python, par exemple comme décrit dans Fonctions Lambda en couches avec AWS SAM et Python

Lambda REST API : envoyer un message à SQS

Décrivez la file d’attente dans template.yaml

    MySQSQueue:
        Type: String
        Default: "TheQueue"
    MySQSQueueArn:
        Type: String
        Default: "arn:aws:sqs:ap-southeast-2:123somenumbers123:TheQueue"

Ajoutez à template.yaml la fonction API qui enverra le message à AWS SQS.

      PostMessageFunction:
        Type: AWS::Serverless::Function
        Properties:
            Handler: app.lambda_handler
            CodeUri: test
            MemorySize: 1024
            Description: PostMessageFunction function
            Runtime: python3.12
            Architectures:
                - x86_64
            Tracing: Active
            Events:
                PostProjectEstimatePath:
                    Type: Api
                    Properties:
                        Path: /q/{i}
                        Method: POST
                        RestApiId: !Ref MyApi
            Layers:
                - !Ref ApiSharedLayer
            Environment:
                Variables:
                    POWERTOOLS_SERVICE_NAME: PowertoolsHelloWorld
                    POWERTOOLS_METRICS_NAMESPACE: Powertools
                    LOG_LEVEL: INFO
            Tags:
                LambdaPowertools: python
            Policies:
                - SQSSendMessagePolicy:
                    QueueName: !Ref MySQSQueue

Implémentez la classe de message

from pydantic import BaseModel, Field
from typing import List, Optional

class MySqsRequest(BaseModel):
    I: Optional[int] = Field(default=None)

Implémentez le code Python pour envoyer un message à la file d’attente dans app.py

@app.post("/q/<i>")
@tracer.capture_method
def post_q(i: int) -> dict:
    try:
        metrics.add_metric(name="PostMessageFunction", unit=MetricUnit.Count, value=1)

        queue_url = 'your queue url here'
        request = MySqsRequest(I=i)
        message = request.model_dump_json()
        send_sqs_message(queue_url, message)

        return { 'response_code': 200 }
    
    except ValueError as ve:
        msg = str(ve)
        logger.warning(f"Erreur lors du traitement de la demande : {msg}")
        return {
            'response_code': 400,
            'message': msg
        }
    except Exception as e:
        logger.error(f"Erreur lors du traitement de la demande : {str(e)}")
        return {
            'response_code': 500,
            'message': 'Une erreur interne du serveur s\'est produite lors du traitement de la demande'
        }


# ----------------------------------------------------------------------------------------------
def get_queue_url() -> str:
    queue_name = get_queue_name()
    sqs_client = boto3.client("sqs")
    response = sqs_client.get_queue_url(
        QueueName=queue_name,
    )
    return response["QueueUrl"]

# ----------------------------------------------------------------------------------------------
def send_sqs_message(queue_url, message):
    sqs_client = boto3.client("sqs")
    sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(message),
    )


# ----------------------------------------------------------------------------------------------
@logger.inject_lambda_context(correlation_id_path=correlation_paths.API_GATEWAY_REST)
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    return app.resolve(event, context)

Déclencheur Lambda SQS : traiter le message provenant de SQS

Ajoutez la description du gestionnaire dans template.yaml

      PostCompleteFunction:
        Type: AWS::Serverless::Function
        Properties:
            Handler: app-sqs.lambda_handler
            CodeUri: test
            MemorySize: 1024
            Description: PostCompleteFunction function
            Runtime: python3.12
            Architectures:
                - x86_64
            Tracing: Active
            Events:
                SQSQueueEstimateEvent:
                    Type: SQS
                    Properties:
                        Queue: !Ref MySQSQueueArn
            Layers:
                - !Ref ApiSharedLayer
            Environment:
                Variables:
                    POWERTOOLS_SERVICE_NAME: PowertoolsHelloWorld
                    POWERTOOLS_METRICS_NAMESPACE: Powertools
                    LOG_LEVEL: INFO
            Tags:
                LambdaPowertools: python
            Policies:
                - SQSPollerPolicy:
                    QueueName: !Ref MySQSQueue

et le code Python dans app-sqs.py :

tracer = Tracer()
logger = Logger()
metrics = Metrics(namespace="Powertools")

# ----------------------------------------------------------------------------------------------
processor = BatchProcessor(event_type=EventType.SQS)  

# ----------------------------------------------------------------------------------------------
@tracer.capture_method
def record_handler(record: SQSRecord):  
    payload: str = record.json_body 
    req = TypeAdapter(MySqsRequest).validate_json(payload)
    logger.info(f"I: {req.I}")


# ----------------------------------------------------------------------------------------------
@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(  
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context,
    )

Déploiement

sam validate && \
sam build --use-container && \
sam deploy

Appel d’exemple

configurez $token et $api variables d’environnement puis faites un appel :

curl -v -H "Authorization: Bearer $token" -X POST ${api}/q/6 -d @test-curl/q_src.json | jq . 

vous devriez voir quelque chose comme

{
  "response_code": 200
}

Vérification des logs :

sam logs -n PostCompleteFunction --tail

vous devriez voir quelque chose proche de

2025/01/16/[$LATEST]5168b560c9eb4ca4871e0ed3e7f0c20d 2025-01-16T02:15:13.842000 {
  "level": "INFO",
  "location": "record_handler:585",
  "message": "I: 6",
  "timestamp": "2025-01-16 02:15:13,842+0000",
  "service": "PowertoolsHelloWorld",
  "cold_start": true,
  "function_name": "asdasdads-PostCompleteFunction-qwe123",
  "function_memory_size": "1024",
  "function_arn": "arn:aws:lambda:ap-southeast-2:123123123:function:asdasdads-PostCompleteFunction-qwe123",
  "function_request_id": "f7f469da-5f33-5087-8613-09d832dcedd7",
  "xray_trace_id": "1-67886baf-8fefc4f2eb65405758c4d008"
}

C’est bien !

Liens utiles