Создание AWS Lambda с двойным режимом работы на Python и Terraform

Пошаговый пример

Содержимое страницы

Здесь представлен пример Python Lambda для обработки сообщений SQS (Пример Python Lambda для обработки сообщений SQS + REST API) с защитой API Key + скрипт Terraform для развертывания его для безсерверного выполнения.

AWS Lambda позволяет создавать легковесные безсерверные функции, которые могут реагировать почти на любое событие — от сообщений SQS до HTTP-запросов. В этом руководстве мы создадим один Python Lambda, который работает в двух режимах:

  1. Режим SQS: При срабатывании на сообщение SQS, например { "par": 10 }, он публикует { "res": 11 } в другую очередь.
  2. HTTP-режим: При вызове через API Gateway по адресу GET /lam?par=10, он возвращает { "res": 11 } клиенту.

Мы также защитим HTTP-конец с помощью простого жесткозаданного API ключа — "testkey". Вся настройка будет развернута с помощью Terraform.

Обзор архитектуры

Давайте визуализируем, что мы создаем:

python lambda on aws sqs with api diagram

Та же Lambda реагирует на оба:

  • События SQS, через отображение источника событий, и
  • Запросы API Gateway, через HTTP-интеграцию REST.

Шаг 1: Создание Lambda на Python

Давайте создадим очень простой обработчик на Python, который может различать событие SQS и HTTP API вызов.

Файл: lambda_function.py

import json
import os
import boto3

sqs = boto3.client("sqs")

OUTPUT_QUEUE_URL = os.environ.get("OUTPUT_QUEUE_URL")
API_KEY = os.environ.get("API_KEY", "testkey")  # жесткозаданное значение по умолчанию

def lambda_handler(event, context):
    # Определение типа события
    if "Records" in event:  # событие SQS
        return handle_sqs(event["Records"])
    else:                   # HTTP событие
        return handle_http(event)

def handle_sqs(records):
    for record in records:
        body = json.loads(record["body"])
        par = int(body["par"])
        res = par + 1
        message = json.dumps({"res": res})
        sqs.send_message(QueueUrl=OUTPUT_QUEUE_URL, MessageBody=message)
    return {"status": "processed", "count": len(records)}

def handle_http(event):
    headers = {k.lower(): v for k, v in (event.get("headers") or {}).items()}
    if headers.get("x-api-key") != API_KEY:
        return {
            "statusCode": 403,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({"error": "Forbidden"})
        }

    params = event.get("queryStringParameters") or {}
    if "par" not in params:
        return {
            "statusCode": 400,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({"error": "Missing par"})
        }

    par = int(params["par"])
    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"res": par + 1})
    }

Что у нас есть в этой функции Lambda:

  • Сообщения SQS разбираются как JSON.
  • При вызове через API Gateway функция проверяет API ключ и параметр запроса.
  • URL выходной очереди и API ключ передаются через переменные окружения.

Шаг 2: Развертывание с помощью Terraform

Terraform позволяет нам декларативно настраивать инфраструктуру AWS — Lambda, очереди SQS, API Gateway и разрешения — за один раз.

Структура проекта:

project/
├── lambda/
│   └── lambda_function.py
└── infra/
    └── main.tf

Конфигурация Terraform (infra/main.tf)

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
    archive = {
      source  = "hashicorp/archive"
    }
  }
}

provider "aws" {
  region = "us-east-1"
}

locals {
  project = "lambda-sqs-api"
}

# Упаковка Lambda
data "archive_file" "lambda_zip" {
  type        = "zip"
  source_dir  = "../lambda"
  output_path = "lambda.zip"
}

# Очереди SQS
resource "aws_sqs_queue" "input" {
  name = "${local.project}-input"
}

resource "aws_sqs_queue" "output" {
  name = "${local.project}-output"
}

# IAM Роль для Lambda
data "aws_iam_policy_document" "assume_role" {
  statement {
    actions = ["sts:AssumeRole"]
    principals {
      type        = "Service"
      identifiers = ["lambda.amazonaws.com"]
    }
  }
}

resource "aws_iam_role" "lambda_role" {
  name               = "${local.project}-role"
  assume_role_policy = data.aws_iam_policy_document.assume_role.json
}

resource "aws_iam_policy" "lambda_policy" {
  name = "${local.project}-policy"
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "sqs:SendMessage",
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes"
        ]
        Resource = "*"
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "*"
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_policy_attach" {
  role       = aws_iam_role.lambda_role.name
  policy_arn = aws_iam_policy.lambda_policy.arn
}

