بناء خدمات ميكروسيرفيسات قائمة على الأحداث باستخدام AWS Kinesis
架構以事件驅動,結合 AWS Kinesis 進行擴展
AWS Kinesis أصبح ركيزة أساسية في بناء العمليات المعمارية الحديثة القائمة على الأحداث، مما يتيح معالجة البيانات في الوقت الفعلي بحجم كبير مع أقل مسؤولية تشغيلية.

فهم العمليات المعمارية القائمة على الأحداث
العمليات المعمارية القائمة على الأحداث (EDA) هي نمط تصميم حيث تتفاعل الخدمات من خلال الأحداث بدلًا من المكالمات المتزامنة المباشرة. توفر هذه الطريقة عدة مزايا:
- الانفصال المرن: لا تحتاج الخدمات إلى معرفة وجود بعضها البعض
- التوسع: تتوسع كل خدمة بشكل مستقل بناءً على حملها
- المرونة: يمكن إضافة خدمات جديدة دون تعديل الخدمات القائمة
- المرونة: لا تؤثر الأعطال في خدمة على الخدمات الأخرى
تُوفر AWS Kinesis الهيكل الأساسي لتنفيذ EDA من خلال العمل كتدفق أحداث موزع ومستقر يفصل بين المُنشئين والمُستهلكين.
لمحة عامة عن AWS Kinesis
تقدم AWS عدة خدمات Kinesis، كل منها مصممة لحالات استخدام معينة. عند تقييم حلول البث، قد ترغب أيضًا في النظر في مقارنة RabbitMQ على EKS مقابل SQS لحالات استخدام الرسائل المختلفة وتأثيرات التكلفة.
Kinesis Data Streams
الخدمة الأساسية للبث التي تلتقط، تخزن، ومعالجة السجلات البيانات في الوقت الفعلي. مناسبة Data Streams ل:
- تطبيقات معالجة البيانات في الوقت الفعلي المخصصة
- بناء أنظمة أنابيب البيانات ذات التأخير أقل من الثانية
- معالجة ملايين الأحداث في الثانية
- تنفيذ أنماط مصادر الأحداث
Kinesis Data Firehose
خدمة مدارة بالكامل تنقل البيانات البث إلى وجهات مثل S3 و Redshift و Elasticsearch أو نقاط النهاية HTTP. مناسبة ل:
- أنابيب ETL البسيطة
- تجميع السجلات وتخزينها
- تحليلات في الوقت الفعلي (التأخير الأدنى 60 ثانية)
- السيناريوهات التي لا تحتاج إلى منطق معالجة مخصص
Kinesis Data Analytics
تُعالج وتحليل البيانات البث باستخدام SQL أو Apache Flink. تشمل حالات الاستخدام:
- لوحات تحكم في الوقت الفعلي
- ETL البث
- اكتشاف الاستثناءات في الوقت الفعلي
- إنتاج مؤشرات مستمرة
الأنماط المعمارية مع Kinesis
1. نمط مصادر الأحداث
يُخزن كل تغيير في حالة التطبيق كسلسلة من الأحداث. Kinesis مثالي لهذا. إذا كنت بحاجة إلى تذكير حول أساسيات Python، تحقق من قائمة Python الخاصة بنا:
import boto3
import json
from datetime import datetime
kinesis = boto3.client('kinesis', region_name='us-east-1')
def publish_event(stream_name, event_type, payload):
"""نشر حدث إلى تدفق Kinesis"""
event = {
'eventId': generate_unique_id(),
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
response = kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event),
PartitionKey=payload.get('userId', 'default')
)
return response['SequenceNumber']
# مثال: حدث تسجيل المستخدم
publish_event(
stream_name='user-events',
event_type='USER_REGISTERED',
payload={
'userId': '12345',
'email': 'user@example.com',
'registrationDate': '2025-10-30'
}
)
2. نمط CQRS (فصل مسؤوليات الأوامر والقراءة)
استخدم Kinesis كحافز الأحداث لفصل العمليات القرائية والكتابة:
package main
import (
"encoding/json"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)
type OrderCommand struct {
CommandType string `json:"commandType"`
OrderID string `json:"orderId"`
UserID string `json:"userId"`
Items []OrderItem `json:"items"`
}
func ProcessCommand(kinesisClient *kinesis.Kinesis, command OrderCommand) error {
data, err := json.Marshal(command)
if err != nil {
return err
}
_, err = kinesisClient.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String("order-commands"),
Data: data,
PartitionKey: aws.String(command.OrderID),
})
return err
}
3. نمط توزيع المهام مع Lambda
معالجة الأحداث من تدفق واحد مع عدة دوال Lambda. للحصول على تنفيذ TypeScript مع أمان نوعي أقوى، راجع قائمة TypeScript الخاصة بنا:
// دالة Lambda لتنبيه البريد الإلكتروني
exports.handler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await sendOrderConfirmationEmail(payload);
}
}
};
// دالة Lambda أخرى لتحديث المخزون
exports.inventoryHandler = async (event) => {
for (const record of event.Records) {
const payload = JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
);
if (payload.eventType === 'ORDER_PLACED') {
await updateInventory(payload.items);
}
}
};
أفضل الممارسات في الإنتاج
1. اختيار عدد الشards المناسب
احسب متطلبات الشards بناءً على:
- الدخول: 1 ميغابايت/ثانية أو 1000 سجل/ثانية لكل shard
- الخروج: 2 ميغابايت/ثانية لكل shard (المستهلكون القياسية) أو 2 ميغابايت/ثانية لكل مستهلك مع توزيع محسّن
def calculate_shards(records_per_second, avg_record_size_kb):
"""احسب عدد الشards المطلوب"""
# سعة الدخول
ingress_shards = max(
records_per_second / 1000,
(records_per_second * avg_record_size_kb) / 1024
)
return int(ingress_shards) + 1 # أضف مخزونًا
2. تنفيذ معالجة الأخطاء المناسبة
from botocore.exceptions import ClientError
import time
def put_record_with_retry(kinesis_client, stream_name, data, partition_key,
max_retries=3):
"""وضع سجل مع محاولة مجددة ذات تراجع أسيعي"""
for attempt in range(max_retries):
try:
response = kinesis_client.put_record(
StreamName=stream_name,
Data=data,
PartitionKey=partition_key
)
return response
except ClientError as e:
if e.response['Error']['Code'] == 'ProvisionedThroughputExceededException':
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # تراجع أسيعي
continue
raise
3. استخدام توزيع المهام المحسّن للمستهلكين المتعددين
يوفر توزيع المهام المحسّن تدفقًا مخصصًا لكل مستهلك:
# تسجيل مستهلك مع توزيع محسّن
aws kinesis register-stream-consumer \
--stream-arn arn:aws:kinesis:us-east-1:123456789:stream/my-stream \
--consumer-name my-consumer-app
4. مراقبة المعايير الرئيسية
المعايير الأساسية لمراقبة CloudWatch:
IncomingRecords: عدد السجلات المُدخلة بنجاحIncomingBytes: حجم البيانات الداخلة بالبايتGetRecords.IteratorAgeMilliseconds: مدى تأخر المستهلكينWriteProvisionedThroughputExceeded: أحداث التقييدReadProvisionedThroughputExceeded: تقييد المستهلكين
5. تنفيذ استراتيجية مفتاح التقسيم المناسبة
import hashlib
def get_partition_key(user_id, shard_count=10):
"""إنشاء مفتاح التقسيم مع توزيع متساوي"""
# استخدام التجزئة المتسقة لتوزيع متساوي
hash_value = int(hashlib.md5(user_id.encode()).hexdigest(), 16)
shard_id = hash_value % shard_count
return f"{user_id}#{shard_id}"
مثال على التنفيذ في العالم الحقيقي
إليك مثالًا كاملًا على عمليات معالجة الطلبات في العمليات المعمارية الدقيقة:
import boto3
import json
from decimal import Decimal
from typing import Dict, List
class OrderProcessingService:
def __init__(self, stream_name: str):
self.kinesis = boto3.client('kinesis')
self.stream_name = stream_name
def create_order(self, user_id: str, items: List[Dict]) -> str:
"""إنشاء طلب ونشر الأحداث"""
order_id = self.generate_order_id()
# نشر حدث إنشاء الطلب
self.publish_event('ORDER_CREATED', {
'orderId': order_id,
'userId': user_id,
'items': items,
'status': 'PENDING',
'total': self.calculate_total(items)
}, partition_key=user_id)
return order_id
def publish_event(self, event_type: str, payload: Dict,
partition_key: str):
"""نشر حدث إلى تدفق Kinesis"""
event = {
'eventType': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
self.kinesis.put_record(
StreamName=self.stream_name,
Data=json.dumps(event, default=str),
PartitionKey=partition_key
)
class InventoryService:
"""تستهلك أحداث الطلبات وتقوم بتحديث المخزون"""
def process_records(self, records):
for record in records:
data = json.loads(record['kinesis']['data'])
if data['eventType'] == 'ORDER_CREATED':
self.reserve_inventory(data['payload'])
elif data['eventType'] == 'ORDER_CANCELLED':
self.release_inventory(data['payload'])
def reserve_inventory(self, order_data):
# تحديث قاعدة بيانات المخزون
for item in order_data['items']:
# التنفيذ هنا
pass
استراتيجية الهجرة من الأحادية إلى العمليات الدقيقة
المرحلة 1: نمط شجرة المُخنق
ابدأ بتحويل أحداث معينة عبر Kinesis بينما تبقى الأحادية:
- تحديد السياقات المحدودة في الأحادية
- إنشاء تدفقات Kinesis للأحداث عبر السياقات
- استخراج الخدمات تدريجيًا التي تستهلك من هذه التدفقات
- الحفاظ على التوافق الخلفي مع الأحادية
المرحلة 2: المعالجة المتوازنة
تشغيل الأنظمة القديمة والجديدة في وقت واحد:
def dual_write_pattern(legacy_db, kinesis_stream, data):
"""كتابة إلى النظام القديم وتدفق الأحداث في وقت واحد"""
try:
# كتابة إلى النظام الجديد أولاً
publish_to_kinesis(kinesis_stream, data)
# ثم تحديث النظام القديم
legacy_db.update(data)
except Exception as e:
# تنفيذ منطق المكمل
rollback_kinesis_event(kinesis_stream, data)
raise
المرحلة 3: الهجرة الكاملة
بمجرد التأكد من الثقة، قم بتوجيه كل حركة المرور عبر العمليات المعمارية القائمة على الأحداث.
استراتيجيات تحسين التكلفة
1. استخدام وضع الطلب المطلوب للعملاء المتغيرين
وضع الطلب المطلوب (المُقدّم في عام 2023) يتوسع تلقائيًا بناءً على حركة المرور:
# إنشاء تدفق بوضع الطلب المطلوب
aws kinesis create-stream \
--stream-name my-stream \
--stream-mode-details StreamMode=ON_DEMAND
2. تنفيذ تجميع البيانات
تقليل وحدات PUT عن طريق تجميع السجلات:
from aws_kinesis_agg.aggregator import RecordAggregator
def batch_put_records(kinesis_client, stream_name, records):
"""تجميع السجلات لتقليل التكاليف"""
aggregator = RecordAggregator()
for record in records:
aggregator.add_user_record(
partition_key=record['partition_key'],
data=record['data']
)
# إرسال السجل المجمّع
kinesis_client.put_record(
StreamName=stream_name,
Data=aggregator.serialize(),
PartitionKey=records[0]['partition_key']
)
3. تحسين الاحتفاظ بالبيانات
الاحتفاظ الافتراضي هو 24 ساعة. لا تمديد إلا إذا لزم الأمر:
# تعيين الاحتفاظ إلى 7 أيام
aws kinesis increase-stream-retention-period \
--stream-name my-stream \
--retention-period-hours 168
أفضل الممارسات في الأمان
1. التشفير في الراحة وفي أثناء النقل
# إنشاء تدفق مشفر
kinesis.create_stream(
StreamName='secure-stream',
ShardCount=1,
StreamModeDetails={'StreamMode': 'PROVISIONED'}
)
# تفعيل التشفير
kinesis.start_stream_encryption(
StreamName='secure-stream',
EncryptionType='KMS',
KeyId='alias/aws/kinesis'
)
2. سياسات IAM لحد الأدنى من الأذونات
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:PutRecord",
"kinesis:PutRecords"
],
"Resource": "arn:aws:kinesis:us-east-1:123456789:stream/orders"
}
]
}
3. نقاط النهاية VPC
احتفظ بالحركة داخل شبكة AWS. عند إدارة البنية التحتية لـ AWS ككود، فكّر في استخدام Terraform - راجع قائمة Terraform الخاصة بنا:
aws ec2 create-vpc-endpoint \
--vpc-id vpc-12345678 \
--service-name com.amazonaws.us-east-1.kinesis-streams \
--route-table-ids rtb-12345678
المراقبة والتصحيح
تتبع موزع مع X-Ray
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all
patch_all()
@xray_recorder.capture('publish_event')
def publish_event_with_tracing(stream_name, event_data):
xray_recorder.put_annotation('eventType', event_data['type'])
xray_recorder.put_metadata('payload', event_data['payload'])
kinesis.put_record(
StreamName=stream_name,
Data=json.dumps(event_data),
PartitionKey=event_data['id']
)
استعلامات CloudWatch Logs Insights
-- العثور على أوقات معالجة بطيئة
fields @timestamp, eventType, processingTime
| filter processingTime > 1000
| sort @timestamp desc
| limit 20
-- تتبع معدل الأخطاء
fields @timestamp, eventType
| filter error = true
| stats count() by eventType
الأنماط المتقدمة
نمط Saga لمعاملات موزعة
تنفيذ معاملات طويلة الأمد عبر العمليات الدقيقة:
class OrderSaga:
def __init__(self, kinesis_client, stream_name):
self.kinesis = kinesis_client
self.stream_name = stream_name
self.saga_id = str(uuid.uuid4())
def execute(self, order_data):
"""تنفيذ Saga مع منطق المكمل"""
try:
# الخطوة 1: احتفظ بالمخزون
self.publish_command('RESERVE_INVENTORY', order_data)
# الخطوة 2: معالجة الدفع
self.publish_command('PROCESS_PAYMENT', order_data)
# الخطوة 3: شحن الطلب
self.publish_command('SHIP_ORDER', order_data)
except SagaException as e:
# المكمل في الترتيب المعاكس
self.compensate(e.failed_step)
def compensate(self, failed_step):
"""تنفيذ المعاملات المكملة"""
compensation_steps = {
'PROCESS_PAYMENT': ['RELEASE_INVENTORY'],
'SHIP_ORDER': ['REFUND_PAYMENT', 'RELEASE_INVENTORY']
}
for step in compensation_steps.get(failed_step, []):
self.publish_command(step, {'sagaId': self.saga_id})
استراتيجيات الاختبار
التطوير المحلي مع LocalStack
# تشغيل LocalStack مع Kinesis
docker run -d \
-p 4566:4566 \
-e SERVICES=kinesis \
localstack/localstack
# إنشاء تدفق اختبار
aws --endpoint-url=http://localhost:4566 kinesis create-stream \
--stream-name test-stream \
--shard-count 1
الاختبارات التكاملية
import pytest
from moto import mock_kinesis
@mock_kinesis
def test_event_publishing():
"""اختبار نشر الأحداث مع Kinesis المزيف"""
kinesis = boto3.client('kinesis', region_name='us-east-1')
kinesis.create_stream(StreamName='test-stream', ShardCount=1)
service = OrderProcessingService('test-stream')
order_id = service.create_order('user123', [
{'productId': 'prod1', 'quantity': 2}
])
assert order_id is not None
تحسين الأداء
تحسين حجم الدفعة
def optimize_batch_processing(records, batch_size=500):
"""معالجة السجلات في دفعات محسّنة"""
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
process_batch(batch)
استخدام تجميع الاتصالات
from botocore.config import Config
config = Config(
max_pool_connections=50,
retries={'max_attempts': 3, 'mode': 'adaptive'}
)
kinesis = boto3.client('kinesis', config=config)
الروابط المفيدة
موارد AWS Kinesis:
- مستندات AWS Kinesis
- دليل AWS Kinesis Data Streams
- مكتبة Kinesis Client (KCL)
- مُحسّن تكاليف AWS Kinesis
- قيود وحدود Kinesis Data Streams
- مدونة AWS Architecture - العمليات المعمارية القائمة على الأحداث
- أمثلة AWS Samples - Kinesis
المقالات المرتبطة:
- مقارنة Rabbitmq على Eks مقابل Sqs تكاليف الاستضافة
- قائمة TypeScript: المفاهيم الأساسية وال أفضل الممارسات
- قائمة Python
- قائمة Terraform - الأوامر الأساسية والأمثلة
الخاتمة
يوفر AWS Kinesis أساسًا قويًا لبناء عمليات معمارية دقيقة قابلة للتوسع. من خلال اتباع هذه الأنماط والممارسات المثلى، يمكنك إنشاء أنظمة قوية وقابلة للتوسع والصيانة. ابدأ بتدفق أحداث واحد، اختر تأكيد العمليات المعمارية، وتوسع تدريجيًا إلى أنماط أكثر تعقيدًا مع نمو نظامك.
مفتاح النجاح هو فهم متطلبات تدفق البيانات، اختيار الخدمة المناسبة من Kinesis لحالتك، وتنفيذ المراقبة ومعالجة الأخطاء من البداية.