AWS SAM + AWS SQS + Python PowerTools

Пример реализации

Содержимое страницы

Вот шаги, необходимые для реализации пары [функций lambda в AWS SAM](https://www.glukhov.org/ru/post/2025/01/aws-sam-lambda-python-powertools-sqs/ “функций lambda в AWS SAM с SQS):

  1. REST API Lambda: отправить сообщение в SQS
  2. Триггер Lambda для SQS: обработать сообщение из SQS

Если вас интересно что такое AWS SAM и как начать - проверьте этот документ: Слоистые Lambda-функции с AWS SAM и Python

ряд ящиков почты

Предварительная настройка

Реализуйте REST-API с AWS SAM и python, например как описано в Слоистые Lambda-функции с AWS SAM и Python

REST API Lambda: отправить сообщение в SQS

Опишите очередь в template.yaml

    MySQSQueue:
        Type: String
        Default: "TheQueue"
    MySQSQueueArn:
        Type: String
        Default: "arn:aws:sqs:ap-southeast-2:123somenumbers123:TheQueue"

Добавьте в template.yaml функцию API, которая будет отправлять сообщение в AWS SQS.

      PostMessageFunction:
        Type: AWS::Serverless::Function
        Properties:
            Handler: app.lambda_handler
            CodeUri: test
            MemorySize: 1024
            Description: PostMessageFunction function
            Runtime: python3.12
            Architectures:
                - x86_64
            Tracing: Active
            Events:
                PostProjectEstimatePath:
                    Type: Api
                    Properties:
                        Path: /q/{i}
                        Method: POST
                        RestApiId: !Ref MyApi
            Layers:
                - !Ref ApiSharedLayer
            Environment:
                Variables:
                    POWERTOOLS_SERVICE_NAME: PowertoolsHelloWorld
                    POWERTOOLS_METRICS_NAMESPACE: Powertools
                    LOG_LEVEL: INFO
            Tags:
                LambdaPowertools: python
            Policies:
                - SQSSendMessagePolicy:
                    QueueName: !Ref MySQSQueue

Реализуйте класс сообщения

from pydantic import BaseModel, Field
from typing import List, Optional

class MySqsRequest(BaseModel):
    I: Optional[int] = Field(default=None)

Реализуйте python-код для отправки сообщения в очередь в app.py

@app.post("/q/<i>")
@tracer.capture_method
def post_q(i: int) -> dict:
    try:
        metrics.add_metric(name="PostMessageFunction", unit=MetricUnit.Count, value=1)

        queue_url = 'your queue url here'
        request = MySqsRequest(I=i)
        message = request.model_dump_json()
        send_sqs_message(queue_url, message)

        return { 'response_code': 200 }
    
    except ValueError as ve:
        msg = str(ve)
        logger.warning(f"Error processing prequest: {msg}")
        return {
            'response_code': 400,
            'message': msg
        }
    except Exception as e:
        logger.error(f"Error processing  request: {str(e)}")
        return {
            'response_code': 500,
            'message': 'Внутренняя ошибка сервера произошла при обработке запроса'
        }


# ----------------------------------------------------------------------------------------------
def get_queue_url() -> str:
    queue_name = get_queue_name()
    sqs_client = boto3.client("sqs")
    response = sqs_client.get_queue_url(
        QueueName=queue_name,
    )
    return response["QueueUrl"]

# ----------------------------------------------------------------------------------------------
def send_sqs_message(queue_url, message):
    sqs_client = boto3.client("sqs")
    sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(message),
    )


# ----------------------------------------------------------------------------------------------
@logger.inject_lambda_context(correlation_id_path=correlation_paths.API_GATEWAY_REST)
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
    return app.resolve(event, context)

Триггер Lambda для SQS: обработать сообщение из SQS

Добавьте описание обработчика в template.yaml

      PostCompleteFunction:
        Type: AWS::Serverless::Function
        Properties:
            Handler: app-sqs.lambda_handler
            CodeUri: test
            MemorySize: 1024
            Description: PostCompleteFunction function
            Runtime: python3.12
            Architectures:
                - x86_64
            Tracing: Active
            Events:
                SQSQueueEstimateEvent:
                    Type: SQS
                    Properties:
                        Queue: !Ref MySQSQueueArn
            Layers:
                - !Ref ApiSharedLayer
            Environment:
                Variables:
                    POWERTOOLS_SERVICE_NAME: PowertoolsHelloWorld
                    POWERTOOLS_METRICS_NAMESPACE: Powertools
                    LOG_LEVEL: INFO
            Tags:
                LambdaPowertools: python
            Policies:
                - SQSPollerPolicy:
                    QueueName: !Ref MySQSQueue

и python-код в app-sqs.py:

tracer = Tracer()
logger = Logger()
metrics = Metrics(namespace="Powertools")

# ----------------------------------------------------------------------------------------------
processor = BatchProcessor(event_type=EventType.SQS)  

# ----------------------------------------------------------------------------------------------
@tracer.capture_method
def record_handler(record: SQSRecord):  
    payload: str = record.json_body 
    req = TypeAdapter(MySqsRequest).validate_json(payload)
    logger.info(f"I: {req.I}")


# ----------------------------------------------------------------------------------------------
@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(  
        event=event,
        record_handler=record_handler,
        processor=processor,
        context=context,
    )

Развертывание

sam validate && \
sam build --use-container && \
sam deploy

Пример вызова

Настройте переменные окружения $token и $api, затем выполните вызов:

curl -v -H "Authorization: Bearer $token" -X POST ${api}/q/6 -d @test-curl/q_src.json | jq . 

вы должны увидеть что-то вроде

{
  "response_code": 200
}

Проверка логов:

sam logs -n PostCompleteFunction --tail

вы должны увидеть что-то похожее на

2025/01/16/[$LATEST]5168b560c9eb4ca4871e0ed3e7f0c20d 2025-01-16T02:15:13.842000 {
  "level": "INFO",
  "location": "record_handler:585",
  "message": "I: 6",
  "timestamp": "2025-01-16 02:15:13,842+0000",
  "service": "PowertoolsHelloWorld",
  "cold_start": true,
  "function_name": "asdasdads-PostCompleteFunction-qwe123",
  "function_memory_size": "1024",
  "function_arn": "arn:aws:lambda:ap-southeast-2:123123123:function:asdasdads-PostCompleteFunction-qwe123",
  "function_request_id": "f7f469da-5f33-5087-8613-09d832dcedd7",
  "xray_trace_id": "1-67886baf-8fefc4f2eb65405758c4d008"
}

Отлично!

Полезные ссылки