Transactional Outbox Pattern in Go mit PostgreSQL

Schreiben Sie das Ereignis mit den Daten. Trennen Sie diese niemals.

Inhaltsverzeichnis

Zwei Schreibvorgänge, die gemeinsam erfolgreich sein sollten, werden schließlich separat scheitern. Ihr Bestellservice speichert die Bestellung in der Datenbank und veröffentlicht anschließend ein order.created-Ereignis (Event) bei einem Message Broker.

Diese beiden Operationen nacheinander ausgeführt.

Dazwischen kann etwas schiefgehen: Der Broker ist ausgefallen, das Netzwerk timeoutt, der Prozess wird neu gestartet oder der Container wird vertrieben. Der Datenbank-Schreibvorgang war erfolgreich. Die Veröffentlichung (Publish) war es nicht. Der nachgelagerte Service, der über die neue Bestellung informiert werden muss, erfährt nie davon. Niemand hat es bemerkt, bis ein Kunde angerufen hat.

Dies ist das Dual-Write-Problem (Dual-Schreibproblem), und es ist eine der häufigsten Ursachen für stille Datenverluste in verteilten Systemen. Das Muster der transaktionalen Outbox (Transactional Outbox Pattern) ist die Standardlösung.

Transaktionales Outbox-Muster – Ereignis und Daten werden gemeinsam geschrieben

Das Dual-Write-Problem

Das Fehlermodell ist leicht nachvollziehbar, sobald man es sieht:

BEGIN;
  INSERT INTO orders ...   -- erfolgreich
COMMIT;

PUBLISH order.created ...  -- fehlschlägt, stürzt ab oder wird nie erreicht

Die Datenbank und der Message Broker teilen keine Transaktionsgrenze. Es gibt keinen Rollback, der beide abdeckt. Jeder Service, der save -> publish nacheinander ausführt, hat diese Lücke. Das Muster zeigt sich in vielen Formen:

  • db.Save(order) gefolgt von events.Publish(OrderCreated{...})
  • HTTP-Handler, der eine Transaktion committet und dann einen externen Webhook aufruft
  • Worker, der einen Datensatz aus einer Warteschlange verarbeitet und Ergebnisse in eine andere schreibt

Das Ergebnis ist in allen Fällen gleich: Die eine Seite schlägt zu, die andere scheitert, und das System endet in einem Zustand, der für das Monitoring unsichtbar ist, weil beide einzelnen Operationen irgendwann Erfolg gemeldet haben.

Eine Retry-Schleife behebt dies nicht. Das erneute Versuchen der Veröffentlichung nach dem Datenbank-Commit funktioniert nur, wenn der Retry selbst zuverlässig ist – was genau die Dauerhaftigkeitsgarantie erfordert, die Sie nicht haben.

Was das transaktionale Outbox-Muster macht

Das Outbox-Muster eliminiert die Lücke, indem es die direkte Veröffentlichung vollständig entfernt. Anstatt den Broker innerhalb Ihrer Geschäftslogik aufzurufen, schreiben Sie einen Ereignisdatensatz in eine outbox-Tabelle in derselben Datenbanktransaktion wie die Geschäftsdaten. Ein separater Hintergrundprozess – der Relay – liest aus der Outbox-Tabelle und veröffentlicht beim Broker.

BEGIN;
  INSERT INTO orders ...         -- Geschäftsdaten
  INSERT INTO outbox_events ...  -- Ereignisdatensatz
COMMIT;

-- Relay-Prozess (separat):
SELECT ... FROM outbox_events FOR UPDATE SKIP LOCKED;
PUBLISH order.created ...
UPDATE outbox_events SET processed_at = NOW() WHERE id = $1;

Entweder beide Schreibvorgänge schlagen zu oder beide scheitern. Die Transaktionsgarantie, die Sie bereits von PostgreSQL haben, deckt nun auch den Ereignisdatensatz ab. Der Relay kann die Veröffentlichung so oft wiederholen, wie nötig, da das Ereignis in einem dauerhaften Speicher liegt. Wenn der Relay unterwegs abstürzt, startet er neu und versucht es erneut. Das schlimmste Ergebnis ist, dass das Ereignis mehr als einmal veröffentlicht wird – was durch die Idempotenz der Konsumenten behandelt wird (siehe Idempotenz in verteilten Systemen).

