Pattern Transactional Outbox in Go con PostgreSQL

Scrivi l'evento con i dati. Non separarli mai.

Indice

Due scritture che dovrebbero riuscire insieme finiscono per fallire separatamente. Il tuo servizio ordini salva l’ordine nel database, quindi pubblica un evento order.created su un message broker.

Queste due operazioni vengono eseguite una dopo l’altra.

Nel frattempo, qualcosa può andare storto: il broker è down, la rete scade (timeout), il processo si riavvia o il container viene espulso. La scrittura nel database è andata a buon fine. La pubblicazione no. Il servizio a valle che deve sapere del nuovo ordine non viene mai informato. Nessuno se ne è accorto fino a quando un cliente ha chiamato.

Questo è il problema della scrittura doppia (dual-write), ed è una delle fonti più comuni di perdita di dati silente nei sistemi distribuiti. Il pattern dell’outbox transazionale è la soluzione standard.

Pattern outbox transazionale – evento e dati scritti insieme

Il problema della scrittura doppia

Il modalità di fallimento è facile da analizzare una volta che lo si vede:

BEGIN;
  INSERT INTO orders ...   -- ha successo
COMMIT;

PUBLISH order.created ...  -- fallisce, crasha o non viene mai raggiunto

Il database e il message broker non condividono un confine di transazione. Non esiste un rollback che copra entrambi. Ogni servizio che esegue save -> publish in sequenza ha questa lacuna. Il pattern si presenta in molte forme:

  • db.Save(order) seguito da events.Publish(OrderCreated{...})
  • Handler HTTP che committa una transazione e poi chiama un webhook esterno
  • Worker che elabora un record da una coda e scrive i risultati in un’altra

L’esito in tutti i casi è lo stesso: un lato ha successo mentre l’altro fallisce, e il sistema finisce in uno stato invisibile al monitoraggio perché entrambe le operazioni individuali hanno restituito successo in qualche momento.

Un ciclo di retry non risolve questo problema. Riprovare la pubblicazione dopo il commit del database funziona solo se il retry stesso è affidabile – il che richiede esattamente la garanzia di durabilità che non hai.

Cosa fa il pattern outbox transazionale

Il pattern outbox elimina il divario rimuovendo del tutto la pubblicazione diretta. Invece di chiamare il broker all’interno della tua logica di business, scrivi un record evento in una tabella outbox nella stessa transazione del database dei dati aziendali. Un processo di background separato – il relay – legge dalla tabella outbox e pubblica sul broker.

BEGIN;
  INSERT INTO orders ...         -- dati aziendali
  INSERT INTO outbox_events ...  -- record evento
COMMIT;

-- Processo Relay (separatamente):
SELECT ... FROM outbox_events FOR UPDATE SKIP LOCKED;
PUBLISH order.created ...
UPDATE outbox_events SET processed_at = NOW() WHERE id = $1;

Entrambe le scritture hanno successo o entrambe falliscono. La garanzia di transazione che hai già da PostgreSQL ora copre anche il record evento. Il relay può riprovare a pubblicare quante volte necessario perché l’evento risiede in storage durabile. Se il relay crasha a metà, si riavvia e riprova. Il peggior esito è che l’evento viene pubblicato più di una volta – il che è gestito rendendo i consumatori idempotenti (vedi Idempotenza nei Sistemi Distribuiti).

Schema PostgreSQL per la tabella outbox

Lo schema è intenzionalmente semplice:

CREATE TABLE outbox_events (
    id             UUID         PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id   VARCHAR(100) NOT NULL,
    event_type     VARCHAR(100) NOT NULL,
    payload        JSONB        NOT NULL,
    attempts       INT          NOT NULL DEFAULT 0,
    created_at     TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    processed_at   TIMESTAMPTZ
);

-- Indice parziale: indossa solo le righe non elaborate, rimane piccolo man mano che le righe vengono segnate come completate
CREATE INDEX idx_outbox_unprocessed
    ON outbox_events (created_at)
    WHERE processed_at IS NULL;

