AWS SAM + AWS SQS + Python PowerTools
Exemple d'implémentation
Ici, je liste les étapes nécessaires pour implémenter un couple de fonctions lambda dans AWS SAM :
- REST API Lambda : envoyer un message à SQS
- Déclencheur Lambda SQS : traiter le message provenant de SQS
Si vous êtes curieux de savoir ce qu’est AWS SAM et comment commencer - consultez ce document : Fonctions Lambda en couches avec AWS SAM et Python
Préconfiguration
Implémentez une API REST avec AWS SAM et Python, par exemple comme décrit dans Fonctions Lambda en couches avec AWS SAM et Python
Lambda REST API : envoyer un message à SQS
Décrivez la file d’attente dans template.yaml
MySQSQueue:
Type: String
Default: "TheQueue"
MySQSQueueArn:
Type: String
Default: "arn:aws:sqs:ap-southeast-2:123somenumbers123:TheQueue"
Ajoutez à template.yaml la fonction API qui enverra le message à AWS SQS.
PostMessageFunction:
Type: AWS::Serverless::Function
Properties:
Handler: app.lambda_handler
CodeUri: test
MemorySize: 1024
Description: PostMessageFunction function
Runtime: python3.12
Architectures:
- x86_64
Tracing: Active
Events:
PostProjectEstimatePath:
Type: Api
Properties:
Path: /q/{i}
Method: POST
RestApiId: !Ref MyApi
Layers:
- !Ref ApiSharedLayer
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: PowertoolsHelloWorld
POWERTOOLS_METRICS_NAMESPACE: Powertools
LOG_LEVEL: INFO
Tags:
LambdaPowertools: python
Policies:
- SQSSendMessagePolicy:
QueueName: !Ref MySQSQueue
Implémentez la classe de message
from pydantic import BaseModel, Field
from typing import List, Optional
class MySqsRequest(BaseModel):
I: Optional[int] = Field(default=None)
Implémentez le code Python pour envoyer un message à la file d’attente dans app.py
@app.post("/q/<i>")
@tracer.capture_method
def post_q(i: int) -> dict:
try:
metrics.add_metric(name="PostMessageFunction", unit=MetricUnit.Count, value=1)
queue_url = 'your queue url here'
request = MySqsRequest(I=i)
message = request.model_dump_json()
send_sqs_message(queue_url, message)
return { 'response_code': 200 }
except ValueError as ve:
msg = str(ve)
logger.warning(f"Erreur lors du traitement de la demande : {msg}")
return {
'response_code': 400,
'message': msg
}
except Exception as e:
logger.error(f"Erreur lors du traitement de la demande : {str(e)}")
return {
'response_code': 500,
'message': 'Une erreur interne du serveur s\'est produite lors du traitement de la demande'
}
# ----------------------------------------------------------------------------------------------
def get_queue_url() -> str:
queue_name = get_queue_name()
sqs_client = boto3.client("sqs")
response = sqs_client.get_queue_url(
QueueName=queue_name,
)
return response["QueueUrl"]
# ----------------------------------------------------------------------------------------------
def send_sqs_message(queue_url, message):
sqs_client = boto3.client("sqs")
sqs_client.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(message),
)
# ----------------------------------------------------------------------------------------------
@logger.inject_lambda_context(correlation_id_path=correlation_paths.API_GATEWAY_REST)
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
return app.resolve(event, context)
Déclencheur Lambda SQS : traiter le message provenant de SQS
Ajoutez la description du gestionnaire dans template.yaml
PostCompleteFunction:
Type: AWS::Serverless::Function
Properties:
Handler: app-sqs.lambda_handler
CodeUri: test
MemorySize: 1024
Description: PostCompleteFunction function
Runtime: python3.12
Architectures:
- x86_64
Tracing: Active
Events:
SQSQueueEstimateEvent:
Type: SQS
Properties:
Queue: !Ref MySQSQueueArn
Layers:
- !Ref ApiSharedLayer
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: PowertoolsHelloWorld
POWERTOOLS_METRICS_NAMESPACE: Powertools
LOG_LEVEL: INFO
Tags:
LambdaPowertools: python
Policies:
- SQSPollerPolicy:
QueueName: !Ref MySQSQueue
et le code Python dans app-sqs.py :
tracer = Tracer()
logger = Logger()
metrics = Metrics(namespace="Powertools")
# ----------------------------------------------------------------------------------------------
processor = BatchProcessor(event_type=EventType.SQS)
# ----------------------------------------------------------------------------------------------
@tracer.capture_method
def record_handler(record: SQSRecord):
payload: str = record.json_body
req = TypeAdapter(MySqsRequest).validate_json(payload)
logger.info(f"I: {req.I}")
# ----------------------------------------------------------------------------------------------
@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
return process_partial_response(
event=event,
record_handler=record_handler,
processor=processor,
context=context,
)
Déploiement
sam validate && \
sam build --use-container && \
sam deploy
Appel d’exemple
configurez $token et $api variables d’environnement puis faites un appel :
curl -v -H "Authorization: Bearer $token" -X POST ${api}/q/6 -d @test-curl/q_src.json | jq .
vous devriez voir quelque chose comme
{
"response_code": 200
}
Vérification des logs :
sam logs -n PostCompleteFunction --tail
vous devriez voir quelque chose proche de
2025/01/16/[$LATEST]5168b560c9eb4ca4871e0ed3e7f0c20d 2025-01-16T02:15:13.842000 {
"level": "INFO",
"location": "record_handler:585",
"message": "I: 6",
"timestamp": "2025-01-16 02:15:13,842+0000",
"service": "PowertoolsHelloWorld",
"cold_start": true,
"function_name": "asdasdads-PostCompleteFunction-qwe123",
"function_memory_size": "1024",
"function_arn": "arn:aws:lambda:ap-southeast-2:123123123:function:asdasdads-PostCompleteFunction-qwe123",
"function_request_id": "f7f469da-5f33-5087-8613-09d832dcedd7",
"xray_trace_id": "1-67886baf-8fefc4f2eb65405758c4d008"
}
C’est bien !