AWS SAM, AWS SQS और Python PowerTools का उपयोग करके सर्वरलेस सिस्टम्स विकास
उदाहरण कार्यान्वयन
यहाँ AWS SAM में लैम्ब्डा फंक्शन जोड़ने के लिए आवश्यक चरणों की सूची दी गई है:
- REST API लैम्ब्डा: SQS को संदेश भेजें
- SQS लैम्ब्डा ट्रिगर: SQS से संदेश प्रोसेस करें
अगर आप curious हैं कि AWS SAM क्या है और कैसे शुरू करें - इस दस्तावेज़ की जांच करें: Layered Lambdas with AWS SAM and Python
प्रीकॉन्फिग
AWS SAM और पाइथन के साथ REST-API लागू करें, जैसे कि Layered Lambdas with AWS SAM and Python में वर्णित है
REST API लैम्ब्डा: SQS को संदेश भेजें
template.yaml में क्यू का वर्णन करें
MySQSQueue:
Type: String
Default: "TheQueue"
MySQSQueueArn:
Type: String
Default: "arn:aws:sqs:ap-southeast-2:123somenumbers123:TheQueue"
template.yaml में उस API फंक्शन जोड़ें जो संदेश को AWS SQS को भेजेगा।
PostMessageFunction:
Type: AWS::Serverless::Function
Properties:
Handler: app.lambda_handler
CodeUri: test
MemorySize: 1024
Description: PostMessageFunction function
Runtime: python3.12
Architectures:
- x86_64
Tracing: Active
Events:
PostProjectEstimatePath:
Type: Api
Properties:
Path: /q/{i}
Method: POST
RestApiId: !Ref MyApi
Layers:
- !Ref ApiSharedLayer
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: PowertoolsHelloWorld
POWERTOOLS_METRICS_NAMESPACE: Powertools
LOG_LEVEL: INFO
Tags:
LambdaPowertools: python
Policies:
- SQSSendMessagePolicy:
QueueName: !Ref MySQSQueue
संदेश क्लास लागू करें
from pydantic import BaseModel, Field
from typing import List, Optional
class MySqsRequest(BaseModel):
I: Optional[int] = Field(default=None)
app.py में क्यू को संदेश भेजने के लिए पाइथन कोड लागू करें
@app.post("/q/<i>")
@tracer.capture_method
def post_q(i: int) -> dict:
try:
metrics.add_metric(name="PostMessageFunction", unit=MetricUnit.Count, value=1)
queue_url = 'your queue url here'
request = MySqsRequest(I=i)
message = request.model_dump_json()
send_sqs_message(queue_url, message)
return { 'response_code': 200 }
except ValueError as ve:
msg = str(ve)
logger.warning(f"Error processing prequest: {msg}")
return {
'response_code': 400,
'message': msg
}
except Exception as e:
logger.error(f"Error processing request: {str(e)}")
return {
'response_code': 500,
'message': 'Internal server error occurred while processing request'
}
# ----------------------------------------------------------------------------------------------
def get_queue_url() -> str:
queue_name = get_queue_name()
sqs_client = boto3.client("sqs")
response = sqs_client.get_queue_url(
QueueName=queue_name,
)
return response["QueueUrl"]
# ----------------------------------------------------------------------------------------------
def send_sqs_message(queue_url, message):
sqs_client = boto3.client("sqs")
sqs_client.send_message(
QueueUrl=queue_url,
MessageBody=json.dumps(message),
)
# ----------------------------------------------------------------------------------------------
@logger.inject_lambda_context(correlation_id_path=correlation_paths.API_GATEWAY_REST)
@tracer.capture_lambda_handler
@metrics.log_metrics(capture_cold_start_metric=True)
def lambda_handler(event: dict, context: LambdaContext) -> dict:
return app.resolve(event, context)
SQS लैम्ब्डा ट्रिगर: SQS से संदेश प्रोसेस करें
template.yaml में हैंडलर वर्णन जोड़ें
PostCompleteFunction:
Type: AWS::Serverless::Function
Properties:
Handler: app-sqs.lambda_handler
CodeUri: test
MemorySize: 1024
Description: PostCompleteFunction function
Runtime: python3.12
Architectures:
- x86_64
Tracing: Active
Events:
SQSQueueEstimateEvent:
Type: SQS
Properties:
Queue: !Ref MySQSQueueArn
Layers:
- !Ref ApiSharedLayer
Environment:
Variables:
POWERTOOLS_SERVICE_NAME: PowertoolsHelloWorld
POWERTOOLS_METRICS_NAMESPACE: Powertools
LOG_LEVEL: INFO
Tags:
LambdaPowertools: python
Policies:
- SQSPollerPolicy:
QueueName: !Ref MySQSQueue
और app-sqs.py में पाइथन कोड:
tracer = Tracer()
logger = Logger()
metrics = Metrics(namespace="Powertools")
# ----------------------------------------------------------------------------------------------
processor = BatchProcessor(event_type=EventType.SQS)
# ----------------------------------------------------------------------------------------------
@tracer.capture_method
def record_handler(record: SQSRecord):
payload: str = record.json_body
req = TypeAdapter(MySqsRequest).validate_json(payload)
logger.info(f"I: {req.I}")
# ----------------------------------------------------------------------------------------------
@logger.inject_lambda_context
@tracer.capture_lambda_handler
def lambda_handler(event, context: LambdaContext):
return process_partial_response(
event=event,
record_handler=record_handler,
processor=processor,
context=context,
)
डिप्लॉय
sam validate && \
sam build --use-container && \
sam deploy
नमूना कॉल
$token और $api एन्वायरनमेंट वेरिएबल कॉन्फ़िगर करें फिर कॉल करें:
curl -v -H "Authorization: Bearer $token" -X POST ${api}/q/6 -d @test-curl/q_src.json | jq .
आपको कुछ ऐसा देखना चाहिए
{
"response_code": 200
}
लॉगचेक:
sam logs -n PostCompleteFunction --tail
आपको कुछ ऐसा देखना चाहिए
2025/01/16/[$LATEST]5168b560c9eb4ca4871e0ed3e7f0c20d 2025-01-16T02:15:13.842000 {
"level": "INFO",
"location": "record_handler:585",
"message": "I: 6",
"timestamp": "2025-01-16 02:15:13,842+0000",
"service": "PowertoolsHelloWorld",
"cold_start": true,
"function_name": "asdasdads-PostCompleteFunction-qwe123",
"function_memory_size": "1024",
"function_arn": "arn:aws:lambda:ap-southeast-2:123123123:function:asdasdads-PostCompleteFunction-qwe123",
"function_request_id": "f7f469da-5f33-5087-8613-09d832dcedd7",
"xray_trace_id": "1-67886baf-8fefc4f2eb65405758c4d008"
}
बहुत अच्छा!
उम्मीद है कि आपको AWS सर्वरलेस प्लेटफॉर्म के साथ असिंक्रोनस प्रोसेसिंग का यह उदाहरण उपयोगी लगा है। शुभ दिन हो!