PostgreSQL-Schema für die Outbox-Tabelle

Das Schema ist absichtlich einfach:

CREATE TABLE outbox_events (
    id             UUID         PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id   VARCHAR(100) NOT NULL,
    event_type     VARCHAR(100) NOT NULL,
    payload        JSONB        NOT NULL,
    attempts       INT          NOT NULL DEFAULT 0,
    created_at     TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    processed_at   TIMESTAMPTZ
);

-- Partieller Index: indiziert nur unverarbeitete Zeilen, bleibt klein, da Zeilen als erledigt markiert werden
CREATE INDEX idx_outbox_unprocessed
    ON outbox_events (created_at)
    WHERE processed_at IS NULL;

Der partielle Index auf created_at WHERE processed_at IS NULL ist wichtig. Ohne ihn wächst der Index mit jedem jemals geschriebenen Ereignis und die Polling-Abfrage des Relays wird mit der Zeit langsamer. Mit ihm deckt der Index nur die ausstehenden Zeilen ab, was im stabilen Zustand eine kleine, begrenzte Menge ist, unabhängig davon, wie viele Ereignisse veröffentlicht wurden.

Wichtige Feldauswahlen:

  • aggregate_type und aggregate_id beschreiben, welcher Entität das Ereignis angehört. Nützlich für Ordnungsgarantien und Routing.
  • event_type ist der Ereignisname, den Ihre Konsumenten erwarten.
  • payload JSONB speichert den Ereigniskörper. Verwenden Sie JSONB anstelle von TEXT, damit Sie es bei Bedarf abfragen können.
  • attempts verfolgt, wie oft der Relay versucht hat, diese Zeile zu veröffentlichen. Wird für Retry-Limits und Dead-Letter-Handling verwendet.
  • processed_at ist NULL für ausstehende Zeilen und wird gesetzt, wenn der Relay erfolgreich veröffentlicht.

Schreiben von Geschäftsdaten und Outbox-Ereignis in einer Transaktion

Die Geschäftslogik schreibt beide Datensätze innerhalb eines einzelnen BeginTx / Commit-Aufrufs. Hier gibt es keinen Publish-Aufruf – nur Datenbank-Schreibvorgänge.

type OrderService struct {
    db *sql.DB
}

func (s *OrderService) CreateOrder(ctx context.Context, order Order) error {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback()

    if _, err := tx.ExecContext(ctx, `
        INSERT INTO orders (id, customer_id, total, created_at)
        VALUES ($1, $2, $3, NOW())
    `, order.ID, order.CustomerID, order.Total); err != nil {
        return fmt.Errorf("insert order: %w", err)
    }

    payload, err := json.Marshal(map[string]any{
        "order_id":    order.ID,
        "customer_id": order.CustomerID,
        "total":       order.Total,
    })
    if err != nil {
        return fmt.Errorf("marshal payload: %w", err)
    }

    if _, err := tx.ExecContext(ctx, `
        INSERT INTO outbox_events
            (aggregate_type, aggregate_id, event_type, payload)
        VALUES ($1, $2, $3, $4)
    `, "order", order.ID, "order.created", payload); err != nil {
        return fmt.Errorf("insert outbox event: %w", err)
    }

    return tx.Commit()
}

Wenn tx.Commit() fehlschlägt, wird weder die Bestellzeile noch die Outbox-Zeile persistiert. Wenn es erfolgreich ist, ist garantiert, dass beide in der Datenbank sind. Der Relay kann das Ereignis zu jedem Zeitpunkt danach veröffentlichen – sofort, in einer Sekunde oder nach dem Neustart des Relays nach einem Absturz.

Dies ist die einzige Codeänderung, die in Ihrer Geschäftsschicht erforderlich ist. Der Rest des Musters lebt im Relay.

Go-Relay-Implementierung

