بناء وظيفة AWS Lambda ثنائية النمط باستخدام Python وTerraform

مثال تدريجي خطوة بخطوة

Page content

هنا لدينا مثال لـ Lambda في Python لمعالجة رسالة SQS + واجهة برمجة تطبيقات REST مع حماية مفتاح API + Terraform نسخة قابلة للنشر لتشغيلها بدون خادم.

تتيح AWS Lambda كتابة وظائف خالية من الخادم خفيفة الوزن يمكنها التفاعل مع أغلب الأحداث - من رسائل SQS إلى طلبات HTTP. في هذا الدليل، سنقوم ببناء وظيفة Lambda واحدة في Python تعمل في وضعين:

  1. وضع SQS: عند التنشيط من خلال رسالة SQS مثل { "par": 10 }, تنشر { "res": 11 } إلى صف انتظار آخر.
  2. وضع HTTP: عند الاتصال عبر API Gateway في GET /lam?par=10, تعيد { "res": 11 } إلى العميل.

سنشمل أيضًا حماية نقطة النهاية HTTP باستخدام مفتاح API مُحدد مسبقًا - "testkey". سيتم نشر كل هذا الإعداد باستخدام Terraform.

ملخص العمارة

لنقم بتصور ما نبنيه:

python lambda على aws sqs مع رسم توضيحي لـ API

الوظيفة Lambda نفسها تتفاعل مع كلا:

  • أحداث SQS، عبر خريطة مصدر الأحداث، و
  • طلبات API Gateway، عبر تكامل HTTP RESTful.

الخطوة 1: إنشاء Lambda في Python

لنقم بإنشاء معالج بسيط جدًا في Python يمكنه التمييز بين حدث SQS وطلب HTTP.

الملف: lambda_function.py

import json
import os
import boto3

sqs = boto3.client("sqs")

OUTPUT_QUEUE_URL = os.environ.get("OUTPUT_QUEUE_URL")
API_KEY = os.environ.get("API_KEY", "testkey")  # القيمة الافتراضية المُحددة مسبقًا

def lambda_handler(event, context):
    # اكتشاف نوع الحدث
    if "Records" in event:  # حدث SQS
        return handle_sqs(event["Records"])
    else:                   # حدث HTTP
        return handle_http(event)

def handle_sqs(records):
    for record in records:
        body = json.loads(record["body"])
        par = int(body["par"])
        res = par + 1
        message = json.dumps({"res": res})
        sqs.send_message(QueueUrl=OUTPUT_QUEUE_URL, MessageBody=message)
    return {"status": "processed", "count": len(records)}

def handle_http(event):
    headers = {k.lower(): v for k, v in (event.get("headers") or {}).items()}
    if headers.get("x-api-key") != API_KEY:
        return {
            "statusCode": 403,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({"error": "غير مسموح"})
        }

    params = event.get("queryStringParameters") or {}
    if "par" not in params:
        return {
            "statusCode": 400,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({"error": "معلمة par مفقودة"})
        }

    par = int(params["par"])
    return {
        "statusCode": 200,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps({"res": par + 1})
    }

ما نملكه في هذه الدالة Lambda:

  • تُحلل رسائل SQS كـ JSON.
  • عند التنشيط من خلال API Gateway، تتحقق الدالة من مفتاح API ومعلمة الاستعلام.
  • تُمرر عنوان صف الإخراج ومفتاح API عبر المتغيرات البيئية.

الخطوة 2: النشر باستخدام Terraform

يتيح لنا Terraform إعداد البنية التحتية لـ AWS بشكل إعلاني - Lambda، صفوف SQS، API Gateway، والصلاحيات - في مرة واحدة.

هيكل المشروع:

project/
├── lambda/
│   └── lambda_function.py
└── infra/
    └── main.tf

إعداد Terraform (infra/main.tf)

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
    archive = {
      source  = "hashicorp/archive"
    }
  }
}

provider "aws" {
  region = "us-east-1"
}

locals {
  project = "lambda-sqs-api"
}

# تعبئة Lambda
data "archive_file" "lambda_zip" {
  type        = "zip"
  source_dir  = "../lambda"
  output_path = "lambda.zip"
}

# صفوف SQS
resource "aws_sqs_queue" "input" {
  name = "${local.project}-input"
}

resource "aws_sqs_queue" "output" {
  name = "${local.project}-output"
}

# دور IAM لـ Lambda
data "aws_iam_policy_document" "assume_role" {
  statement {
    actions = ["sts:AssumeRole"]
    principals {
      type        = "Service"
      identifiers = ["lambda.amazonaws.com"]
    }
  }
}

resource "aws_iam_role" "lambda_role" {
  name               = "${local.project}-role"
  assume_role_policy = data.aws_iam_policy_document.assume_role.json
}

resource "aws_iam_policy" "lambda_policy" {
  name = "${local.project}-policy"
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = [
          "sqs:SendMessage",
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes"
        ]
        Resource = "*"
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "*"
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_policy_attach" {
  role       = aws_iam_role.lambda_role.name
  policy_arn = aws_iam_policy.lambda_policy.arn
}

