Tworzenie dual-modowego AWS Lambda w języku Python i Terraform

Przykład krok po kroku

Page content

Oto przykład Python Lambda do przetwarzania wiadomości SQS + interfejsu API REST z ochroną za pomocą klucza API + Terraform skryptu do wdrożenia w trybie bezserwerowym.

AWS Lambda pozwala tworzyć lekkie funkcje bezserwerowe, które mogą reagować prawie na każdy wydarzenie — od wiadomości SQS po żądania HTTP. W tym przewodniku zbudujemy jedną funkcję Python Lambda, która będzie działać w dwóch trybach:

  1. Trybie SQS: Gdy zostanie wywołana przez wiadomość SQS taką jak { "par": 10 }, opublikuje { "res": 11 } w kolejce.
  2. Trybie HTTP: Gdy zostanie wywołana przez API Gateway na GET /lam?par=10, zwróci { "res": 11 } do klienta.

Zabezpieczymy również punkt końcowy HTTP za pomocą prostego, zapisanego w kodzie klucza API — "testkey". Cała konfiguracja zostanie wdrożona przy użyciu Terraform.

Omówienie architektury

Zróbmy wizualizację tego, co budujemy:

python lambda na aws sqs z diagramem api

Ta sama funkcja Lambda reaguje na:

  • Wydarzenia SQS, za pomocą mapowania źródła wydarzeń, oraz
  • Żądania API Gateway, za pomocą integracji HTTP REST.

Krok 1: Utwórz funkcję Lambda w Pythonie

Utwórzmy bardzo prosty handler w Pythonie, który będzie potrafił odróżnić wydarzenie SQS od wywołania HTTP.

Plik: lambda_function.py

import json
import os
import boto3

sqs = boto3.client("sqs")

OUTPUT_QUEUE_URL = os.environ.get("OUTPUT_QUEUE_URL")
API_KEY = os.environ.get("API_KEY", "testkey")  # domyślnie zapisany w kodzie

def lambda_handler(event, context):
    # Wykryj typ wydarzenia
    if "Records" in event:  # wydarzenie SQS
        return handle_sqs(event["Records"])
    else:                   # wydarzenie HTTP
        return handle_http(event)

def handle_sqs(records):
    for record in records:
        body = json.loads(record["body"])
        par = int(body["par"])
        res = par + 1
        message = json.dumps({"res": res})
        sqs.send_message(QueueUrl=OUTPUT_QUEUE_URL, MessageBody=message)
    return {"status": "processed", "count": len(records)}

def handle_http(event):
    headers = {k.lower(): v for k, v in (event.get("headers") or {}).items()}
    if headers.get("x-api-key") != API_KEY:
        return {
            "statusCode": 403,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({"error": "Forbidden"})
        }

    params = event.get("queryStringParameters") or {}
    if "par" not in params:
        return {
            "statusCode": 400,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({"error": "Brak parametru 'par'"})
        }

    par = int(params["par"])
    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"res": par + 1})
    }

To, co mamy w tej funkcji Lambda:

  • Wiadomości SQS są parsowane jako JSON.
  • Gdy zostanie wywołana przez API Gateway, funkcja weryfikuje klucz API i parametr zapytania.
  • URL kolejki wyjściowej i klucz API są przekazywane przez zmienne środowiskowe.

Krok 2: Wdrożenie za pomocą Terraform

Terraform pozwala deklaratywnie ustawić infrastrukturę AWS — Lambda, kolejki SQS, API Gateway i uprawnienia — w jednym kroku.

Struktura projektu:

project/
├── lambda/
│   └── lambda_function.py
└── infra/
    └── main.tf

Konfiguracja Terraform (infra/main.tf)

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
    archive = {
      source  = "hashicorp/archive"
    }
  }
}

provider "aws" {
  region = "us-east-1"
}

locals {
  project = "lambda-sqs-api"
}

# Pakowanie Lambda
data "archive_file" "lambda_zip" {
  type        = "zip"
  source_dir  = "../lambda"
  output_path = "lambda.zip"
}

# Kolejki SQS
resource "aws_sqs_queue" "input" {
  name = "${local.project}-input"
}

resource "aws_sqs_queue" "output" {
  name = "${local.project}-output"
}

# Rola IAM dla Lambda
data "aws_iam_policy_document" "assume_role" {
  statement {
    actions = ["sts:AssumeRole"]
    principals {
      type        = "Service"
      identifiers = ["lambda.amazonaws.com"]
    }
  }
}

resource "aws_iam_role" "lambda_role" {
  name               = "${local.project}-role"
  assume_role_policy = data.aws_iam_policy_document.assume_role.json
}

resource "aws_iam_policy" "lambda_policy" {
  name = "${local.project}-policy"
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "sqs:SendMessage",
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes"
        ]
        Resource = "*"
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "*"
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_policy_attach" {
  role       = aws_iam_role.lambda_role.name
  policy_arn = aws_iam_policy.lambda_policy.arn
}