Der Relay ist ein Hintergrundworker, der die Outbox-Tabelle nach einem Zeitplan abfragt. Erholt eine Batch von unverarbeiteten Zeilen, veröffentlicht jede davon und markiert sie als erledigt. Behalten Sie es im selben Binary wie Ihre Anwendung oder führen Sie es als separaten Prozess aus – beides funktioniert, aber das gleiche Binary ist einfacher zu betreiben.

type OutboxRelay struct {
    db          *sql.DB
    publisher   Publisher
    logger      *slog.Logger
    batchSize   int
    pollInterval time.Duration
    maxAttempts  int
}

func (r *OutboxRelay) Run(ctx context.Context) error {
    ticker := time.NewTicker(r.pollInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            if err := r.processBatch(ctx); err != nil {
                r.logger.Error("outbox relay batch failed", "err", err)
            }
        }
    }
}

Der Relay respektiert die Kontext-Absage, was es einfach macht, ihn mit einem graceful Shutdown zu integrieren. Für eine detaillierte Behandlung von Kontext-Lebensdauer und Absagemustern siehe Go context.Context Done Right.

FOR UPDATE SKIP LOCKED: Das Pattern für koncurrente Worker

Die processBatch-Funktion verwendet FOR UPDATE SKIP LOCKED, um koncurrente Relay-Worker sicher zu handhaben:

func (r *OutboxRelay) processBatch(ctx context.Context) error {
    tx, err := r.db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback()

    rows, err := tx.QueryContext(ctx, `
        SELECT id, aggregate_type, aggregate_id, event_type, payload
        FROM outbox_events
        WHERE processed_at IS NULL
          AND attempts < $1
        ORDER BY created_at
        LIMIT $2
        FOR UPDATE SKIP LOCKED
    `, r.maxAttempts, r.batchSize)
    if err != nil {
        return fmt.Errorf("query outbox: %w", err)
    }
    defer rows.Close()

    type row struct {
        id            string
        aggregateType string
        aggregateID   string
        eventType     string
        payload       json.RawMessage
    }

    var batch []row
    for rows.Next() {
        var e row
        if err := rows.Scan(
            &e.id, &e.aggregateType, &e.aggregateID, &e.eventType, &e.payload,
        ); err != nil {
            return fmt.Errorf("scan row: %w", err)
        }
        batch = append(batch, e)
    }
    if err := rows.Err(); err != nil {
        return err
    }

    for _, e := range batch {
        if err := r.publisher.Publish(ctx, e.eventType, e.aggregateID, e.payload); err != nil {
            r.logger.Error("publish failed", "event_id", e.id, "err", err)
            if _, err := tx.ExecContext(ctx,
                `UPDATE outbox_events SET attempts = attempts + 1 WHERE id = $1`, e.id,
            ); err != nil {
                r.logger.Error("increment attempts failed", "event_id", e.id, "err", err)
            }
            continue
        }

        if _, err := tx.ExecContext(ctx,
            `UPDATE outbox_events SET processed_at = NOW() WHERE id = $1`, e.id,
        ); err != nil {
            return fmt.Errorf("mark processed: %w", err)
        }
    }

    return tx.Commit()
}

FOR UPDATE SKIP LOCKED macht zwei Dinge. Erstens, FOR UPDATE sperrt die ausgewählten Zeilen für die Dauer der Transaktion und verhindert, dass eine andere Transaktion sie auswählt. Zweitens, SKIP LOCKED bedeutet, dass, wenn eine Zeile bereits von einer anderen Transaktion gesperrt ist, die Abfrage sie überspringt, anstatt zu warten. Das Ergebnis ist, dass mehrere Relay-Worker parallel laufen können und jeder einen nicht überlappenden Teil der Zeilen aufnimmt.

Ohne SKIP LOCKED würde ein zweiter Worker blockiert werden, bis die erste Transaktion committet, bevor er dieselben Zeilen sieht – zu diesem Zeitpunkt wären sie bereits als erledigt markiert. Mit SKIP LOCKED nimmt der zweite Worker sofort andere Zeilen auf, anstatt zu warten, was Ihnen eine sichere horizontale Skalierung bietet.

