Aufbau einer Dual-Mode AWS Lambda mit Python und Terraform

Schritt-für-Schritt-Beispiel

Inhaltsverzeichnis

Hier haben wir ein Python Lambda-Beispiel eines SQS-Nachrichtenprozessors + REST-API mit API-Schlüssel-Schutz + Terraform-Skript zur Bereitstellung für serverlose Ausführung.

AWS Lambda ermöglicht es Ihnen, leichte serverlose Funktionen zu schreiben, die auf fast jedes Ereignis reagieren können – von SQS-Nachrichten bis hin zu HTTP-Anfragen. In diesem Leitfaden werden wir eine einzelne Python-Lambda-Funktion erstellen, die in zwei Modi funktioniert:

  1. SQS-Modus: Wenn sie durch eine SQS-Nachricht wie { "par": 10 }, ausgelöst wird, veröffentlicht sie { "res": 11 } in eine andere Warteschlange.
  2. HTTP-Modus: Wenn sie über API Gateway bei GET /lam?par=10, aufgerufen wird, gibt sie { "res": 11 } an den Client zurück.

Wir werden auch das HTTP-Endpoint mit einem einfachen hartcodierten API-Schlüssel – "testkey" – schützen. Die gesamte Einrichtung wird mit Terraform bereitgestellt.

Architekturübersicht

Lassen Sie uns visualisieren, was wir aufbauen:

python lambda on aws sqs with api diagram

Die gleiche Lambda reagiert auf beide:

  • SQS-Ereignisse, über eine Ereignisquellenabbildung, und
  • API-Gateway-Anfragen, über eine RESTful-HTTP-Integration.

Schritt 1: Erstellen Sie die Lambda in Python

Lassen Sie uns einen sehr einfachen Handler in Python erstellen, der zwischen einem SQS-Ereignis und einem HTTP-API-Aufruf unterscheiden kann.

Datei: lambda_function.py

import json
import os
import boto3

sqs = boto3.client("sqs")

OUTPUT_QUEUE_URL = os.environ.get("OUTPUT_QUEUE_URL")
API_KEY = os.environ.get("API_KEY", "testkey")  # hartcodierter Standardwert

def lambda_handler(event, context):
    # Ereignistyp erkennen
    if "Records" in event:  # SQS-Ereignis
        return handle_sqs(event["Records"])
    else:                   # HTTP-Ereignis
        return handle_http(event)

def handle_sqs(records):
    for record in records:
        body = json.loads(record["body"])
        par = int(body["par"])
        res = par + 1
        message = json.dumps({"res": res})
        sqs.send_message(QueueUrl=OUTPUT_QUEUE_URL, MessageBody=message)
    return {"status": "processed", "count": len(records)}

def handle_http(event):
    headers = {k.lower(): v for k, v in (event.get("headers") or {}).items()}
    if headers.get("x-api-key") != API_KEY:
        return {
            "statusCode": 403,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({"error": "Forbidden"})
        }

    params = event.get("queryStringParameters") or {}
    if "par" not in params:
        return {
            "statusCode": 400,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({"error": "Missing par"})
        }

    par = int(params["par"])
    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"res": par + 1})
    }

Was wir in dieser Lambda-Funktion haben:

  • SQS-Nachrichten werden als JSON analysiert.
  • Wenn sie durch API Gateway ausgelöst wird, validiert die Funktion den API-Schlüssel und den Anfrageparameter.
  • Die URL der Ausgangswarteschlange und der API-Schlüssel werden über Umgebungsvariablen übergeben.

Schritt 2: Bereitstellung mit Terraform

Terraform ermöglicht es uns, AWS-Infrastruktur – Lambda, SQS-Warteschlangen, API Gateway und Berechtigungen – deklarativ in einem Schritt einzurichten.

Projektstruktur:

project/
├── lambda/
│   └── lambda_function.py
└── infra/
    └── main.tf

Terraform-Konfiguration (infra/main.tf)

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
    archive = {
      source  = "hashicorp/archive"
    }
  }
}

provider "aws" {
  region = "us-east-1"
}

locals {
  project = "lambda-sqs-api"
}

# Lambda paketieren
data "archive_file" "lambda_zip" {
  type        = "zip"
  source_dir  = "../lambda"
  output_path = "lambda.zip"
}

# SQS-Warteschlangen
resource "aws_sqs_queue" "input" {
  name = "${local.project}-input"
}

resource "aws_sqs_queue" "output" {
  name = "${local.project}-output"
}

# IAM-Rolle für Lambda
data "aws_iam_policy_document" "assume_role" {
  statement {
    actions = ["sts:AssumeRole"]
    principals {
      type        = "Service"
      identifiers = ["lambda.amazonaws.com"]
    }
  }
}

resource "aws_iam_role" "lambda_role" {
  name               = "${local.project}-role"
  assume_role_policy = data.aws_iam_policy_document.assume_role.json
}

resource "aws_iam_policy" "lambda_policy" {
  name = "${local.project}-policy"
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "sqs:SendMessage",
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes"
        ]
        Resource = "*"
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "*"
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_policy_attach" {
  role       = aws_iam_role.lambda_role.name
  policy_arn = aws_iam_policy.lambda_policy.arn
}

