使用 Python 和 Terraform 构建双模式 AWS Lambda
分步示例
目录
在这里,我们有一个 Python Lambda 示例:SQS 消息处理器 + REST API(带 API 密钥保护)+ Terraform 脚本,用于部署无服务器执行。
AWS Lambda 让您编写轻量级的无服务器函数,这些函数可以响应几乎所有事件 —— 从 SQS 消息到 HTTP 请求。 在本指南中,我们将构建一个 单一的 Python Lambda,它可以在 两种模式 下运行:
- SQS 模式: 当由类似
{ "par": 10 }
的 SQS 消息触发时, 它会将{ "res": 11 }
发布到另一个队列。 - HTTP 模式: 当通过 API Gateway 调用
GET /lam?par=10
时, 它会将{ "res": 11 }
返回给客户端。
我们还将使用一个简单的硬编码 API 密钥 —— "testkey"
来保护 HTTP 端点。
整个设置将使用 Terraform 部署。
架构概述
让我们可视化我们正在构建的内容:
相同的 Lambda 会响应:
- SQS 事件,通过事件源映射,以及
- API Gateway 请求,通过 RESTful HTTP 集成。
第一步:在 Python 中创建 Lambda
让我们创建一个非常简单的 Python 处理器,它可以区分 SQS 事件和 HTTP API 调用。
文件: lambda_function.py
import json
import os
import boto3
sqs = boto3.client("sqs")
OUTPUT_QUEUE_URL = os.environ.get("OUTPUT_QUEUE_URL")
API_KEY = os.environ.get("API_KEY", "testkey") # 硬编码默认值
def lambda_handler(event, context):
# 检测事件类型
if "Records" in event: # SQS 事件
return handle_sqs(event["Records"])
else: # HTTP 事件
return handle_http(event)
def handle_sqs(records):
for record in records:
body = json.loads(record["body"])
par = int(body["par"])
res = par + 1
message = json.dumps({"res": res})
sqs.send_message(QueueUrl=OUTPUT_QUEUE_URL, MessageBody=message)
return {"status": "processed", "count": len(records)}
def handle_http(event):
headers = {k.lower(): v for k, v in (event.get("headers") or {}).items()}
if headers.get("x-api-key") != API_KEY:
return {
"statusCode": 403,
"headers": {"Content-Type": "application/json"},
"body": json.dumps({"error": "Forbidden"})
}
params = event.get("queryStringParameters") or {}
if "par" not in params:
return {
"statusCode": 400,
"headers": {"Content-Type": "application/json"},
"body": json.dumps({"error": "Missing par"})
}
par = int(params["par"])
return {
"statusCode": 200,
"headers": {"Content-Type": "application/json"},
"body": json.dumps({"res": par + 1})
}
在这个 Lambda 函数中,我们有:
- SQS 消息以 JSON 格式解析。
- 当由 API Gateway 触发时,该函数会验证 API 密钥和查询参数。
- 输出队列 URL 和 API 密钥通过环境变量传递。
第二步:使用 Terraform 部署
Terraform 让我们以声明式方式一次性设置 AWS 基础设施 —— Lambda、SQS 队列、API Gateway 和权限。
项目结构:
project/
├── lambda/
│ └── lambda_function.py
└── infra/
└── main.tf
Terraform 配置 (infra/main.tf
)
terraform {
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
archive = {
source = "hashicorp/archive"
}
}
}
provider "aws" {
region = "us-east-1"
}
locals {
project = "lambda-sqs-api"
}
# 打包 Lambda
data "archive_file" "lambda_zip" {
type = "zip"
source_dir = "../lambda"
output_path = "lambda.zip"
}
# SQS 队列
resource "aws_sqs_queue" "input" {
name = "${local.project}-input"
}
resource "aws_sqs_queue" "output" {
name = "${local.project}-output"
}
# Lambda 的 IAM 角色
data "aws_iam_policy_document" "assume_role" {
statement {
actions = ["sts:AssumeRole"]
principals {
type = "Service"
identifiers = ["lambda.amazonaws.com"]
}
}
}
resource "aws_iam_role" "lambda_role" {
name = "${local.project}-role"
assume_role_policy = data.aws_iam_policy_document.assume_role.json
}
resource "aws_iam_policy" "lambda_policy" {
name = "${local.project}-policy"
policy = jsonencode({
Version = "2012-10-17"
Statement = [
{
Effect = "Allow"
Action = [
"sqs:SendMessage",
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes"
]
Resource = "*"
},
{
Effect = "Allow"
Action = [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
]
Resource = "*"
}
]
})
}
resource "aws_iam_role_policy_attachment" "lambda_policy_attach" {
role = aws_iam_role.lambda_role.name
policy_arn = aws_iam_policy.lambda_policy.arn
}
# Lambda 函数
resource "aws_lambda_function" "func" {
filename = data.archive_file.lambda_zip.output_path
function_name = local.project
role = aws_iam_role.lambda_role.arn
handler = "lambda_function.lambda_handler"
runtime = "python3.12"
environment {
variables = {
OUTPUT_QUEUE_URL = aws_sqs_queue.output.id
API_KEY = "testkey"
}
}
}
# 事件源映射(SQS → Lambda)
resource "aws_lambda_event_source_mapping" "sqs_trigger" {
event_source_arn = aws_sqs_queue.input.arn
function_name = aws_lambda_function.func.arn
batch_size = 1
enabled = true
}
# API Gateway
resource "aws_api_gateway_rest_api" "api" {
name = "${local.project}-api"
}
resource "aws_api_gateway_resource" "lam" {
rest_api_id = aws_api_gateway_rest_api.api.id
parent_id = aws_api_gateway_rest_api.api.root_resource_id
path_part = "lam"
}
resource "aws_api_gateway_method" "get_lam" {
rest_api_id = aws_api_gateway_rest_api.api.id
resource_id = aws_api_gateway_resource.lam.id
http_method = "GET"
authorization = "NONE"
api_key_required = true
}
resource "aws_api_gateway_integration" "lambda_integration" {
rest_api_id = aws_api_gateway_rest_api.api.id
resource_id = aws_api_gateway_resource.lam.id
http_method = aws_api_gateway_method.get_lam.http_method
integration_http_method = "POST"
type = "AWS_PROXY"
uri = aws_lambda_function.func.invoke_arn
}
resource "aws_lambda_permission" "api_gateway" {
statement_id = "AllowAPIGatewayInvoke"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.func.function_name
principal = "apigateway.amazonaws.com"
source_arn = "${aws_api_gateway_rest_api.api.execution_arn}/*/*"
}
# API 密钥和使用计划
resource "aws_api_gateway_api_key" "key" {
name = "testkey"
value = "testkey"
enabled = true
}
resource "aws_api_gateway_usage_plan" "plan" {
name = "basic"
api_stages {
api_id = aws_api_gateway_rest_api.api.id
stage = aws_api_gateway_deployment.deploy.stage_name
}
}
resource "aws_api_gateway_usage_plan_key" "plan_key" {
key_id = aws_api_gateway_api_key.key.id
key_type = "API_KEY"
usage_plan_id = aws_api_gateway_usage_plan.plan.id
}
resource "aws_api_gateway_deployment" "deploy" {
depends_on = [aws_api_gateway_integration.lambda_integration]
rest_api_id = aws_api_gateway_rest_api.api.id
stage_name = "v1"
}
output "api_url" {
value = "${aws_api_gateway_deployment.deploy.invoke_url}/lam"
}
第三步:部署和测试
- 初始化 Terraform:
cd infra
terraform init
- 应用配置:
terraform apply
- 测试 API Gateway 端点:
curl -H "x-api-key: testkey" "<API_URL>?par=10"
# 预期接收:{"res": 11}
- 测试 SQS:
向输入队列发送消息:
aws sqs send-message --queue-url <input-queue-url> --message-body '{"par": 5}'
然后检查输出队列:
aws sqs receive-message --queue-url <output-queue-url>
# 预期接收:{"res": 6}
第四步:清理
要删除所有资源:
terraform destroy
总结
[SQS 输入队列] ─▶ [Lambda 函数] ─▶ [SQS 输出队列]
▲
│
[API Gateway /lam?par=N]
│
通过 API 密钥保护
您刚刚构建了一个 多触发器 Lambda,它:
- 从 SQS 队列中消费并发布到 SQS 队列。
- 通过 API Gateway 响应 HTTP 请求。
- 使用简单的头部检查强制执行 API 密钥。
- 通过 Terraform 完全管理,实现可重复的 无服务器 基础设施。
这种模式非常适合轻量级消息转换器、混合微服务,或连接异步和同步 AWS 系统 —— 仅需几行 Python 和 Terraform。
如果您想了解更高级的 Lambda 示例,使用 AWS SAM —— 请查看这篇帖子:使用 AWS SAM + AWS SQS + Python PowerTools 编写 Lambda