Beachten Sie die Trennung von Scan-und-Publish im obigen Code: Alle Zeilen werden in einen Slice gescannt, bevor die Publish-Schleife beginnt. Dies vermeidet, dass ein offenes *sql.Rows-Cursor über Netzwerkaufrufe zum Broker hinweg gehalten wird, was die Transaktion länger offen halten würde als notwendig.

Idempotenz und Deduplizierung

Der Relay veröffentlicht mindestens einmal. Wenn er ein Ereignis veröffentlicht und dann abstürzt, bevor er die processed_at-Aktualisierung committet, wird er beim Neustart dasselbe Ereignis erneut veröffentlichen. Dies ist unvermeidbar – Exactly-Once-Delivery über eine Datenbank und einen Message Broker ohne einen verteilten Transaktionskoordinator erfordert diesen Trade-off.

Konsumenten müssen idempotent sein. Der einfachste Ansatz ist, verarbeitete Ereignis-IDs in einer processed_events-Tabelle zu verfolgen:

CREATE TABLE processed_events (
    event_id   UUID PRIMARY KEY,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
func (h *OrderHandler) HandleOrderCreated(ctx context.Context, eventID string, payload []byte) error {
    // Deduplizierung unter Verwendung der Ereignis-ID als natürlicher Schlüssel
    _, err := h.db.ExecContext(ctx, `
        INSERT INTO processed_events (event_id) VALUES ($1)
        ON CONFLICT (event_id) DO NOTHING
    `, eventID)
    if err != nil {
        return fmt.Errorf("dedup check: %w", err)
    }

    // Prüfen, ob das Einfügen tatsächlich stattgefunden hat (1 Zeile) oder ein No-Op war (0 Zeilen)
    // Ein einfacherer Ansatz: RETURNING verwenden oder betroffene Zeilen prüfen
    // Wenn 0 Zeilen betroffen, ist dies ein Duplikat – überspringen
    ...
}

In der Praxis verlassen sich viele Teams auf die eigenen Deduplizierungs-Header des Brokers (wie das key-Feld von Kafka für log-kompaktierte Topics oder den message-id-Header von RabbitMQ) und behandeln die Deduplizierung auf Datenbankebene als Fallback. Beide sind gültige Schichten, die angewendet werden können.

Schließen Sie die Outbox-Ereignis-id (eine UUID) in die veröffentlichte Nachricht als Deduplizierungsschlüssel ein. Konsumenten können ihn dann unabhängig von ihrer bevorzugten Deduplizierungsmechanik verwenden.

Retry-Richtlinie und Poison Messages

Die Spalte attempts steuert die Retry-Richtlinie. Der Relay überspringt Zeilen, bei denen attempts >= maxAttempts gilt, und behandelt diese Zeilen als Dead Letters. Ein separater Prozess oder Operator-Alert handhabt sie.

Eine einfache Dead-Letter-Ansicht:

CREATE VIEW outbox_dead_letters AS
SELECT *
FROM outbox_events
WHERE attempts >= 5
  AND processed_at IS NULL
ORDER BY created_at;

Eine gute Produktions-Retry-Richtlinie:

  • Setzen Sie maxAttempts auf 5-10, je nachdem, wie teuer Retries sind.
  • Erwägen Sie exponentiellen Backoff: Schließen Sie eine retry_after-Spalte ein und überspringen Sie Zeilen, bei denen retry_after > NOW() gilt.
  • Alarmieren Sie, wenn COUNT(*) FROM outbox_dead_letters eine Schwelle überschreitet.
  • Bieten Sie einen manuellen Retry-Pfad an: Einen Admin-Endpunkt oder Skript, der attempts = 0 und retry_after = NULL für bestimmte Zeilen zurücksetzt.

Poison Messages – Zeilen, die aufgrund eines Bugs im Konsumenten oder einer Schema-Ungleichheit konsistent fehlschlagen – sollten gesunde Nachrichten nicht blockieren. Da der Relay pro Tick eine Batch verarbeitet und Fehler mit einer Erhöhung der Versuche markiert, anstatt sie aus der Warteschlange zu entfernen, gehen gesunde Zeilen normal weiter, während vergiftete versuchen, bis sie die Dead-Letter-Schwelle erreichen.

Ereignisreihenfolge und Partitionierung

Die Polling-Abfrage bestellt nach created_at, was First-In-First-Out-Reihenfolge innerhalb einer Batch gibt. Für die meisten Anwendungsfälle reicht das aus. Wenn strikte Reihenfolge pro Entität wichtig ist – zum Beispiel, um sicherzustellen, dass order.updated nie vor order.created für dieselbe Bestellung veröffentlicht wird – benötigen Sie pro-Aggregat-Reihenfolge.

Fügen Sie aggregate_id zur ORDER BY-Klausel hinzu und verwenden Sie es als Message-Key beim Veröffentlichen in einem partitionierten Topic wie Apache Kafka. Kafka leitet alle Nachrichten mit demselben Schlüssel an dieselbe Partition weiter, und Partitionen werden in Reihenfolge konsumiert. Dies gibt Ihnen pro-Aggregat-Reihenfolgegarantien ohne globale Reihenfolge, die eine einzelne Relay-Instanz erfordern würde.

ORDER BY aggregate_id, created_at

Für Broker, die keine partitionierte Reihenfolge unterstützen (wie grundlegende AMQP-Warteschlangen), sind einzelne Relay-Instanzen oder anwendungsbasierte Reihenfolgeprüfungen im Konsumenten die praktischen Alternativen.

Reduzieren der Polling-Latenz mit LISTEN/NOTIFY

Ein Polling-Intervall von einer Sekunde bedeutet eine durchschnittliche Ereignislatenz von 500 Millisekunden. Für die meisten Workloads ist das in Ordnung. Für Fälle, in denen Sie eine nahezu null Latenz benötigen, lässt der LISTEN/NOTIFY-Mechanismus von PostgreSQL den Relay sofort aufwachen, wenn eine neue Outbox-Zeile eingefügt wird.

Fügen Sie einen Trigger zur Outbox-Tabelle hinzu:

CREATE OR REPLACE FUNCTION notify_outbox_insert() RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('outbox_event', NEW.id::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER outbox_insert_notify
AFTER INSERT ON outbox_events
FOR EACH ROW EXECUTE FUNCTION notify_outbox_insert();

Im Relay hören Sie auf den Kanal und wachen auf Benachrichtigungen auf, während Sie auf periodisches Polling zurückgreifen:

func (r *OutboxRelay) Run(ctx context.Context) error {
    listener := pq.NewListener(r.dsn, 10*time.Second, time.Minute, nil)
    defer listener.Close()

    if err := listener.Listen("outbox_event"); err != nil {
        return fmt.Errorf("listen: %w", err)
    }

    ticker := time.NewTicker(5 * time.Second) // Fallback-Poll
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-listener.Notify:
            if err := r.processBatch(ctx); err != nil {
                r.logger.Error("outbox batch failed (notify)", "err", err)
            }
        case <-ticker.C:
            if err := r.processBatch(ctx); err != nil {
                r.logger.Error("outbox batch failed (poll)", "err", err)
            }
        }
    }
}

