AWS SAM + AWS SQS + Python PowerTools
कार्यान्वयन का उदाहरण
यहां मैं एक जोड़े लैम्ब्डा फ़ंक्शन्स इन एएसएम को लागू करने के लिए आवश्यक चरणों की सूची दे रहा हूं:
- रेस्ट एपीआई लैम्ब्डा: एसक्यूएस में संदेश भेजें
- एसक्यूएस लैम्ब्डा ट्रिगर: एसक्यूएस से संदेश प्रोसेस करें
अगर आप कुरियस हैं कि एएसएम क्या है और कैसे शुरू करें - तो इस दस्तावेज़ को देखें: एएसएम और लेयर्ड लैम्ब्डा के साथ लेयर्ड लैम्ब्डा
प्रीकॉन्फिग
एएसएम और पायथन के साथ रेस्ट-एपीआई को लागू करें, उदाहरण के लिए जैसा वर्णित है एएसएम और पायथन के साथ लेयर्ड लैम्ब्डा
रेस्ट एपीआई लैम्ब्डा: एसक्यूएस में संदेश भेजें
टेम्पलेट.यम्ल में क्यू का वर्णन करें
MySQSQueue:
Type: String
Default: "TheQueue"
MySQSQueueArn:
Type: String
Default: "arn:aws:sqs:ap-southeast-2:123somenumbers123:TheQueue"
टेम्पलेट.यम्ल में एसक्यूएस के लिए संदेश भेजने वाले एपीआई फ़ंक्शन को जोड़ें।
PostMessageFunction:
Type: AWS::Serverless::Function
Properties:
Handler: app.lambda_handler
CodeUri: test
MemorySize: 1024
Description: PostMessageFunction function
Runtime: python3.12
Architectures:
- x86_64
Tracing: Active
Events:
PostProjectEstimatePath:
Type: Api
Properties:
Path: /q/{i}
Method: POST
RestApiId: !Ref MyApi
Layers:
- !Ref ApiSharedLayer
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: PowertoolsHelloWorld
POWERTOOLS_METRICS_NAMESPACE: Powertools
LOG_LEVEL: INFO
Tags:
LambdaPowertools: python
Policies:
- SQSSendMessagePolicy:
QueueName: !Ref MySQSQueue
संदेश क्लास को लागू करें
from pydantic import BaseModel, Field
from typing import List, Optional
class MySqsRequest(BaseModel):
I: Optional[int] = Field(default=None)
एपी.पी में एसक्यूएस में संदेश भेजने के लिए पायथन कोड को लागू करें
@app.post("/q/<i>")
@tracer.capture_method
def post_q(i: int) -> dict:
try:
metrics.add_metric(name="PostMessageFunction", unit=MetricUnit.Count, value=1)
queue_url = 'your queue url here'
request = MySqsRequest(I=i)
message = request.model_dump_json()
send_sqs_message(queue_url, message)
return { 'response_code': 200 }
except ValueError as ve:
msg = str(ve)
logger.warning(f"Error processing prequest: {msg}")
return {
'response_code': 400,
'message': msg
}
except Exception as e:
logger.error(f"Error processing request: {str(e)}")
return {
'response_code': 500,
'message': 'Internal server error occurred while processing request'
}
# ----------------------------------------------------------------------------------------------
def get_queue_url() -> str:
queue_name = get_queue_name()
sqs_client = boto3.client("sqs")
response = sqs_client.get_queue_url(
QueueName=queue_name,
)
return response["QueueUrl"]
# ----------------------------------------------------------------------------------------------
def send_sqs_message(queue_url, message):
sqs_client = boto3.client("sqs")
sqs_client.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(message),
)
# ----------------------------------------------------------------------------------------------
@logger.inject_lambda_context(correlation_id_path=correlation_paths.API_GATEWAY_REST)
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
return app.resolve(event, context)
एसक्यूएस लैम्ब्डा ट्रिगर: एसक्यूएस से संदेश प्रोसेस करें
टेम्पलेट.यम्ल में हैंडलर विवरण जोड़ें
PostCompleteFunction:
Type: AWS::Serverless::Function
Properties:
Handler: app-sqs.lambda_handler
CodeUri: test
MemorySize: 1024
Description: PostCompleteFunction function
Runtime: python3.12
Architectures:
- x86_64
Tracing: Active
Events:
SQSQueueEstimateEvent:
Type: SQS
Properties:
Queue: !Ref MySQSQueueArn
Layers:
- !Ref ApiSharedLayer
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: PowertoolsHelloWorld
POWERTOOLS_METRICS_NAMESPACE: Powertools
LOG_LEVEL: INFO
Tags:
LambdaPowertools: python
Policies:
- SQSPollerPolicy:
QueueName: !Ref MySQSQueue
और एपी-एसक्यू.पी में पायथन कोड:
tracer = Tracer()
logger = Logger()
metrics = Metrics(namespace="Powertools")
# ----------------------------------------------------------------------------------------------
processor = BatchProcessor(event_type=EventType.SQS)
# ----------------------------------------------------------------------------------------------
@tracer.capture_method
def record_handler(record: SQSRecord):
payload: str = record.json_body
req = TypeAdapter(MySqsRequest).validate_json(payload)
logger.info(f"I: {req.I}")
# ----------------------------------------------------------------------------------------------
@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
return process_partial_response(
event=event,
record_handler=record_handler,
processor=processor,
context=context,
)
तैनाती
sam validate && \
sam build --use-container && \
sam deploy
नमूना कॉल
$token और $api वातावरण चर विनिर्देश करें फिर कॉल करें:
curl -v -H "Authorization: Bearer $token" -X POST ${api}/q/6 -d @test-curl/q_src.json | jq .
आपको कुछ ऐसा दिखाई देना चाहिए
{
"response_code": 200
}
लॉगचेक:
sam logs -n PostCompleteFunction --tail
आपको कुछ ऐसा दिखाई देना चाहिए
2025/01/16/[$LATEST]5168b560c9eb4ca4871e0ed3e7f0c20d 2025-01-16T02:15:13.842000 {
"level": "INFO",
"location": "record_handler:585",
"message": "I: 6",
"timestamp": "2025-01-16 02:15:13,842+0000",
"service": "PowertoolsHelloWorld",
"cold_start": true,
"function_name": "asdasdads-PostCompleteFunction-qwe123",
"function_memory_size": "1024",
"function_arn": "arn:aws:lambda:ap-southeast-2:123123123:function:asdasdads-PostCompleteFunction-qwe123",
"function_request_id": "f7f469da-5f33-5087-8613-09d832dcedd7",
"xray_trace_id": "1-67886baf-8fefc4f2eb65405758c4d008"
}
अच्छा!