# Lambda-Funktion
resource "aws_lambda_function" "func" {
  filename         = data.archive_file.lambda_zip.output_path
  function_name    = local.project
  role             = aws_iam_role.lambda_role.arn
  handler          = "lambda_function.lambda_handler"
  runtime          = "python3.12"
  environment {
    variables = {
      OUTPUT_QUEUE_URL = aws_sqs_queue.output.id
      API_KEY          = "testkey"
    }
  }
}

# Ereignisquellenabbildung (SQS → Lambda)
resource "aws_lambda_event_source_mapping" "sqs_trigger" {
  event_source_arn  = aws_sqs_queue.input.arn
  function_name     = aws_lambda_function.func.arn
  batch_size        = 1
  enabled           = true
}

# API Gateway
resource "aws_api_gateway_rest_api" "api" {
  name = "${local.project}-api"
}

resource "aws_api_gateway_resource" "lam" {
  rest_api_id = aws_api_gateway_rest_api.api.id
  parent_id   = aws_api_gateway_rest_api.api.root_resource_id
  path_part   = "lam"
}

resource "aws_api_gateway_method" "get_lam" {
  rest_api_id   = aws_api_gateway_rest_api.api.id
  resource_id   = aws_api_gateway_resource.lam.id
  http_method   = "GET"
  authorization = "NONE"
  api_key_required = true
}

resource "aws_api_gateway_integration" "lambda_integration" {
  rest_api_id = aws_api_gateway_rest_api.api.id
  resource_id = aws_api_gateway_resource.lam.id
  http_method = aws_api_gateway_method.get_lam.http_method

  integration_http_method = "POST"
  type                    = "AWS_PROXY"
  uri                     = aws_lambda_function.func.invoke_arn
}

resource "aws_lambda_permission" "api_gateway" {
  statement_id  = "AllowAPIGatewayInvoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.func.function_name
  principal     = "apigateway.amazonaws.com"
  source_arn    = "${aws_api_gateway_rest_api.api.execution_arn}/*/*"
}

# API-Schlüssel und Nutzungsplan
resource "aws_api_gateway_api_key" "key" {
  name    = "testkey"
  value   = "testkey"
  enabled = true
}

resource "aws_api_gateway_usage_plan" "plan" {
  name = "basic"
  api_stages {
    api_id = aws_api_gateway_rest_api.api.id
    stage  = aws_api_gateway_deployment.deploy.stage_name
  }
}

resource "aws_api_gateway_usage_plan_key" "plan_key" {
  key_id        = aws_api_gateway_api_key.key.id
  key_type      = "API_KEY"
  usage_plan_id = aws_api_gateway_usage_plan.plan.id
}

resource "aws_api_gateway_deployment" "deploy" {
  depends_on = [aws_api_gateway_integration.lambda_integration]
  rest_api_id = aws_api_gateway_rest_api.api.id
  stage_name  = "v1"
}

output "api_url" {
  value = "${aws_api_gateway_deployment.deploy.invoke_url}/lam"
}

Schritt 3: Bereitstellung und Test

  1. Terraform initialisieren:
cd infra
terraform init
  1. Konfiguration anwenden:
terraform apply
  1. API-Gateway-Endpoint testen:
curl -H "x-api-key: testkey" "<API_URL>?par=10"
# Erwartet: {"res": 11}
  1. SQS testen:

Senden Sie eine Nachricht an die Eingabewarteschlange:

aws sqs send-message --queue-url <input-queue-url> --message-body '{"par": 5}'

Dann überprüfen Sie die Ausgabewarteschlange:

aws sqs receive-message --queue-url <output-queue-url>
# Erwartet: {"res": 6}

Schritt 4: Aufräumen

Um alle Ressourcen zu entfernen:

terraform destroy

Zusammenfassung

[SQS-Eingabewarteschlange] ─▶ [Lambda-Funktion] ─▶ [SQS-Ausgabewarteschlange]
                           ▲
                           │
                 [API Gateway /lam?par=N]
                           │
                     Gesichert durch API-Schlüssel

Sie haben gerade eine Multi-Trigger-Lambda erstellt, die:

  • Nachrichten aus SQS-Warteschlangen konsumiert und in diese veröffentlicht.
  • Auf HTTP-Anfragen über API Gateway reagiert.
  • Einen API-Schlüssel mit einer einfachen Header-Prüfung durchsetzt.
  • Vollständig über Terraform für reproduzierbare serverlose Infrastruktur verwaltet wird.

Thumbs up!

Dieses Muster eignet sich hervorragend für leichte Nachrichtenwandler, hybride Mikrodienste oder die Verbindung asynchroner und synchroner AWS-Systeme – alles mit einigen Zeilen Python und Terraform.

Wenn Sie ein etwas fortgeschritteneres Lambda-Beispiel mit AWS SAM sehen möchten, besuchen Sie bitte diesen Beitrag: Coding Lambda mit AWS SAM + AWS SQS + Python PowerTools