Der Fallback-Ticker handhabt alle Benachrichtigungen, die während eines Relay-Neustarts oder eines Netzwerkproblems verpasst wurden. Halten Sie das Fallback-Intervall auf einige Sekunden statt Millisekunden – seine Aufgabe ist die Erholung, nicht niedrige Latenz.

Observability: Metriken, Logs und Alerts

Die Outbox ist Infrastruktur. Behandeln Sie sie wie Infrastruktur und instrumentieren Sie sie entsprechend.

Wichtige Metriken:

var (
    outboxPublished = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "outbox_events_published_total",
        Help: "Total outbox events successfully published.",
    })
    outboxFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "outbox_events_failed_total",
        Help: "Total outbox publish failures by event type.",
    }, []string{"event_type"})
    outboxPending = prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "outbox_events_pending",
        Help: "Current number of unprocessed outbox events.",
    })
    outboxBatchDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
        Name:    "outbox_batch_duration_seconds",
        Help:    "Duration of each outbox processing batch.",
        Buckets: prometheus.DefBuckets,
    })
)

Gauge-Refresh: Führen Sie eine periodische Abfrage aus, um outbox_events_pending genau zu halten:

SELECT COUNT(*) FROM outbox_events WHERE processed_at IS NULL;

Zu berücksichtigende Alarm-Schwellenwerte:

  • outbox_events_pending > 1000 für mehr als zwei Minuten: Relay hinkt hinterher oder ist steckengeblieben.
  • outbox_events_pending wächst monoton: Broker ist ausgefallen oder Relay ist abgestürzt.
  • Dead-Letter-Anzahl ungleich null: Schema- oder Konsumenten-Bug muss untersucht werden.
  • outbox_batch_duration_seconds p95 > 5s: Datenbank ist langsam oder Batch-Größe ist zu groß.

