Serverless systeemontwikkeling met behulp van AWS SAM, AWS SQS en Python PowerTools
Voorbeeld van implementatie
Hier geef ik een overzicht van de stappen die nodig zijn om een paar lambda functies in AWS SAM te implementeren:
- REST API Lambda: bericht verzenden naar SQS
- SQS Lambda Trigger: bericht verwerken vanaf SQS
Als je nieuwsgierig bent wat AWS SAM is en hoe je ermee kunt beginnen - bekijk dan deze documentatie: Laaggestructureerde Lambdas met AWS SAM en Python
Vooraf
Implementeer een REST-API met AWS SAM en Python, bijvoorbeeld zoals beschreven in Laaggestructureerde Lambdas met AWS SAM en Python
REST API Lambda: bericht verzenden naar SQS
Beschrijf de wachtrij in template.yaml
MySQSQueue:
Type: String
Default: "TheQueue"
MySQSQueueArn:
Type: String
Default: "arn:aws:sqs:ap-southeast-2:123somenumbers123:TheQueue"
Voeg in template.yaml de API functie toe die het bericht naar AWS SQS zal sturen.
PostMessageFunction:
Type: AWS::Serverless::Function
Properties:
Handler: app.lambda_handler
CodeUri: test
MemorySize: 1024
Description: PostMessageFunction functie
Runtime: python3.12
Architectures:
- x86_64
Tracing: Active
Events:
PostProjectEstimatePath:
Type: Api
Properties:
Path: /q/{i}
Method: POST
RestApiId: !Ref MyApi
Layers:
- !Ref ApiSharedLayer
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: PowertoolsHelloWorld
POWERTOOLS_METRICS_NAMESPACE: Powertools
LOG_LEVEL: INFO
Tags:
LambdaPowertools: python
Policies:
- SQSSendMessagePolicy:
QueueName: !Ref MySQSQueue
Implementeer de berichtklasse
from pydantic import BaseModel, Field
from typing import List, Optional
class MySqsRequest(BaseModel):
I: Optional[int] = Field(default=None)
Implementeer de Python code om het bericht naar de wachtrij te sturen in app.py
@app.post("/q/<i>")
@tracer.capture_method
def post_q(i: int) -> dict:
try:
metrics.add_metric(name="PostMessageFunction", unit=MetricUnit.Count, value=1)
queue_url = 'your queue url here'
request = MySqsRequest(I=i)
message = request.model_dump_json()
send_sqs_message(queue_url, message)
return { 'response_code': 200 }
except ValueError as ve:
msg = str(ve)
logger.warning(f"Fout bij het verwerken van de aanvraag: {msg}")
return {
'response_code': 400,
'message': msg
}
except Exception as e:
logger.error(f"Fout bij het verwerken van de aanvraag: {str(e)}")
return {
'response_code': 500,
'message': 'Interne serverfout is opgetreden tijdens het verwerken van de aanvraag'
}
# ----------------------------------------------------------------------------------------------
def get_queue_url() -> str:
queue_name = get_queue_name()
sqs_client = boto3.client("sqs")
response = sqs_client.get_queue_url(
QueueName=queue_name,
)
return response["QueueUrl"]
# ----------------------------------------------------------------------------------------------
def send_sqs_message(queue_url, message):
sqs_client = boto3.client("sqs")
sqs_client.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(message),
)
# ----------------------------------------------------------------------------------------------
@logger.inject_lambda_context(correlation_id_path=correlation_paths.API_GATEWAY_REST)
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
return app.resolve(event, context)
SQS Lambda Trigger: bericht verwerken vanaf SQS
Voeg een handler beschrijving toe in template.yaml
PostCompleteFunction:
Type: AWS::Serverless::Function
Properties:
Handler: app-sqs.lambda_handler
CodeUri: test
MemorySize: 1024
Description: PostCompleteFunction functie
Runtime: python3.12
Architectures:
- x86_64
Tracing: Active
Events:
SQSQueueEstimateEvent:
Type: SQS
Properties:
Queue: !Ref MySQSQueueArn
Layers:
- !Ref ApiSharedLayer
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: PowertoolsHelloWorld
POWERTOOLS_METRICS_NAMESPACE: Powertools
LOG_LEVEL: INFO
Tags:
LambdaPowertools: python
Policies:
- SQSPollerPolicy:
QueueName: !Ref MySQSQueue
en de Python code in app-sqs.py:
tracer = Tracer()
logger = Logger()
metrics = Metrics(namespace="Powertools")
# ----------------------------------------------------------------------------------------------
processor = BatchProcessor(event_type=EventType.SQS)
# ----------------------------------------------------------------------------------------------
@tracer.capture_method
def record_handler(record: SQSRecord):
payload: str = record.json_body
req = TypeAdapter(MySqsRequest).validate_json(payload)
logger.info(f"I: {req.I}")
# ----------------------------------------------------------------------------------------------
@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
return process_partial_response(
event=event,
record_handler=record_handler,
processor=processor,
context=context,
)
Implementatie
sam validate && \
sam build --use-container && \
sam deploy
Voorbeeld aanroep
Stel $token en $api omgevingsvariabelen in en voer een aanroep uit:
curl -v -H "Authorization: Bearer $token" -X POST ${api}/q/6 -d @test-curl/q_src.json | jq .
je zou iets moeten zien zoals
{
"response_code": 200
}
Logcheck:
sam logs -n PostCompleteFunction --tail
je zou iets moeten zien zoals
2025/01/16/[$LATEST]5168b560c9eb4ca4871e0ed3e7f0c20d 2025-01-16T02:15:13.842000 {
"level": "INFO",
"location": "record_handler:585",
"message": "I: 6",
"timestamp": "2025-01-16 02:15:13,842+0000",
"service": "PowertoolsHelloWorld",
"cold_start": true,
"function_name": "asdasdads-PostCompleteFunction-qwe123",
"function_memory_size": "1024",
"function_arn": "arn:aws:lambda:ap-southeast-2:123123123:function:asdasdads-PostCompleteFunction-qwe123",
"function_request_id": "f7f469da-5f33-5087-8613-09d832dcedd7",
"xray_trace_id": "1-67886baf-8fefc4f2eb65405758c4d008"
}
Prima!
Ik hoop dat je deze voorbeeldimplementatie van asynchrone verwerking met de AWS serverless platform nuttig hebt gevonden. Heb een fijn dagje!