分布式事务中的 Saga 模式 - Go 语言示例
使用Saga模式在微服务中处理事务
Saga模式 通过将分布式事务分解为一系列带有补偿操作的本地事务,提供了一种优雅的解决方案。
与依赖可能跨服务阻塞操作的分布式锁不同,Saga通过一系列可逆步骤实现最终一致性,使其非常适合长期运行的业务流程。
在微服务架构中,跨服务保持数据一致性是最大的挑战之一。当操作跨越具有独立数据库的多个服务时,传统的ACID事务无法正常工作,迫使开发人员寻找确保数据完整性的替代方法。
本指南演示了如何在Go中实现Saga模式,并通过实际示例涵盖编排和编舞两种方法。如果您需要Go基础知识的快速参考,Go速查表提供了有用的概述。
这张漂亮的图片由AI模型Flux 1 dev生成。
理解Saga模式
Saga模式最初由Hector Garcia-Molina和Kenneth Salem于1987年描述。在微服务的上下文中,它是一系列本地事务,每个事务在单个服务内更新数据。如果任何步骤失败,将执行补偿事务以撤销前面步骤的影响。
与使用两阶段提交(2PC)的传统分布式事务不同,Saga不跨服务持有锁,使其适合长期运行的业务流程。权衡是最终一致性而不是强一致性。
关键特性
- 无分布式锁:每个服务管理自己的本地事务
- 补偿操作:每个操作都有相应的回滚机制
- 最终一致性:系统最终达到一致状态
- 长期运行:适合需要数秒、数分钟甚至数小时的流程
Saga实现方法
实现Saga模式有两种主要方法:编排和编舞。
编排模式
在编排中,一个中央协调器(编排器)管理整个事务流程。编排器负责:
- 按正确顺序调用服务
- 处理故障并触发补偿
- 保持Saga的状态
- 协调重试和超时
优点:
- 集中式控制和可见性
- 更容易理解和调试
- 更好的错误处理和恢复
- 整体流程更简单的测试
缺点:
- 单点故障(尽管可以通过冗余缓解)
- 需要维护额外的服务
- 对于复杂流程可能成为瓶颈
Go中的示例:
type OrderSagaOrchestrator struct {
orderService OrderService
paymentService PaymentService
inventoryService InventoryService
shippingService ShippingService
}
func (o *OrderSagaOrchestrator) CreateOrder(order Order) error {
sagaID := generateSagaID()
// 步骤1:创建订单
orderID, err := o.orderService.Create(order)
if err != nil {
return err
}
// 步骤2:预留库存
if err := o.inventoryService.Reserve(order.Items); err != nil {
o.orderService.Cancel(orderID) // 补偿
return err
}
// 步骤3:处理支付
paymentID, err := o.paymentService.Charge(order.CustomerID, order.Total)
if err != nil {
o.inventoryService.Release(order.Items) // 补偿
o.orderService.Cancel(orderID) // 补偿
return err
}
// 步骤4:创建运输
if err := o.shippingService.CreateShipment(orderID); err != nil {
o.paymentService.Refund(paymentID) // 补偿
o.inventoryService.Release(order.Items) // 补偿
o.orderService.Cancel(orderID) // 补偿
return err
}
return nil
}
编舞模式
在编舞中,没有中央协调器。每个服务都知道该做什么并通过事件进行通信。服务监听事件并相应地做出反应。这种事件驱动的方法与AWS Kinesis等消息流平台结合时特别强大,这些平台为跨微服务的事件分发提供了可扩展的基础设施。有关使用Kinesis实现事件驱动微服务的全面指南,请参阅 使用AWS Kinesis构建事件驱动的微服务.
优点:
- 去中心化和可扩展
- 没有单点故障
- 服务保持松耦合
- 事件驱动架构的自然契合
缺点:
- 更难理解整体流程
- 难以调试和追踪
- 复杂的错误处理
- 循环依赖的风险
使用事件驱动架构的示例:
// 订单服务
type OrderService struct {
eventBus EventBus
repo OrderRepository
}
func (s *OrderService) CreateOrder(order Order) (string, error) {
orderID, err := s.repo.Save(order)
if err != nil {
return "", err
}
s.eventBus.Publish("OrderCreated", OrderCreatedEvent{
OrderID: orderID,
CustomerID: order.CustomerID,
Items: order.Items,
Total: order.Total,
})
return orderID, nil
}
func (s *OrderService) HandlePaymentFailed(event PaymentFailedEvent) error {
return s.repo.Cancel(event.OrderID) // 补偿
}
// 支付服务
type PaymentService struct {
eventBus EventBus
client PaymentClient
}
func (s *PaymentService) HandleOrderCreated(event OrderCreatedEvent) {
paymentID, err := s.client.Charge(event.CustomerID, event.Total)
if err != nil {
s.eventBus.Publish("PaymentFailed", PaymentFailedEvent{
OrderID: event.OrderID,
})
return
}
s.eventBus.Publish("PaymentSucceeded", PaymentSucceededEvent{
OrderID: event.OrderID,
PaymentID: paymentID,
})
}
func (s *PaymentService) HandleInventoryReservationFailed(event InventoryReservationFailedEvent) error {
// 补偿:退款
return s.client.Refund(event.PaymentID)
}
补偿策略
补偿是Saga模式的核心。每个操作必须有一个可以逆转其影响的相应补偿。
补偿类型
-
可逆操作:可以直接撤销的操作
- 示例:释放预留的库存,退款
-
补偿操作:实现相反效果的不同操作
- 示例:取消订单而不是删除它
-
悲观补偿:预分配可以释放的资源
- 示例:在支付前预留库存
-
乐观补偿:执行操作并在需要时进行补偿
- 示例:先支付,如果库存不可用则退款
幂等性要求
所有操作和补偿必须是幂等的。这确保了重试失败的操作不会导致重复效果。
func (s *PaymentService) Refund(paymentID string) error {
// 检查是否已退款
payment, err := s.getPayment(paymentID)
if err != nil {
return err
}
if payment.Status == "refunded" {
return nil // 已退款,幂等
}
// 处理退款
return s.processRefund(paymentID)
}
最佳实践
1. Saga状态管理
维护每个Saga实例的状态以跟踪进度并启用恢复。将Saga状态持久化到数据库时,选择合适的ORM对于性能和可维护性至关重要。对于基于PostgreSQL的实现,考虑在比较Go ORMs用于PostgreSQL:GORM vs Ent vs Bun vs sqlc中进行比较,以选择最适合您的Saga状态存储需求的ORM:
type SagaState struct {
ID string
Status SagaStatus
Steps []SagaStep
CurrentStep int
CreatedAt time.Time
UpdatedAt time.Time
}
type SagaStep struct {
Service string
Operation string
Status StepStatus
Compensated bool
Data map[string]interface{}
}
2. 超时处理
为每个步骤实现超时,以防止Saga无限期挂起:
type SagaOrchestrator struct {
timeout time.Duration
}
func (o *SagaOrchestrator) ExecuteWithTimeout(step SagaStep) error {
ctx, cancel := context.WithTimeout(context.Background(), o.timeout)
defer cancel()
done := make(chan error, 1)
go func() {
done <- step.Execute()
}()
select {
case err := <-done:
return err
case <-ctx.Done():
// 超时发生,补偿
if err := step.Compensate(); err != nil {
return fmt.Errorf("补偿失败: %w", err)
}
return fmt.Errorf("步骤 %s 超时 %v", step.Name(), o.timeout)
}
}
3. 重试逻辑
为瞬时故障实现指数退避:
func retryWithBackoff(operation func() error, maxRetries int) error {
backoff := time.Second
for i := 0; i < maxRetries; i++ {
err := operation()
if err == nil {
return nil
}
if !isTransientError(err) {
return err
}
time.Sleep(backoff)
backoff *= 2
}
return fmt.Errorf("操作在 %d 次重试后失败", maxRetries)
}
4. 使用事件溯源进行Saga状态管理
使用事件溯源来维护完整的审计跟踪。在实现事件存储和重放机制时,Go泛型可以帮助创建类型安全、可重用的事件处理代码。有关使用泛型的高级模式,请参阅Go泛型:使用案例和模式.
type SagaEvent struct {
SagaID string
EventType string
Payload []byte
Timestamp time.Time
Version int64
}
type SagaEventStore struct {
store EventRepository
}
func (s *SagaEventStore) AppendEvent(sagaID string, eventType string, payload interface{}) error {
data, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("无法序列化负载: %w", err)
}
version, err := s.store.GetNextVersion(sagaID)
if err != nil {
return fmt.Errorf("无法获取版本: %w", err)
}
event := SagaEvent{
SagaID: sagaID,
EventType: eventType,
Payload: data,
Timestamp: time.Now(),
Version: version,
}
return s.store.Save(event)
}
func (s *SagaEventStore) ReplaySaga(sagaID string) (*Saga, error) {
events, err := s.store.GetEvents(sagaID)
if err != nil {
return nil, fmt.Errorf("无法获取事件: %w", err)
}
saga := NewSaga()
for _, event := range events {
if err := saga.Apply(event); err != nil {
return nil, fmt.Errorf("无法应用事件: %w", err)
}
}
return saga, nil
}
5. 监控和可观测性
实现全面的日志记录和追踪:
func (o *OrderSagaOrchestrator) CreateOrder(order Order) error {
span := tracer.StartSpan("saga.create_order")
defer span.Finish()
span.SetTag("saga.id", sagaID)
span.SetTag("order.id", order.ID)
logger.WithFields(log.Fields{
"saga_id": sagaID,
"order_id": order.ID,
"step": "create_order",
}).Info("Saga开始")
// ... Saga执行
return nil
}
常见模式和反模式
应遵循的模式
- Saga协调器模式:使用专用服务进行编排
- 出站模式:确保可靠事件发布
- 幂等性键:为所有操作使用唯一键
- Saga状态机:将Saga建模为状态机
应避免的反模式
- 同步补偿:不要等待补偿完成
- 嵌套Saga:避免Saga调用其他Saga(使用子Saga代替)
- 共享状态:不要在Saga步骤之间共享状态
- 长期运行的步骤:将耗时过长的步骤分解
工具和框架
一些框架可以帮助实现Saga模式:
- Temporal:内置Saga支持的工作流编排平台
- Zeebe:微服务编排的工作流引擎
- Eventuate Tram:Spring Boot的Saga框架
- AWS Step Functions:无服务器工作流编排
- Apache Camel:带有Saga支持的集成框架
对于需要CLI接口进行管理和监控的协调器服务,使用Cobra和Viper在Go中构建CLI应用程序提供了创建与Saga协调器交互的命令行工具的优秀模式。
在Kubernetes中部署基于Saga的微服务时,实现服务网格可以显著提高可观测性、安全性和流量管理。使用Istio和Linkerd实现服务网格涵盖了服务网格如何通过提供分布式追踪和断路等横切关注点来补充分布式事务模式。
何时使用Saga模式
使用Saga模式时:
- ✅ 操作跨越多个微服务
- ✅ 长期运行的业务流程
- ✅ 可接受最终一致性
- ✅ 需要避免分布式锁
- ✅ 服务具有独立数据库
避免使用时:
- ❌ 需要强一致性
- ❌ 操作简单且快速
- ❌ 所有服务共享同一数据库
- ❌ 补偿逻辑过于复杂
结论
Saga模式对于管理微服务架构中的分布式事务至关重要。虽然它引入了复杂性,但它为跨服务边界保持数据一致性提供了实用的解决方案。选择编排以获得更好的控制和可见性,或选择编舞以实现可扩展性和松耦合。始终确保操作是幂等的,实现适当的补偿逻辑,并保持全面的可观测性。
成功实现Saga的关键是理解您的一致性需求,仔细设计补偿逻辑,并为您的用例选择合适的方法。通过正确实现,Saga使您能够构建在分布式系统中保持数据完整性的弹性、可扩展的微服务。