Strukturierte Log-Felder: Schließen Sie event_id, event_type, aggregate_id und attempt in jede Logzeile vom Relay ein. Diese Felder ermöglichen es Ihnen, eine fehlgeschlagene Veröffentlichung mit der spezifischen Outbox-Zeile und dem nachgelagerten Konsumenten-Trace zu korrelieren.

Outbox vs. direkte Warteschlange vs. Saga

Das Outbox-Muster ist nicht das richtige Werkzeug für jedes Koordinationsproblem. Hier ist der Vergleich:

Ansatz Atomicität Komplexität Wann verwenden
Direkte Veröffentlichung Keine Niedrig Akzeptabel, gelegentlich Ereignisse zu verlieren
Transaktionale Outbox Stark Mittel Zuverlässige Ereignislieferung von einem einzelnen Service
Saga-Muster Eventuell Hoch Multi-Service-Transaktionen, die mehrere Datenbanken überspannen
Two-Phase Commit Stark Sehr hoch Selten praktisch; in den meisten verteilten Systemen vermieden

Das Outbox-Muster garantiert, dass ein einzelner Service zuverlässig Ereignisse emittiert, die seine eigenen Zustandsänderungen widerspiegeln. Es koordiniert keine Zustandsänderungen über mehrere Services hinweg – dafür ist das Saga-Muster da. Die Wahl des Brokers – ob RabbitMQ, SQS oder Kafka – ist unabhängig vom Outbox-Muster selbst; der Relay veröffentlicht an den Broker, den Ihr System verwendet.

Wenn Sie eine Saga bauen, ist das Outbox-Muster dennoch nützlich: Jeder Teilnehmer in der Saga schreibt seine lokale Zustandsänderung und sein Saga-Ereignis in einer Transaktion unter Verwendung der Outbox, und dann liest der Saga-Orchestrator oder die Choreografie diese Ereignisse zuverlässig.

WAL-basiertes CDC als alternative Relay

Anstatt zu pollen, können Sie das Write-Ahead Log (WAL) von PostgreSQL tailen und Outbox-Einfügungen direkt aus dem Replikationsstrom lesen. Tools wie Debezium tun dies. Die Vorteile sind niedrigere Latenz und kein Lock-Druck auf der Outbox-Tabelle. Die Nachteile sind operative Komplexität, ein dedizierter PostgreSQL-Replikationsslot und ein externer Service, der betrieben und überwacht werden muss.

Für die meisten Teams ist das oben beschriebene Polling-Relay der richtige Ausgangspunkt. WAL-Tailing macht Sinn, wenn Sie hohe Outbox-Einfügungsraten haben (zehntausende pro Sekunde), sub-100ms-Ereignislatenz benötigen oder bereits Debezium für andere Change-Capture-Bedarfe betreiben.

sqlc-Integration

Wenn Sie sqlc für typsicheren Go-Datenbankcode verwenden, passen die Outbox-Abfragen natürlich hinein:

-- name: InsertOutboxEvent :exec
INSERT INTO outbox_events (aggregate_type, aggregate_id, event_type, payload)
VALUES (@aggregate_type, @aggregate_id, @event_type, @payload);