L’indice parziale su created_at WHERE processed_at IS NULL è importante. Senza di esso, l’indice cresce con ogni evento mai scritto e la query di polling del relay diventa più lenta nel tempo. Con esso, l’indice copre solo le righe in attesa, che in stato stazionario sono un insieme piccolo e limitato, indipendentemente da quanti eventi sono stati pubblicati.

Scelte chiave dei campi:

  • aggregate_type e aggregate_id descrivono a quale entità appartiene l’evento. Utili per garanzie di ordinamento e instradamento.
  • event_type è il nome dell’evento che i consumatori si aspettano.
  • payload JSONB memorizza il corpo dell’evento. Usa JSONB piuttosto che TEXT così puoi interrogarlo se necessario.
  • attempts tiene traccia di quante volte il relay ha provato a pubblicare questa riga. Usato per limiti di retry e gestione delle dead-letter.
  • processed_at è NULL per le righe in attesa e viene impostato quando il relay pubblica con successo.

Scrittura di dati aziendali e evento outbox in una transazione

La logica di business scrive entrambi i record all’interno di una singola chiamata BeginTx / Commit. Non c’è chiamata di publish qui – solo scritture nel database.

type OrderService struct {
    db *sql.DB
}

func (s *OrderService) CreateOrder(ctx context.Context, order Order) error {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("inizio tx: %w", err)
    }
    defer tx.Rollback()

    if _, err := tx.ExecContext(ctx, `
        INSERT INTO orders (id, customer_id, total, created_at)
        VALUES ($1, $2, $3, NOW())
    `, order.ID, order.CustomerID, order.Total); err != nil {
        return fmt.Errorf("inserimento ordine: %w", err)
    }

    payload, err := json.Marshal(map[string]any{
        "order_id":    order.ID,
        "customer_id": order.CustomerID,
        "total":       order.Total,
    })
    if err != nil {
        return fmt.Errorf("serializzazione payload: %w", err)
    }

    if _, err := tx.ExecContext(ctx, `
        INSERT INTO outbox_events
            (aggregate_type, aggregate_id, event_type, payload)
        VALUES ($1, $2, $3, $4)
    `, "order", order.ID, "order.created", payload); err != nil {
        return fmt.Errorf("inserimento evento outbox: %w", err)
    }

    return tx.Commit()
}

Se tx.Commit() fallisce, né la riga dell’ordine né quella dell’outbox vengono persistite. Se ha successo, entrambi sono garantiti nel database. Il relay può pubblicare l’evento in qualsiasi momento dopo questo – immediatamente, in un secondo, o dopo che il relay si riavvia following un crash.

Questo è l’unico cambiamento di codice richiesto nel tuo livello di business. Il resto del pattern vive nel relay.

Implementazione del relay in Go

Il relay è un worker di background che effettua polling sulla tabella outbox su un timer. Recupera un batch di righe non elaborate, pubblica ciascuna e la segna come completata. Tienila nello stesso binario della tua applicazione o eseguila come processo separato – entrambi funzionano, ma lo stesso binario è più semplice da gestire.

type OutboxRelay struct {
    db          *sql.DB
    publisher   Publisher
    logger      *slog.Logger
    batchSize   int
    pollInterval time.Duration
    maxAttempts  int
}

func (r *OutboxRelay) Run(ctx context.Context) error {
    ticker := time.NewTicker(r.pollInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            if err := r.processBatch(ctx); err != nil {
                r.logger.Error("batch relay outbox fallito", "err", err)
            }
        }
    }
}

Il relay rispetta l’annullamento del contesto, il che lo rende facile da integrare con lo shutdown graceful. Per un trattamento dettagliato della durata del contesto e dei pattern di cancellazione, vedi Go context.Context Fatto Bene.

FOR UPDATE SKIP LOCKED: il pattern worker concorrente

La funzione processBatch usa FOR UPDATE SKIP LOCKED per gestire in modo sicuro i worker relay concorrenti:

func (r *OutboxRelay) processBatch(ctx context.Context) error {
    tx, err := r.db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("inizio tx: %w", err)
    }
    defer tx.Rollback()

    rows, err := tx.QueryContext(ctx, `
        SELECT id, aggregate_type, aggregate_id, event_type, payload
        FROM outbox_events
        WHERE processed_at IS NULL
          AND attempts < $1
        ORDER BY created_at
        LIMIT $2
        FOR UPDATE SKIP LOCKED
    `, r.maxAttempts, r.batchSize)
    if err != nil {
        return fmt.Errorf("query outbox: %w", err)
    }
    defer rows.Close()

    type row struct {
        id            string
        aggregateType string
        aggregateID   string
        eventType     string
        payload       json.RawMessage
    }

    var batch []row
    for rows.Next() {
        var e row
        if err := rows.Scan(
            &e.id, &e.aggregateType, &e.aggregateID, &e.eventType, &e.payload,
        ); err != nil {
            return fmt.Errorf("lettura riga: %w", err)
        }
        batch = append(batch, e)
    }
    if err := rows.Err(); err != nil {
        return err
    }

    for _, e := range batch {
        if err := r.publisher.Publish(ctx, e.eventType, e.aggregateID, e.payload); err != nil {
            r.logger.Error("pubblicazione fallita", "event_id", e.id, "err", err)
            if _, err := tx.ExecContext(ctx,
                `UPDATE outbox_events SET attempts = attempts + 1 WHERE id = $1`, e.id,
            ); err != nil {
                r.logger.Error("incremento tentativi fallito", "event_id", e.id, "err", err)
            }
            continue
        }

        if _, err := tx.ExecContext(ctx,
            `UPDATE outbox_events SET processed_at = NOW() WHERE id = $1`, e.id,
        ); err != nil {
            return fmt.Errorf("segna elaborato: %w", err)
        }
    }

    return tx.Commit()
}

FOR UPDATE SKIP LOCKED fa due cose. Primo, FOR UPDATE blocca le righe selezionate per la durata della transazione, impedendo ad altre transazioni di selezionarle. Secondo, SKIP LOCKED significa che se una riga è già bloccata da un’altra transazione, la query la salta invece di aspettare. Il risultato è che più worker relay possono girare in parallelo e ciascuno prenderà un sottoinsieme non sovrapposto di righe.

Senza SKIP LOCKED, un secondo worker bloccherebbe fino al commit della prima transazione prima di vedere le stesse righe – a quel punto sarebbero già segnate come completate. Con SKIP LOCKED, il secondo worker prende immediatamente righe diverse invece di aspettare, offrendoti un scaling orizzontale sicuro.

Nota la separazione scan-then-publish nel codice sopra: tutte le righe vengono scansionate in una slice prima che inizi il ciclo di publish. Questo evita di tenere aperto un cursore *sql.Rows durante le chiamate di rete al broker, il quale manterrebbe la transazione aperta più a lungo del necessario.

Idempotenza e deduplicazione

Il relay pubblica almeno una volta. Se pubblica un evento e poi crasha prima di committare l’aggiornamento processed_at, pubblicherà lo stesso evento di nuovo al riavvio. Questo è inevitabile – la consegna esattamente-una-volta tra un database e un message broker senza un coordinatore di transazioni distribuite richiede questo compromesso.

I consumatori devono essere idempotenti. L’approccio più semplice è tenere traccia degli ID eventi elaborati in una tabella processed_events:

CREATE TABLE processed_events (
    event_id   UUID PRIMARY KEY,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
func (h *OrderHandler) HandleOrderCreated(ctx context.Context, eventID string, payload []byte) error {
    // Deduplica usando l'ID evento come chiave naturale
    _, err := h.db.ExecContext(ctx, `
        INSERT INTO processed_events (event_id) VALUES ($1)
        ON CONFLICT (event_id) DO NOTHING
    `, eventID)
    if err != nil {
        return fmt.Errorf("controllo dedup: %w", err)
    }

    // Controlla se l'inserimento è effettivamente avvenuto (1 riga) o è stato un no-op (0 righe)
    // Un approccio più semplice: usa RETURNING o controlla le righe interessate
    // Se 0 righe interessate, è un duplicato -- saltalo
    ...
}

In pratica, molti team si affidano agli header di deduplicazione del broker stesso (come il campo key di Kafka per topic log-compacted, o l’header message-id di RabbitMQ) e trattano la deduplicazione a livello di database come fallback. Entrambi sono livelli validi da applicare.

Includi l’id dell’evento outbox (un UUID) nel messaggio pubblicato come chiave di deduplicazione. I consumatori possono quindi usarlo indipendentemente dal meccanismo di deduplicazione preferito.

Policy di retry e messaggi velenosi

La colonna attempts guida la policy di retry. Il relay salta le righe dove attempts >= maxAttempts e tratta quelle righe come dead letter. Un processo separato o un allarme operativo le gestisce.

Una vista semplice per le dead letter:

CREATE VIEW outbox_dead_letters AS
SELECT *
FROM outbox_events
WHERE attempts >= 5
  AND processed_at IS NULL
ORDER BY created_at;

Una buona policy di retry per produzione:

  • Imposta maxAttempts a 5-10 in base a quanto sono costosi i retry.
  • Considera il backoff esponenziale: includi una colonna retry_after e salta le righe dove retry_after > NOW().
  • Allarma se COUNT(*) FROM outbox_dead_letters supera una soglia.
  • Fornisci un percorso di retry manuale: un endpoint admin o script che resetta attempts = 0 e retry_after = NULL per righe specifiche.

I messaggi velenosi – righe che falliscono costantemente a causa di un bug nel consumer o di un mismatch dello schema – non dovrebbero bloccare i messaggi sani. Poiché il relay elabora un batch per tick e segna i fallimenti con un incremento del tentativo piuttosto che rimuoverli dalla coda, le righe sane procedono normalmente mentre quelle velenose accumulano tentativi fino a raggiungere la soglia delle dead letter.

Ordinamento degli eventi e partizionamento

La query di polling ordina per created_at, il che dà un ordinamento first-in-first-out all’interno di un batch. Per la maggior parte dei casi d’uso è sufficiente. Quando l’ordinamento stretto per entità è importante – ad esempio, assicurando che order.updated non venga mai pubblicato prima di order.created per lo stesso ordine – hai bisogno di ordinamento per aggregato.

Aggiungi aggregate_id alla clausola ORDER BY e usalo come chiave del messaggio quando pubblichi su un topic partizionato come Apache Kafka. Kafka instrada tutti i messaggi con la stessa chiave alla stessa partizione, e le partizioni sono consumate in ordine. Questo ti dà garanzie di ordinamento per aggregato senza ordinamento globale, il quale richiederebbe un’unica istanza relay.

ORDER BY aggregate_id, created_at

Per broker che non supportano l’ordinamento partizionato (come code AMQP di base), un relay a singola istanza o controlli di ordinamento a livello di applicazione nel consumer sono le alternative pratiche.

Riduci la latenza di polling con LISTEN/NOTIFY

Un intervallo di polling di un secondo significa una latenza media dell’evento di 500 millisecondi. Per la maggior parte dei carichi di lavoro va bene. Per casi in cui hai bisogno di latenza quasi zero, il meccanismo LISTEN/NOTIFY di PostgreSQL permette al relay di svegliarsi immediatamente quando viene inserito un nuovo record outbox.

Aggiungi un trigger alla tabella outbox:

CREATE OR REPLACE FUNCTION notify_outbox_insert() RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('outbox_event', NEW.id::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER outbox_insert_notify
AFTER INSERT ON outbox_events
FOR EACH ROW EXECUTE FUNCTION notify_outbox_insert();

Nel relay, ascolta sul canale e svegliati alle notifiche mantenendo comunque il fallback al polling periodico:

func (r *OutboxRelay) Run(ctx context.Context) error {
    listener := pq.NewListener(r.dsn, 10*time.Second, time.Minute, nil)
    defer listener.Close()

    if err := listener.Listen("outbox_event"); err != nil {
        return fmt.Errorf("ascolto: %w", err)
    }

    ticker := time.NewTicker(5 * time.Second) // polling fallback
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-listener.Notify:
            if err := r.processBatch(ctx); err != nil {
                r.logger.Error("batch outbox fallito (notify)", "err", err)
            }
        case <-ticker.C:
            if err := r.processBatch(ctx); err != nil {
                r.logger.Error("batch outbox fallito (poll)", "err", err)
            }
        }
    }
}