# Функция Lambda
resource "aws_lambda_function" "func" {
  filename         = data.archive_file.lambda_zip.output_path
  function_name    = local.project
  role             = aws_iam_role.lambda_role.arn
  handler          = "lambda_function.lambda_handler"
  runtime          = "python3.12"
  environment {
    variables = {
      OUTPUT_QUEUE_URL = aws_sqs_queue.output.id
      API_KEY          = "testkey"
    }
  }
}

# Отображение источника событий (SQS → Lambda)
resource "aws_lambda_event_source_mapping" "sqs_trigger" {
  event_source_arn  = aws_sqs_queue.input.arn
  function_name     = aws_lambda_function.func.arn
  batch_size        = 1
  enabled           = true
}

# API Gateway
resource "aws_api_gateway_rest_api" "api" {
  name = "${local.project}-api"
}

resource "aws_api_gateway_resource" "lam" {
  rest_api_id = aws_api_gateway_rest_api.api.id
  parent_id   = aws_api_gateway_rest_api.api.root_resource_id
  path_part   = "lam"
}

resource "aws_api_gateway_method" "get_lam" {
  rest_api_id   = aws_api_gateway_rest_api.api.id
  resource_id   = aws_api_gateway_resource.lam.id
  http_method   = "GET"
  authorization = "NONE"
  api_key_required = true
}

resource "aws_api_gateway_integration" "lambda_integration" {
  rest_api_id = aws_api_gateway_rest_api.api.id
  resource_id = aws_api_gateway_resource.lam.id
  http_method = aws_api_gateway_method.get_lam.http_method

  integration_http_method = "POST"
  type                    = "AWS_PROXY"
  uri                     = aws_lambda_function.func.invoke_arn
}

resource "aws_lambda_permission" "api_gateway" {
  statement_id  = "AllowAPIGatewayInvoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.func.function_name
  principal     = "apigateway.amazonaws.com"
  source_arn    = "${aws_api_gateway_rest_api.api.execution_arn}/*/*"
}

# API Key и план использования
resource "aws_api_gateway_api_key" "key" {
  name    = "testkey"
  value   = "testkey"
  enabled = true
}

resource "aws_api_gateway_usage_plan" "plan" {
  name = "basic"
  api_stages {
    api_id = aws_api_gateway_rest_api.api.id
    stage  = aws_api_gateway_deployment.deploy.stage_name
  }
}

resource "aws_api_gateway_usage_plan_key" "plan_key" {
  key_id        = aws_api_gateway_api_key.key.id
  key_type      = "API_KEY"
  usage_plan_id = aws_api_gateway_usage_plan.plan.id
}

resource "aws_api_gateway_deployment" "deploy" {
  depends_on = [aws_api_gateway_integration.lambda_integration]
  rest_api_id = aws_api_gateway_rest_api.api.id
  stage_name  = "v1"
}

output "api_url" {
  value = "${aws_api_gateway_deployment.deploy.invoke_url}/lam"
}

Шаг 3: Развертывание и тестирование

  1. Инициализация Terraform:
cd infra
terraform init
  1. Применение конфигурации:
terraform apply
  1. Тестирование конечной точки API Gateway:
curl -H "x-api-key: testkey" "<API_URL>?par=10"
# Ожидается получить: {"res": 11}
  1. Тестирование SQS:

Отправка сообщения в входную очередь:

aws sqs send-message --queue-url <input-queue-url> --message-body '{"par": 5}'

Затем проверка выходной очереди:

aws sqs receive-message --queue-url <output-queue-url>
# Ожидается получить: {"res": 6}

Шаг 4: Очистка

Для удаления всех ресурсов:

terraform destroy

Итог

[Входная очередь SQS] ─▶ [Функция Lambda] ─▶ [Выходная очередь SQS]
                           ▲
                           │
                 [API Gateway /lam?par=N]
                           │
                     Защищено API Key

Вы только что создали Lambda с несколькими триггерами, которая:

  • Потребляет из и публикует в очереди SQS.
  • Отвечает на HTTP-запросы через API Gateway.
  • Обеспечивает проверку API ключа с помощью простой проверки заголовка.
  • Полностью управляется через Terraform для воспроизводимой безсерверной инфраструктуры.

Пальцы вверх!

Эта схема отлично подходит для легковесных трансформаторов сообщений, гибридных микросервисов или соединения асинхронных и синхронных систем AWS — всего несколькими строками Python и Terraform.

Если вы хотите увидеть немного более продвинутый пример Lambda с использованием AWS SAM - пожалуйста, посмотрите этот пост: Кодирование Lambda с использованием AWS SAM + AWS SQS + Python PowerTools

Полезные ссылки