# Funkcja Lambda
resource "aws_lambda_function" "func" {
  filename         = data.archive_file.lambda_zip.output_path
  function_name    = local.project
  role             = aws_iam_role.lambda_role.arn
  handler          = "lambda_function.lambda_handler"
  runtime          = "python3.12"
  environment {
    variables = {
      OUTPUT_QUEUE_URL = aws_sqs_queue.output.id
      API_KEY          = "testkey"
    }
  }
}

# Mapowanie źródła wydarzeń (SQS → Lambda)
resource "aws_lambda_event_source_mapping" "sqs_trigger" {
  event_source_arn  = aws_sqs_queue.input.arn
  function_name     = aws_lambda_function.func.arn
  batch_size        = 1
  enabled           = true
}

# API Gateway
resource "aws_api_gateway_rest_api" "api" {
  name = "${local.project}-api"
}

resource "aws_api_gateway_resource" "lam" {
  rest_api_id = aws_api_gateway_rest_api.api.id
  parent_id   = aws_api_gateway_rest_api.api.root_resource_id
  path_part   = "lam"
}

resource "aws_api_gateway_method" "get_lam" {
  rest_api_id   = aws_api_gateway_rest_api.api.id
  resource_id   = aws_api_gateway_resource.lam.id
  http_method   = "GET"
  authorization = "NONE"
  api_key_required = true
}

resource "aws_api_gateway_integration" "lambda_integration" {
  rest_api_id = aws_api_gateway_rest_api.api.id
  resource_id = aws_api_gateway_resource.lam.id
  http_method = aws_api_gateway_method.get_lam.http_method

  integration_http_method = "POST"
  type                    = "AWS_PROXY"
  uri                     = aws_lambda_function.func.invoke_arn
}

resource "aws_lambda_permission" "api_gateway" {
  statement_id  = "AllowAPIGatewayInvoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.func.function_name
  principal     = "apigateway.amazonaws.com"
  source_arn    = "${aws_api_gateway_rest_api.api.execution_arn}/*/*"
}

# Klucz API i plan użycia
resource "aws_api_gateway_api_key" "key" {
  name    = "testkey"
  value   = "testkey"
  enabled = true
}

resource "aws_api_gateway_usage_plan" "plan" {
  name = "basic"
  api_stages {
    api_id = aws_api_gateway_rest_api.api.id
    stage  = aws_api_gateway_deployment.deploy.stage_name
  }
}

resource "aws_api_gateway_usage_plan_key" "plan_key" {
  key_id        = aws_api_gateway_api_key.key.id
  key_type      = "API_KEY"
  usage_plan_id = aws_api_gateway_usage_plan.plan.id
}

resource "aws_api_gateway_deployment" "deploy" {
  depends_on = [aws_api_gateway_integration.lambda_integration]
  rest_api_id = aws_api_gateway_rest_api.api.id
  stage_name  = "v1"
}

output "api_url" {
  value = "${aws_api_gateway_deployment.deploy.invoke_url}/lam"
}

Krok 3: Wdrożenie i testowanie

  1. Inicjalizacja Terraform:
cd infra
terraform init
  1. Zastosowanie konfiguracji:
terraform apply
  1. Testowanie punktu końcowego API Gateway:
curl -H "x-api-key: testkey" "<API_URL>?par=10"
# Oczekiwany wynik: {"res": 11}
  1. Testowanie SQS:

Wyślij wiadomość do kolejki wejściowej:

aws sqs send-message --queue-url <input-queue-url> --message-body '{"par": 5}'

Następnie sprawdź kolejkę wyjściową:

aws sqs receive-message --queue-url <output-queue-url>
# Oczekiwany wynik: {"res": 6}

Krok 4: Czyszczenie

Aby usunąć wszystkie zasoby:

terraform destroy

Podsumowanie

[Kolejka wejściowa SQS] ─▶ [Funkcja Lambda] ─▶ [Kolejka wyjściowa SQS]
                           ▲
                           │
                 [API Gateway /lam?par=N]
                           │
                     Zabezpieczony kluczem API

Zbudowałeś właśnie Lambda z wieloma wyzwalaczami, który:

  • Odbiera i wysyła wiadomości do kolejek SQS.
  • Odpowiada na żądania HTTP przez API Gateway.
  • Wymusza klucz API za pomocą prostego sprawdzania nagłówka.
  • Jest w pełni zarządzany przez Terraform do powtarzalnej infrastruktury bezserwerowej.

Pochwal się!

Ten wzorzec jest świetny do lekkich transformatorów wiadomości, mikroserwisów hybrydowych lub łączenia systemów asynchronicznych i synchronicznych w AWS — wszystko za pomocą kilku linii Pythona i Terraform.

Jeśli chcesz zobaczyć nieco bardziej zaawansowany przykład Lambda — sprawdź ten wpis: Kodowanie Lambda za pomocą AWS SAM + AWS SQS + Python PowerTools

Przydatne linki