Il ticker fallback gestisce eventuali notifiche perse durante un riavvio del relay o un’interruzione di rete. Mantieni l’intervallo fallback di alcuni secondi piuttosto che millisecondi – il suo compito è il recupero, non la bassa latenza.

Osservabilità: metriche, log e allarmi

L’outbox è infrastruttura. Trattala come infrastruttura e strumentala di conseguenza.

Metriche chiave:

var (
    outboxPublished = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "outbox_events_published_total",
        Help: "Totale eventi outbox pubblicati con successo.",
    })
    outboxFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "outbox_events_failed_total",
        Help: "Totale fallimenti di pubblicazione outbox per tipo di evento.",
    }, []string{"event_type"})
    outboxPending = prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "outbox_events_pending",
        Help: "Numero corrente di eventi outbox non elaborati.",
    })
    outboxBatchDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
        Name:    "outbox_batch_duration_seconds",
        Help:    "Durata di ogni batch di elaborazione outbox.",
        Buckets: prometheus.DefBuckets,
    })
)

Aggiornamento Gauge: esegui una query periodica per mantenere outbox_events_pending accurato:

SELECT COUNT(*) FROM outbox_events WHERE processed_at IS NULL;

Soglie di allarme da considerare:

  • outbox_events_pending > 1000 per più di due minuti: il relay sta rimanendo indietro o è bloccato.
  • outbox_events_pending in crescita monotona: il broker è down o il relay ha crashato.
  • Conteggio dead-letter non nullo: bug di schema o consumer da investigare.
  • outbox_batch_duration_seconds p95 > 5s: il database è lento o la dimensione del batch è troppo grande.

Campi log strutturati: includi event_id, event_type, aggregate_id e attempt in ogni riga di log dal relay. Questi campi ti permettono di correlare una pubblicazione fallita con la specifica riga outbox e il trace del consumer a valle.

Outbox vs coda diretta vs saga

Il pattern outbox non è lo strumento giusto per ogni problema di coordinamento. Ecco il confronto:

Approccio Atomicità Complessità Quando usare
Pubblicazione diretta Nessuna Bassa Accettabile perdere occasionalmente eventi
Outbox transazionale Forte Media Consegna affidabile di eventi da un singolo servizio
Pattern Saga Eventuale Alta Transazioni multi-servizio che attraversano più database
Due-phase commit Forte Molto alta Raramente pratico; evitato nella maggior parte dei sistemi distribuiti

Il pattern outbox garantisce che un singolo servizio emetta in modo affidabile eventi che riflettono i propri cambiamenti di stato. Non coordina i cambiamenti di stato tra più servizi – quello è il compito del Pattern Saga. La scelta del broker – sia RabbitMQ, SQS, o Kafka – è indipendente dal pattern outbox stesso; il relay pubblica su qualsiasi broker usi il tuo sistema.

Se stai costruendo una saga, il pattern outbox è comunque utile: ogni partecipante alla saga scrive il suo cambiamento di stato locale e il suo evento saga in una transazione usando l’outbox, poi l’orchestratore o la coreografia della saga legge quegli eventi in modo affidabile.

WAL-based CDC come relay alternativo

Invece di effettuare polling, puoi fare tailing sul Write-Ahead Log (WAL) di PostgreSQL e leggere gli inserimenti outbox direttamente dallo stream di replicazione. Strumenti come Debezium fanno questo. I vantaggi sono latenza più bassa e nessuna pressione di lock sulla tabella outbox. Gli svantaggi sono complessità operativa, uno slot di replicazione PostgreSQL dedicato e un servizio esterno da eseguire e monitorare.

