PythonとTerraformを使用した双模態AWS Lambdaの構築

ステップバイステップの例

目次

ここに、Python LambdaのSQSメッセージプロセッサの例 + REST APIとAPIキー保護 + Terraformスクリプトがあり、サーバーレス実行のためにそれをデプロイします。

AWS Lambdaは、SQSメッセージからHTTPリクエストまで、ほぼすべてのイベントに反応できる軽量なサーバーレス関数を書くことができます。 このガイドでは、単一のPython Lambdaを構築し、2つのモードで動作させます:

  1. SQSモード: SQSメッセージ(例: { "par": 10 })によってトリガーされた場合、{ "res": 11 }を別のキューに公開します。
  2. HTTPモード: GET /lam?par=10でAPI Gatewayを介して呼び出された場合、{ "res": 11 }をクライアントに返します。

HTTPエンドポイントは、単純なハードコードされたAPIキー testkeyで保護されます。 全体のセットアップは、Terraformを使用してデプロイされます。


アーキテクチャ概要

構築するものを視覚化しましょう:

python lambda on aws sqs with api diagram

同じLambdaは、以下に反応します:

  • SQSイベント、イベントソースマッピングを通じて、
  • API Gatewayリクエスト、RESTful HTTP統合を通じて。

ステップ1: PythonでLambdaを作成する

SQSイベントとHTTP API呼び出しを区別できる非常に単純なハンドラーを作成しましょう。

ファイル: lambda_function.py

import json
import os
import boto3

sqs = boto3.client("sqs")

OUTPUT_QUEUE_URL = os.environ.get("OUTPUT_QUEUE_URL")
API_KEY = os.environ.get("API_KEY", "testkey")  # ハードコードされたデフォルト

def lambda_handler(event, context):
    # イベントタイプを検出
    if "Records" in event:  # SQSイベント
        return handle_sqs(event["Records"])
    else:                   # HTTPイベント
        return handle_http(event)

def handle_sqs(records):
    for record in records:
        body = json.loads(record["body"])
        par = int(body["par"])
        res = par + 1
        message = json.dumps({"res": res})
        sqs.send_message(QueueUrl=OUTPUT_QUEUE_URL, MessageBody=message)
    return {"status": "processed", "count": len(records)}

def handle_http(event):
    headers = {k.lower(): v for k, v in (event.get("headers") or {}).items()}
    if headers.get("x-api-key") != API_KEY:
        return {
            "statusCode": 403,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({"error": "Forbidden"})
        }

    params = event.get("queryStringParameters") or {}
    if "par" not in params:
        return {
            "statusCode": 400,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({"error": "Missing par"})
        }

    par = int(params["par"])
    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"res": par + 1})
    }

このLambda関数には以下があります:

  • SQSメッセージはJSONとして解析されます。
  • API Gatewayによってトリガーされた場合、関数はAPIキーとクエリパラメータを検証します。
  • 出力キューURLとAPIキーは環境変数を通じて渡されます。

ステップ2: Terraformでデプロイする

Terraformは、Lambda、SQSキュー、API Gateway、および権限を一度に宣言的に設定できるようにします。

プロジェクト構造:

project/
├── lambda/
│   └── lambda_function.py
└── infra/
    └── main.tf

Terraform設定 (infra/main.tf)

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
    archive = {
      source  = "hashicorp/archive"
    }
  }
}

provider "aws" {
  region = "us-east-1"
}

locals {
  project = "lambda-sqs-api"
}

# Lambdaをパッケージ
data "archive_file" "lambda_zip" {
  type        = "zip"
  source_dir  = "../lambda"
  output_path = "lambda.zip"
}

# SQSキュー
resource "aws_sqs_queue" "input" {
  name = "${local.project}-input"
}

resource "aws_sqs_queue" "output" {
  name = "${local.project}-output"
}

# Lambda用のIAMロール
data "aws_iam_policy_document" "assume_role" {
  statement {
    actions = ["sts:AssumeRole"]
    principals {
      type        = "Service"
      identifiers = ["lambda.amazonaws.com"]
    }
  }
}

resource "aws_iam_role" "lambda_role" {
  name               = "${local.project}-role"
  assume_role_policy = data.aws_iam_policy_document.assume_role.json
}

resource "aws_iam_policy" "lambda_policy" {
  name = "${local.project}-policy"
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "sqs:SendMessage",
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes"
        ]
        Resource = "*"
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "*"
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_policy_attach" {
  role       = aws_iam_role.lambda_role.name
  policy_arn = aws_iam_policy.lambda_policy.arn
}

