PostgreSQL を用いた Go 言語でのトランザクショナルアウトボックスパターン
データをイベントに書き込み、決して分割しないでください。
一度は成功するはずの2つの書き込みが、やがて別々に失敗します。
注文サービスはデータベースに注文を保存し、次にメッセージブローカーにorder.createdイベントをパブリッシュします。
これらの2つの操作は順次実行されます。
その間に問題が発生します:ブローカーがダウンしたり、ネットワークがタイムアウトしたり、プロセスが再起動したり、コンテナが強制終了されたりします。データベースへの書き込みは成功しました。しかしパブリッシュは失敗しました。新しい注文に関する情報を必要とする下流のサービスは、そのことを決して知ることができませんでした。顧客から電話が入るまで、誰もそれに気づきませんでした。
これは「デュアルライト(二重書き込み)問題」であり、分散システムにおけるサイレントなデータ損失の最も一般的な原因の一つです。これに対する標準的な解決策は、トランザクショナルアウトボックスパターンです。

デュアルライト(二重書き込み)問題
この失敗モードは、一度見ればその理屈は容易に理解できます:
BEGIN;
INSERT INTO orders ... -- 成功
COMMIT;
PUBLISH order.created ... -- 失敗、クラッシュ、または到達しない
データベースとメッセージブローカーはトランザクションの境界を共有しません。両方をカバーするロールバックは存在しません。save -> publish(保存→パブリッシュ)の順で実行するすべてのサービスには、このギャップがあります。このパターンは多くの形態で現れます:
db.Save(order)の後にevents.Publish(OrderCreated{...})を実行する- トランザクションをコミットし、その後外部のWebhookを呼び出すHTTPハンドラ
- 1つのキューからレコードを処理し、結果を別のキューに書き込むワーカー
いずれの場合も結果は同じです:片方の操作が成功し、もう片方が失敗し、システムは監視では目に見えない状態に陥ります。なぜなら、個々の操作はいずれも一時は成功を返しているからです。
リトライループではこれを解決できません。データベースコミットの後にパブリッシュをリトライするのは、リトライ自体が信頼できる場合にのみ機能します。しかしそれは、あなたが持っていないまさにその耐久性保証を必要とします。
トランザクショナルアウトボックスパターンが何をするか
アウトボックスパターンは、直接のパブリッシュを完全に削除することでこのギャップを解消します。ビジネスロジック内からブローカーを呼び出す代わりに、ビジネスデータと同じデータベーストランザクション内で、outboxテーブルにイベントレコードを書き込みます。別のバックグラウンドプロセス(リレー)がアウトボックステーブルを読み取り、ブローカーへパブリッシュします。
BEGIN;
INSERT INTO orders ... -- ビジネスデータ
INSERT INTO outbox_events ... -- イベントレコード
COMMIT;
-- リレープロセス(別個に):
SELECT ... FROM outbox_events FOR UPDATE SKIP LOCKED;
PUBLISH order.created ...
UPDATE outbox_events SET processed_at = NOW() WHERE id = $1;
両方の書き込みが成功するか、両方が失敗します。PostgreSQLからすでに持っているトランザクション保証が、今やイベントレコードもカバーします。リレーは、イベントが永続ストレージに存在するため、必要なだけパブリッシュのリトライを行うことができます。リレーが実行中にクラッシュした場合でも、再起動してリトライします。最悪の結果は、イベントが複数回パブリッシュされることです。これは、コンシューマー(消費者)に冪等性(べき等性)を持たせることで処理されます(分散システムにおける冪等性参照)。
アウトボックステーブルのPostgreSQLスキーマ
スキーマは意図的にシンプルです:
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(100) NOT NULL,
aggregate_id VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
attempts INT NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
processed_at TIMESTAMPTZ
);
-- 部分インデックス:未処理の行のみをインデックス化し、行が完了としてマークされるにつれて小さく保つ
CREATE INDEX idx_outbox_unprocessed
ON outbox_events (created_at)
WHERE processed_at IS NULL;
created_at WHERE processed_at IS NULLに対する部分インデックスは重要です。これがないと、インデックスは書き込まれたすべてのイベントとともに成長し、リレーのポーリングクエリは時間とともに遅くなります。これがあることで、インデックスは保留中の行のみをカバーし、ステディステートでは、何千ものイベントがパブリッシュされていても、小さく有界なセットになります。
主要なフィールドの選択:
aggregate_typeおよびaggregate_idは、イベントがどのエンティティに属するかを記述します。順序保証およびルーティングに有用です。event_typeは、コンシューマーが期待するイベント名です。payload JSONBはイベントボディを格納します。必要に応じてクエリできるように、TEXTではなくJSONBを使用します。attemptsは、リレーがその行のパブリッシュを試行した回数を追跡します。リトライ制限やデッドレター処理に使用されます。processed_atは、保留中の行ではNULLであり、リレーが正常にパブリッシュしたときに設定されます。
1つのトランザクション内でビジネスデータとアウトボックスイベントを書き込む
ビジネスロジックは、両方のレコードを単一のBeginTx / Commit呼び出し内で書き込みます。ここではパブリッシュ呼び出しはありません。データベース書き込みのみです。
type OrderService struct {
db *sql.DB
}
func (s *OrderService) CreateOrder(ctx context.Context, order Order) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback()
if _, err := tx.ExecContext(ctx, `
INSERT INTO orders (id, customer_id, total, created_at)
VALUES ($1, $2, $3, NOW())
`, order.ID, order.CustomerID, order.Total); err != nil {
return fmt.Errorf("insert order: %w", err)
}
payload, err := json.Marshal(map[string]any{
"order_id": order.ID,
"customer_id": order.CustomerID,
"total": order.Total,
})
if err != nil {
return fmt.Errorf("marshal payload: %w", err)
}
if _, err := tx.ExecContext(ctx, `
INSERT INTO outbox_events
(aggregate_type, aggregate_id, event_type, payload)
VALUES ($1, $2, $3, $4)
`, "order", order.ID, "order.created", payload); err != nil {
return fmt.Errorf("insert outbox event: %w", err)
}
return tx.Commit()
}
tx.Commit()が失敗した場合、注文行もアウトボックス行も永続化されません。成功した場合、両方がデータベースにあることが保証されます。リレーは、その後いつでもイベントをパブリッシュできます。直ちに、1秒後、またはクラッシュ後のリレー再起動後などです。
これがビジネスレイヤーで必要な唯一のコード変更です。パターンの残りはリレーにあります。
Goによるリレーの実装
リレーは、タイマーでアウトボックステーブルをポーリングするバックグラウンドワーカーです。未処理の行のバッチを取得し、それぞれをパブリッシュし、完了としてマークします。同じバイナリ内でアプリケーションと同じに保つか、別プロセスとして実行するか、どちらでも機能しますが、同じバイナリの方が運用が簡単です。
type OutboxRelay struct {
db *sql.DB
publisher Publisher
logger *slog.Logger
batchSize int
pollInterval time.Duration
maxAttempts int
}
func (r *OutboxRelay) Run(ctx context.Context) error {
ticker := time.NewTicker(r.pollInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := r.processBatch(ctx); err != nil {
r.logger.Error("outbox relay batch failed", "err", err)
}
}
}
}
リレーはコンテキストのキャンセルを尊重するため、グラシアルシャットダウンとの統合が容易になります。コンテキストのライフタイムとキャンセルパターンに関する詳細な解説については、Go context.Context Done Rightを参照してください。
FOR UPDATE SKIP LOCKED: 並行ワーカーパターン
processBatch関数は、並行リレーワーカーを安全に処理するためにFOR UPDATE SKIP LOCKEDを使用します:
func (r *OutboxRelay) processBatch(ctx context.Context) error {
tx, err := r.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback()
rows, err := tx.QueryContext(ctx, `
SELECT id, aggregate_type, aggregate_id, event_type, payload
FROM outbox_events
WHERE processed_at IS NULL
AND attempts < $1
ORDER BY created_at
LIMIT $2
FOR UPDATE SKIP LOCKED
`, r.maxAttempts, r.batchSize)
if err != nil {
return fmt.Errorf("query outbox: %w", err)
}
defer rows.Close()
type row struct {
id string
aggregateType string
aggregateID string
eventType string
payload json.RawMessage
}
var batch []row
for rows.Next() {
var e row
if err := rows.Scan(
&e.id, &e.aggregateType, &e.aggregateID, &e.eventType, &e.payload,
); err != nil {
return fmt.Errorf("scan row: %w", err)
}
batch = append(batch, e)
}
if err := rows.Err(); err != nil {
return err
}
for _, e := range batch {
if err := r.publisher.Publish(ctx, e.eventType, e.aggregateID, e.payload); err != nil {
r.logger.Error("publish failed", "event_id", e.id, "err", err)
if _, err := tx.ExecContext(ctx,
`UPDATE outbox_events SET attempts = attempts + 1 WHERE id = $1`, e.id,
); err != nil {
r.logger.Error("increment attempts failed", "event_id", e.id, "err", err)
}
continue
}
if _, err := tx.ExecContext(ctx,
`UPDATE outbox_events SET processed_at = NOW() WHERE id = $1`, e.id,
); err != nil {
return fmt.Errorf("mark processed: %w", err)
}
}
return tx.Commit()
}
FOR UPDATE SKIP LOCKEDは2つのことをします。まず、FOR UPDATEはトランザクションの期間中、選択された行をロックし、他のトランザクションがそれらを選択するのを防ぎます。次に、SKIP LOCKEDは、行がすでに別のトランザクションによってロックされている場合、クエリが待つのではなく、それをスキップすることを意味します。その結果、複数のリレーワーカーが並行して実行でき、それぞれが重複しない行のサブセットを取得します。
SKIP LOCKEDがない場合、2番目のワーカーは最初のトランザクションがコミットするまでブロックされ、同じ行を見ることになります。その時点では、それらはすでに完了としてマークされています。SKIP LOCKEDがある場合、2番目のワーカーは待つ代わりに直ちに異なる行を取得し、安全な水平スケーリングを提供します。
上記のコードにおけるスキャンとパブリッシュの分離に注意してください。すべての行はパブリッシュループが開始する前にスライスにスキャンされます。これにより、ブローカーへのネットワーク呼び出しを跨いで開いた*sql.Rowsカーソルを保持し、トランザクションを必要以上に開いたままにすることを回避します。
冪等性と重複排除
リレーは少なくとも1回パブリッシュします。イベントをパブリッシュした後、processed_atの更新をコミットする前にクラッシュした場合、再起動時に同じイベントを再度パブリッシュします。これは避けることができません。分散トランザクションコーディネータなしでデータベースとメッセージブローカー間で厳密に1回配信することは、このトレードオフを必要とします。
コンシューマーは冪等である必要があります。最もシンプルなアプローチは、processed_eventsテーブルで処理されたイベントIDを追跡することです:
CREATE TABLE processed_events (
event_id UUID PRIMARY KEY,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
func (h *OrderHandler) HandleOrderCreated(ctx context.Context, eventID string, payload []byte) error {
// イベントIDを自然キーとして使用して重複を排除
_, err := h.db.ExecContext(ctx, `
INSERT INTO processed_events (event_id) VALUES ($1)
ON CONFLICT (event_id) DO NOTHING
`, eventID)
if err != nil {
return fmt.Errorf("dedup check: %w", err)
}
// インサートが実際に発生したか(1行)または何もしなかったか(0行)を確認
// よりシンプルなアプローチ:RETURNINGを使用するか、影響を受けた行をチェック
// 0行が影響を受けた場合、これは重複です -- スキップ
...
}
実際には、多くのチームはブローカー自身の重複排除ヘッダー(Kafkaのログ圧縮トピックのkeyフィールドや、RabbitMQのmessage-idヘッダーなど)に依存し、データベースレベルの重複排除をフォールバックとして扱います。どちらも適用する有効なレイヤーです。
パブリッシュされたメッセージに、重複排除キーとしてアウトボックスイベントid(UUID)を含めます。コンシューマーは、どの重複排除メカニズムを好むかに関わらず、それを使用できます。
リトライポリシーとポイズンメッセージ
attemptsカラムがリトライポリシーを駆動します。リレーはattempts >= maxAttemptsの行をスキップし、それらの行をデッドレターとして扱います。別のプロセスまたはオペレーターアラートがそれらを処理します。
シンプルなデッドレタービュー:
CREATE VIEW outbox_dead_letters AS
SELECT *
FROM outbox_events
WHERE attempts >= 5
AND processed_at IS NULL
ORDER BY created_at;
良い本番用リトライポリシー:
- リトライのコストに応じて
maxAttemptsを5-10に設定します。 - 指数バックオフを検討します:
retry_afterカラムを含め、retry_after > NOW()の行をスキップします。 COUNT(*) FROM outbox_dead_lettersが閾値を超えた場合にアラートを出します。- 手動リトライパスを提供します:特定の行に対して
attempts = 0およびretry_after = NULLをリセットする管理エンドポイントまたはスクリプトです。
ポイズンメッセージ(コンシューマーのバグやスキーマの不一致により一貫して失敗する行)は、健全なメッセージをブロックすべきではありません。リレーは1ティックごとにバッチを処理し、失敗をキューから削除するのではなく、試行回数を増やしてマークするため、健全な行は通常通り進行し、毒された行はデッドレター閾値に達するまで試行回数を蓄積します。
イベントの順序付けとパーティショニング
ポーリングクエリはcreated_atで順序付けされており、バッチ内で先入先出(FIFO)の順序付けを提供します。多くのユースケースではこれで十分です。厳密なエンティティごとの順序付けが必要な場合(例えば、同じ注文に対してorder.updatedがorder.createdよりも前にパブリッシュされないことを保証する)、エンティティごとの順序付けが必要です。
aggregate_idをORDER BY句に追加し、Apache Kafkaのようなパーティション化されたトピックにパブリッシュする際のメッセージキーとして使用します。Kafkaは同じキーを持つすべてのメッセージを同じパーティションにルーティングし、パーティションは順序立てて消費されます。これにより、グローバルな順序付け(単一のリレーインスタンスが必要)なしに、エンティティごとの順序付け保証が得られます。
ORDER BY aggregate_id, created_at
パーティション化された順序付けをサポートしないブローカー(基本的なAMQPキューなど)の場合、単一インスタンスリレーまたはコンシューマー内のアプリケーションレベルの順序付けチェックが実用的な代替手段です。
LISTEN/NOTIFYでポーリングレイテンシを削減する
1秒のポーリングインターバルは、平均500ミリ秒のイベントレイテンシを意味します。多くのワークロードではこれで問題ありません。近ゼロレイテンシが必要なケースでは、PostgreSQLのLISTEN/NOTIFYメカニズムにより、新しいアウトボックス行が挿入されるとすぐにリレーが目覚めることができます。
アウトボックステーブルにトリガーを追加します:
CREATE OR REPLACE FUNCTION notify_outbox_insert() RETURNS trigger AS $$
BEGIN
PERFORM pg_notify('outbox_event', NEW.id::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER outbox_insert_notify
AFTER INSERT ON outbox_events
FOR EACH ROW EXECUTE FUNCTION notify_outbox_insert();
リレーでは、チャンネルを監視し、通知で目覚め、定期的にポーリングするフォールバックを維持します:
func (r *OutboxRelay) Run(ctx context.Context) error {
listener := pq.NewListener(r.dsn, 10*time.Second, time.Minute, nil)
defer listener.Close()
if err := listener.Listen("outbox_event"); err != nil {
return fmt.Errorf("listen: %w", err)
}
ticker := time.NewTicker(5 * time.Second) // fallback poll
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-listener.Notify:
if err := r.processBatch(ctx); err != nil {
r.logger.Error("outbox batch failed (notify)", "err", err)
}
case <-ticker.C:
if err := r.processBatch(ctx); err != nil {
r.logger.Error("outbox batch failed (poll)", "err", err)
}
}
}
}
フォールバックタイマーは、リレー再起動中やネットワークの瞬断中に逃された通知を処理します。フォールバックインターバルはミリ秒ではなく数秒に保ちます。その役割は低レイテンシではなく、回復です。
可観測性: メトリクス、ログ、アラート
アウトボックスはインフラストラクチャです。インフラストラクチャとして扱い、適切に計装します。
主要なメトリクス:
var (
outboxPublished = prometheus.NewCounter(prometheus.CounterOpts{
Name: "outbox_events_published_total",
Help: "Total outbox events successfully published.",
})
outboxFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "outbox_events_failed_total",
Help: "Total outbox publish failures by event type.",
}, []string{"event_type"})
outboxPending = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "outbox_events_pending",
Help: "Current number of unprocessed outbox events.",
})
outboxBatchDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "outbox_batch_duration_seconds",
Help: "Duration of each outbox processing batch.",
Buckets: prometheus.DefBuckets,
})
)
ゲージの更新: outbox_events_pendingを正確に保つために、定期的なクエリを実行します:
SELECT COUNT(*) FROM outbox_events WHERE processed_at IS NULL;
検討すべきアラート閾値:
outbox_events_pending > 1000が2分以上持続する場合:リレーが遅れているか、停止している。outbox_events_pendingが単調に増加する場合:ブローカーがダウンしているか、リレーがクラッシュしている。- デッドレター数がゼロでない場合:スキーマまたはコンシューマーのバグの調査が必要。
outbox_batch_duration_seconds p95 > 5s:データベースが遅いか、バッチサイズが大きすぎる。
構造化ログフィールド: リレーからのすべてのログ行にevent_id、event_type、aggregate_id、およびattemptを含めます。これらのフィールドにより、失敗したパブリッシュを特定のアウトボックス行および下流のコンシューマートレースと相関付けることができます。
アウトボックス vs 直接キュー vs サガ
アウトボックスパターンは、すべての協調問題に対する正しいツールではありません。比較は以下の通りです:
| アプローチ | 原子性 | 複雑さ | 使用タイミング |
|---|---|---|---|
| 直接パブリッシュ | なし | 低 | イベントの偶発的な損失が許容される場合 |
| トランザクショナルアウトボックス | 強い | 中 | 単一サービスからの信頼できるイベント配信 |
| サガパターン | 最終的 | 高 | 複数のデータベースにわたるマルチサービストランザクション |
| 2フェーズコミット | 強い | とても高 | 実用的なケースは稀; 多くの分散システムで回避される |
アウトボックスパターンは、単一サービスが自らの状態変更を反映するイベントを信頼して放出することを保証します。複数のサービス間で状態変更を協調することはありません。それがサガパターンの役割です。ブローカーの選択(RabbitMQ, SQS、またはKafka)は、アウトボックスパターン自体とは独立しています。リレーは、システムで使用しているブローカーにパブリッシュします。
サガを構築している場合でも、アウトボックスパターンは依然として有用です。サガの各参加者は、アウトボックスを使用して、ローカル状態変更とサガイベントを1つのトランザクションで書き込み、その後サガオーケストレーターまたは振付がそれらのイベントを信頼して読み取ります。
WALベースのCDCによる代替リレー
ポーリングの代わりに、PostgreSQLのWrite-Ahead Log(WAL)をテールし、レプリケーションストリームから直接アウトボックスインサートを読み取ることができます。Debeziumなどのツールがこれを行います。利点は、低いレイテンシとアウトボックステーブルへのロック圧力のなさです。欠点は、運用の複雑さ、専用のPostgreSQLレプリケーションスロット、および実行および監視が必要な外部サービスです。
多くのチームにとって、上記のポーリングリレーが正しい出発点です。WALテールは、高いアウトボックスインサートレート(1秒あたり数万件)、100ms未満のイベントレイテンシの必要性、または他の変更キャプチャニーズのためにすでにDebeziumを実行している場合に意味を持ちます。
sqlcとの統合
型安全なGoデータベースコードにsqlcを使用する場合、アウトボックスクエリは自然に適合します:
-- name: InsertOutboxEvent :exec
INSERT INTO outbox_events (aggregate_type, aggregate_id, event_type, payload)
VALUES (@aggregate_type, @aggregate_id, @event_type, @payload);
-- name: FetchOutboxBatch :many
SELECT id, aggregate_type, aggregate_id, event_type, payload
FROM outbox_events
WHERE processed_at IS NULL
AND attempts < @max_attempts
ORDER BY created_at
LIMIT @batch_size
FOR UPDATE SKIP LOCKED;
-- name: MarkOutboxProcessed :exec
UPDATE outbox_events SET processed_at = NOW() WHERE id = @id;
-- name: IncrementOutboxAttempts :exec
UPDATE outbox_events SET attempts = attempts + 1 WHERE id = @id;
-- name: OutboxPendingCount :one
SELECT COUNT(*) FROM outbox_events WHERE processed_at IS NULL;
sqlcは各クエリに対して型安全な関数を生成し、文字列補完エラーを回避し、アウトボックスクエリロジックをデータベースアクセスレイヤーの残りと共に配置します。
本番環境チェックリスト
アウトボックス実装を出荷する前にこれを使用してください:
データベース
- アウトボックステーブルに
created_at WHERE processed_at IS NULLの部分インデックスがある - デフォルト値が0の
attemptsカラムが存在する - デッドレタービューまたはクエリが定義されている
- 古い処理済み行は定期的にアーカイブまたは削除される(夜間クリーンアップジョブで十分)
リレー
- ポーリングクエリで
FOR UPDATE SKIP LOCKEDが使用されている - リレーがトランザクション内で実行される(クエリ前に開始、すべての更新後にコミット)
- バッチサイズが有界である(50-200行が一般的)
- リレーがグラシアルシャットダウンのためにコンテキストキャンセルを尊重する
- 失敗したパブリッシュがバッチを中止するのではなく、
attemptsを増加させる
冪等性
- パブリッシュされたメッセージに、重複排除キーとしてアウトボックス
idが含まれている - コンシューマーが冪等であるか、ブローカーが重複排除を提供している
- 重複排除パターンについては分散システムにおける冪等性を参照
可観測性
-
outbox_events_pendingゲージが監視およびアラート対象である - デッドレター数がアラート対象である
- リレーバッチの継続時間が追跡されている
- 構造化ログに
event_id、event_type、およびaggregate_idが含まれている
運用
- デッドレター行の手動リトライパスが存在する
- リレー再起動動作がテストされている(正しく再パブリッシュされるか?)
- ブローカーアウトエージ動作がテストされている(アウトボックスは正しく成長し、ドレインされるか?)
最終的な考え
デュアルライト問題は、インシデントを引き起こすまでエッジケースとして容易に軽視されがちです。トランザクショナルアウトボックスパターンは、あなたがすでに持っているツールでそれを解決します。PostgreSQLトランザクション、バックグラウンドゴルーチン、および追加の1つのテーブルです。リレーは構築が簡単で、運用が簡単で、理屈が容易です。
そのコストは、コンシューマーが少なくとも1回配信のために設計される必要があるということです。それは合理的なトレードオフです。分散トランザクションなしでデータベースとブローカー間で厳密に1回配信することは、実際には達成できません。そして、そうでないふりは、失敗条件下でサイレントにイベントをドロップまたは二重処理するシステムにつながります。
データとともにイベントを書き込み、信頼してリレーし、コンシューマーを冪等にする。それがこのパターンのすべてです。
この記事は運用環境でのアプリアーキテクチャクラスターの一部です。