-- name: FetchOutboxBatch :many
SELECT id, aggregate_type, aggregate_id, event_type, payload
FROM outbox_events
WHERE processed_at IS NULL
  AND attempts < @max_attempts
ORDER BY created_at
LIMIT @batch_size
FOR UPDATE SKIP LOCKED;

-- name: MarkOutboxProcessed :exec
UPDATE outbox_events SET processed_at = NOW() WHERE id = @id;

-- name: IncrementOutboxAttempts :exec
UPDATE outbox_events SET attempts = attempts + 1 WHERE id = @id;

-- name: OutboxPendingCount :one
SELECT COUNT(*) FROM outbox_events WHERE processed_at IS NULL;

sqlc generiert typsichere Funktionen für jede Abfrage, was String-Interpolationsfehler vermeidet und die Outbox-Abfragelogik mit dem Rest Ihrer Datenbankzugriffsschicht zusammenhält.

Produktions-Checkliste

Verwenden Sie dies vor dem Versand einer Outbox-Implementierung:

Datenbank

  • Outbox-Tabelle hat den partiellen Index auf created_at WHERE processed_at IS NULL
  • attempts-Spalte vorhanden mit einem Standardwert von 0
  • Dead-Letter-Ansicht oder -Abfrage definiert
  • Alte verarbeitete Zeilen werden regelmäßig archiviert oder gelöscht (ein nächtlicher Cleanup-Job reicht aus)

Relay

  • FOR UPDATE SKIP LOCKED in der Polling-Abfrage verwendet
  • Relay läuft innerhalb einer Transaktion (beginne vor der Abfrage, commit nach allen Updates)
  • Batch-Größe ist begrenzt (50-200 Zeilen ist typisch)
  • Relay respektiert Kontext-Absage für graceful Shutdown
  • Fehlgeschlagene Veröffentlichungen erhöhen attempts anstatt die Batch abzubrechen

Idempotenz

  • Veröffentlichte Nachricht enthält die Outbox-id als Deduplizierungsschlüssel
  • Konsumenten sind idempotent oder der Broker bietet Deduplizierung
  • Siehe Idempotenz in verteilten Systemen für Deduplizierungsmuster

Observability

  • outbox_events_pending Gauge wird überwacht und alarmiert
  • Dead-Letter-Anzahl wird alarmiert
  • Relay-Batch-Dauer wird verfolgt
  • Strukturierte Logs enthalten event_id, event_type und aggregate_id

Operationen

  • Manueller Retry-Pfad existiert für Dead-Letter-Zeilen
  • Relay-Neustart-Verhalten wird getestet (veröffentlicht es korrekt neu?)
  • Broker-Ausfall-Verhalten wird getestet (wächst die Outbox und leert sie sich korrekt?)

Abschließende Gedanken

Das Dual-Write-Problem ist leicht als Randfall abzutun, bis es einen Vorfall verursacht. Das transaktionale Outbox-Muster löst es mit Werkzeugen, die Sie bereits haben: Eine PostgreSQL-Transaktion, eine Hintergrund-Goroutine und eine zusätzliche Tabelle. Der Relay ist einfach zu bauen, einfach zu betreiben und einfach nachzuvollziehen.

Die Kosten sind, dass Konsumenten für mindestens-einmalige Lieferung ausgelegt sein müssen. Das ist ein vernünftiger Trade-off. Exactly-Once-Delivery über eine Datenbank und einen Broker ohne verteilte Transaktionen ist in der Praxis nicht erreichbar – und das Pretendieren, es wäre, führt zu Systemen, die bei Fehlerbedingungen still Ereignisse fallen lassen oder doppelt verarbeiten.

Schreiben Sie das Ereignis mit den Daten. Leiten Sie es zuverlässig weiter. Machen Sie Konsumenten idempotent. Das ist das ganze Muster.

Dieser Artikel ist Teil des App Architecture in Production-Clusters.

Quellen

Abonnieren

Neue Beiträge zu Systemen, Infrastruktur und KI-Engineering.