# Lambda関数
resource "aws_lambda_function" "func" {
  filename         = data.archive_file.lambda_zip.output_path
  function_name    = local.project
  role             = aws_iam_role.lambda_role.arn
  handler          = "lambda_function.lambda_handler"
  runtime          = "python3.12"
  environment {
    variables = {
      OUTPUT_QUEUE_URL = aws_sqs_queue.output.id
      API_KEY          = "testkey"
    }
  }
}

# イベントソースマッピング (SQS → Lambda)
resource "aws_lambda_event_source_mapping" "sqs_trigger" {
  event_source_arn  = aws_sqs_queue.input.arn
  function_name     = aws_lambda_function.func.arn
  batch_size        = 1
  enabled           = true
}

# API Gateway
resource "aws_api_gateway_rest_api" "api" {
  name = "${local.project}-api"
}

resource "aws_api_gateway_resource" "lam" {
  rest_api_id = aws_api_gateway_rest_api.api.id
  parent_id   = aws_api_gateway_rest_api.api.root_resource_id
  path_part   = "lam"
}

resource "aws_api_gateway_method" "get_lam" {
  rest_api_id   = aws_api_gateway_rest_api.api.id
  resource_id   = aws_api_gateway_resource.lam.id
  http_method   = "GET"
  authorization = "NONE"
  api_key_required = true
}

resource "aws_api_gateway_integration" "lambda_integration" {
  rest_api_id = aws_api_gateway_rest_api.api.id
  resource_id = aws_api_gateway_resource.lam.id
  http_method = aws_api_gateway_method.get_lam.http_method

  integration_http_method = "POST"
  type                    = "AWS_PROXY"
  uri                     = aws_lambda_function.func.invoke_arn
}

resource "aws_lambda_permission" "api_gateway" {
  statement_id  = "AllowAPIGatewayInvoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.func.function_name
  principal     = "apigateway.amazonaws.com"
  source_arn    = "${aws_api_gateway_rest_api.api.execution_arn}/*/*"
}

# APIキーと使用プラン
resource "aws_api_gateway_api_key" "key" {
  name    = "testkey"
  value   = "testkey"
  enabled = true
}

resource "aws_api_gateway_usage_plan" "plan" {
  name = "basic"
  api_stages {
    api_id = aws_api_gateway_rest_api.api.id
    stage  = aws_api_gateway_deployment.deploy.stage_name
  }
}

resource "aws_api_gateway_usage_plan_key" "plan_key" {
  key_id        = aws_api_gateway_api_key.key.id
  key_type      = "API_KEY"
  usage_plan_id = aws_api_gateway_usage_plan.plan.id
}

resource "aws_api_gateway_deployment" "deploy" {
  depends_on = [aws_api_gateway_integration.lambda_integration]
  rest_api_id = aws_api_gateway_rest_api.api.id
  stage_name  = "v1"
}

output "api_url" {
  value = "${aws_api_gateway_deployment.deploy.invoke_url}/lam"
}

ステップ3: デプロイとテスト

  1. Terraformを初期化:
cd infra
terraform init
  1. 設定を適用:
terraform apply
  1. API Gatewayエンドポイントをテスト:
curl -H "x-api-key: testkey" "<API_URL>?par=10"
# 予期される応答: {"res": 11}
  1. SQSをテスト:

入力キューにメッセージを送信:

aws sqs send-message --queue-url <input-queue-url> --message-body '{"par": 5}'

その後、出力キューを確認:

aws sqs receive-message --queue-url <output-queue-url>
# 予期される応答: {"res": 6}

ステップ4: クリーンアップ

すべてのリソースを削除するには:

terraform destroy

まとめ

[SQS入力キュー] ─▶ [Lambda関数] ─▶ [SQS出力キュー]
                           ▲
                           │
                 [API Gateway /lam?par=N]
                           │
                     APIキーで保護

あなたはマルチトリガーLambdaを構築しました:

  • SQSキューからメッセージを消費し、SQSキューにメッセージを公開します。
  • API Gatewayを通じてHTTPリクエストに応答します。
  • シンプルなヘッダーチェックを使用してAPIキーを強制します。
  • Terraformを使用して、再現可能なサーバーレスインフラストラクチャを完全に管理します。

Thumbs up!

このパターンは、軽量なメッセージ変換、ハイブリッドマイクロサービス、非同期と同期のAWSシステムの橋渡しに最適です — すべては数行のPythonとTerraformで実現可能です。

より高度なLambdaの例を使用してAWS SAMを確認したい場合は、この投稿をチェックしてください: AWS SAM + AWS SQS + Python PowerToolsを使用したLambdaのコーディング

有用なリンク