# دالة Lambda
resource "aws_lambda_function" "func" {
  filename         = data.archive_file.lambda_zip.output_path
  function_name    = local.project
  role             = aws_iam_role.lambda_role.arn
  handler          = "lambda_function.lambda_handler"
  runtime          = "python3.12"
  environment {
    variables = {
      OUTPUT_QUEUE_URL = aws_sqs_queue.output.id
      API_KEY          = "testkey"
    }
  }
}

# خريطة مصدر الأحداث (SQS → Lambda)
resource "aws_lambda_event_source_mapping" "sqs_trigger" {
  event_source_arn  = aws_sqs_queue.input.arn
  function_name     = aws_lambda_function.func.arn
  batch_size        = 1
  enabled           = true
}

# API Gateway
resource "aws_api_gateway_rest_api" "api" {
  name = "${local.project}-api"
}

resource "aws_api_gateway_resource" "lam" {
  rest_api_id = aws_api_gateway_rest_api.api.id
  parent_id   = aws_api_gateway_rest_api.api.root_resource_id
  path_part   = "lam"
}

resource "aws_api_gateway_method" "get_lam" {
  rest_api_id   = aws_api_gateway_rest_api.api.id
  resource_id   = aws_api_gateway_resource.lam.id
  http_method   = "GET"
  authorization = "NONE"
  api_key_required = true
}

resource "aws_api_gateway_integration" "lambda_integration" {
  rest_api_id = aws_api_gateway_rest_api.api.id
  resource_id = aws_api_gateway_resource.lam.id
  http_method = aws_api_gateway_method.get_lam.http_method

  integration_http_method = "POST"
  type                    = "AWS_PROXY"
  uri                     = aws_lambda_function.func.invoke_arn
}

resource "aws_lambda_permission" "api_gateway" {
  statement_id  = "AllowAPIGatewayInvoke"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.func.function_name
  principal     = "apigateway.amazonaws.com"
  source_arn    = "${aws_api_gateway_rest_api.api.execution_arn}/*/*"
}

# مفتاح API وخطة الاستخدام
resource "aws_api_gateway_api_key" "key" {
  name    = "testkey"
  value   = "testkey"
  enabled = true
}

resource "aws_api_gateway_usage_plan" "plan" {
  name = "basic"
  api_stages {
    api_id = aws_api_gateway_rest_api.api.id
    stage  = aws_api_gateway_deployment.deploy.stage_name
  }
}

resource "aws_api_gateway_usage_plan_key" "plan_key" {
  key_id        = aws_api_gateway_api_key.key.id
  key_type      = "API_KEY"
  usage_plan_id = aws_api_gateway_usage_plan.plan.id
}

resource "aws_api_gateway_deployment" "deploy" {
  depends_on = [aws_api_gateway_integration.lambda_integration]
  rest_api_id = aws_api_gateway_rest_api.api.id
  stage_name  = "v1"
}

output "api_url" {
  value = "${aws_api_gateway_deployment.deploy.invoke_url}/lam"
}

الخطوة 3: النشر والاختبار

  1. تهيئة Terraform:
cd infra
terraform init
  1. تطبيق التكوين:
terraform apply
  1. اختبار نقطة نهاية API Gateway:
curl -H "x-api-key: testkey" "<API_URL>?par=10"
# من المتوقع استقبال: {"res": 11}
  1. اختبار SQS:

أرسل رسالة إلى صف الإدخال:

aws sqs send-message --queue-url <input-queue-url> --message-body '{"par": 5}'

ثم تحقق من صف الإخراج:

aws sqs receive-message --queue-url <output-queue-url>
# من المتوقع استقبال: {"res": 6}

الخطوة 4: التنظيف

لإزالة جميع الموارد:

terraform destroy

الملخص

[صف إدخال SQS] ─▶ [وظيفة Lambda] ─▶ [صف إخراج SQS]
                           ▲
                           │
                 [API Gateway /lam?par=N]
                           │
                     مُحمي بمفتاح API

لقد بنت مؤخرًا وظيفة Lambda متعددة المُحفزات التي:

  • تستهلك من صف SQS وتُنشر إلى صف SQS.
  • تتفاعل مع طلبات HTTP عبر API Gateway.
  • تفرض مفتاح API باستخدام فحص بسيط للمؤشر.
  • تُدار تمامًا عبر Terraform لبناء البنية التحتية بدون خادم قابلة للتكرار.

إعجاب!

هذا النمط مثالي لتحويل الرسائل الخفيفة، والخدمات الدقيقة، أو توصيل الأنظمة غير المتزامنة والمتزامنة في AWS - كلها باستخدام بضع سطور من Python وTerraform.

إذا كنت ترغب في رؤية مثال Lambda أكثر تقدمًا باستخدام AWS SAM - يرجى التحقق من هذه المقالة: كتابة Lambda باستخدام AWS SAM + AWS SQS + Python PowerTools

الروابط المفيدة