Per la maggior parte dei team, il relay di polling descritto sopra è il punto di partenza giusto. Il tailing WAL ha senso quando hai tassi di inserimento outbox alti (decine di migliaia al secondo), hai bisogno di latenza evento sub-100ms, o stai già eseguendo Debezium per altri bisogni di change-capture.

Integrazione sqlc

Se usi sqlc per codice database Go type-safe, le query outbox si adattano naturalmente:

-- name: InsertOutboxEvent :exec
INSERT INTO outbox_events (aggregate_type, aggregate_id, event_type, payload)
VALUES (@aggregate_type, @aggregate_id, @event_type, @payload);

-- name: FetchOutboxBatch :many
SELECT id, aggregate_type, aggregate_id, event_type, payload
FROM outbox_events
WHERE processed_at IS NULL
  AND attempts < @max_attempts
ORDER BY created_at
LIMIT @batch_size
FOR UPDATE SKIP LOCKED;

-- name: MarkOutboxProcessed :exec
UPDATE outbox_events SET processed_at = NOW() WHERE id = @id;

-- name: IncrementOutboxAttempts :exec
UPDATE outbox_events SET attempts = attempts + 1 WHERE id = @id;

-- name: OutboxPendingCount :one
SELECT COUNT(*) FROM outbox_events WHERE processed_at IS NULL;

sqlc genera funzioni type-safe per ogni query, il che evita errori di interpolazione stringa e mantiene la logica delle query outbox co-localizzata con il resto del tuo livello di accesso al database.

Checklist per la produzione

Usa questa prima di rilasciare un’implementazione outbox:

Database

  • La tabella outbox ha l’indice parziale su created_at WHERE processed_at IS NULL
  • Colonna attempts presente con default 0
  • Vista o query per le dead letter definita
  • Le righe elaborate vecchie vengono archiviate o eliminate periodicamente (un job di pulizia notturna è sufficiente)

Relay

  • FOR UPDATE SKIP LOCKED usato nella query di polling
  • Il relay gira dentro una transazione (begin prima della query, commit dopo tutti gli update)
  • La dimensione del batch è limitata (50-200 righe è tipico)
  • Il relay rispetta l’annullamento del contesto per shutdown graceful
  • Le pubblicazioni fallite incrementano attempts invece di causare l’abort del batch

Idempotenza

  • Il messaggio pubblicato include l’id outbox come chiave di deduplicazione
  • I consumatori sono idempotenti o il broker fornisce deduplicazione
  • Vedi Idempotenza nei Sistemi Distribuiti per pattern di deduplicazione

Osservabilità

  • Il gauge outbox_events_pending è monitorato e allarmato
  • Il conteggio delle dead letter è allarmato
  • La durata del batch relay è tracciata
  • I log strutturati includono event_id, event_type e aggregate_id

Operazioni

  • Esiste un percorso di retry manuale per le righe dead letter
  • Il comportamento di riavvio del relay è testato (ripubblica correttamente?)
  • Il comportamento in caso di outage del broker è testato (l’outbox cresce e si svuota correttamente?)

Pensieri finali

Il problema della scrittura doppia è facile da liquidare come un caso limite finché non causa un incidente. Il pattern outbox transazionale lo risolve con strumenti che già hai: una transazione PostgreSQL, una goroutine di background e una tabella in più. Il relay è semplice da costruire, semplice da gestire e semplice da analizzare.

Il costo è che i consumatori devono essere progettati per la consegna almeno-una-volta. È un compromesso ragionevole. La consegna esattamente-una-volta tra un database e un broker senza transazioni distribuite non è realizzabile nella pratica – e fingere il contrario porta a sistemi che silenziosamente scartano o processano due volte eventi in condizioni di fallimento.

Scrivi l’evento con i dati. Inoltrailo in modo affidabile. Rendi i consumatori idempotenti. Questo è l’intero pattern.

Questo articolo fa parte del cluster Architettura App in Produzione.

Fonti

Iscriviti

Ricevi nuovi articoli su sistemi, infrastruttura e ingegneria AI.