Pattern Transactional Outbox in Go con PostgreSQL
Scrivi l'evento con i dati. Non separarli mai.
Due scritture che dovrebbero riuscire insieme finiscono per fallire separatamente.
Il tuo servizio ordini salva l’ordine nel database, quindi pubblica un evento order.created su un message broker.
Queste due operazioni vengono eseguite una dopo l’altra.
Nel frattempo, qualcosa può andare storto: il broker è down, la rete scade (timeout), il processo si riavvia o il container viene espulso. La scrittura nel database è andata a buon fine. La pubblicazione no. Il servizio a valle che deve sapere del nuovo ordine non viene mai informato. Nessuno se ne è accorto fino a quando un cliente ha chiamato.
Questo è il problema della scrittura doppia (dual-write), ed è una delle fonti più comuni di perdita di dati silente nei sistemi distribuiti. Il pattern dell’outbox transazionale è la soluzione standard.

Il problema della scrittura doppia
Il modalità di fallimento è facile da analizzare una volta che lo si vede:
BEGIN;
INSERT INTO orders ... -- ha successo
COMMIT;
PUBLISH order.created ... -- fallisce, crasha o non viene mai raggiunto
Il database e il message broker non condividono un confine di transazione. Non esiste un rollback che copra entrambi. Ogni servizio che esegue save -> publish in sequenza ha questa lacuna. Il pattern si presenta in molte forme:
db.Save(order)seguito daevents.Publish(OrderCreated{...})- Handler HTTP che committa una transazione e poi chiama un webhook esterno
- Worker che elabora un record da una coda e scrive i risultati in un’altra
L’esito in tutti i casi è lo stesso: un lato ha successo mentre l’altro fallisce, e il sistema finisce in uno stato invisibile al monitoraggio perché entrambe le operazioni individuali hanno restituito successo in qualche momento.
Un ciclo di retry non risolve questo problema. Riprovare la pubblicazione dopo il commit del database funziona solo se il retry stesso è affidabile – il che richiede esattamente la garanzia di durabilità che non hai.
Cosa fa il pattern outbox transazionale
Il pattern outbox elimina il divario rimuovendo del tutto la pubblicazione diretta. Invece di chiamare il broker all’interno della tua logica di business, scrivi un record evento in una tabella outbox nella stessa transazione del database dei dati aziendali. Un processo di background separato – il relay – legge dalla tabella outbox e pubblica sul broker.
BEGIN;
INSERT INTO orders ... -- dati aziendali
INSERT INTO outbox_events ... -- record evento
COMMIT;
-- Processo Relay (separatamente):
SELECT ... FROM outbox_events FOR UPDATE SKIP LOCKED;
PUBLISH order.created ...
UPDATE outbox_events SET processed_at = NOW() WHERE id = $1;
Entrambe le scritture hanno successo o entrambe falliscono. La garanzia di transazione che hai già da PostgreSQL ora copre anche il record evento. Il relay può riprovare a pubblicare quante volte necessario perché l’evento risiede in storage durabile. Se il relay crasha a metà, si riavvia e riprova. Il peggior esito è che l’evento viene pubblicato più di una volta – il che è gestito rendendo i consumatori idempotenti (vedi Idempotenza nei Sistemi Distribuiti).
Schema PostgreSQL per la tabella outbox
Lo schema è intenzionalmente semplice:
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(100) NOT NULL,
aggregate_id VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
attempts INT NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
processed_at TIMESTAMPTZ
);
-- Indice parziale: indossa solo le righe non elaborate, rimane piccolo man mano che le righe vengono segnate come completate
CREATE INDEX idx_outbox_unprocessed
ON outbox_events (created_at)
WHERE processed_at IS NULL;
L’indice parziale su created_at WHERE processed_at IS NULL è importante. Senza di esso, l’indice cresce con ogni evento mai scritto e la query di polling del relay diventa più lenta nel tempo. Con esso, l’indice copre solo le righe in attesa, che in stato stazionario sono un insieme piccolo e limitato, indipendentemente da quanti eventi sono stati pubblicati.
Scelte chiave dei campi:
aggregate_typeeaggregate_iddescrivono a quale entità appartiene l’evento. Utili per garanzie di ordinamento e instradamento.event_typeè il nome dell’evento che i consumatori si aspettano.payload JSONBmemorizza il corpo dell’evento. UsaJSONBpiuttosto cheTEXTcosì puoi interrogarlo se necessario.attemptstiene traccia di quante volte il relay ha provato a pubblicare questa riga. Usato per limiti di retry e gestione delle dead-letter.processed_atèNULLper le righe in attesa e viene impostato quando il relay pubblica con successo.
Scrittura di dati aziendali e evento outbox in una transazione
La logica di business scrive entrambi i record all’interno di una singola chiamata BeginTx / Commit. Non c’è chiamata di publish qui – solo scritture nel database.
type OrderService struct {
db *sql.DB
}
func (s *OrderService) CreateOrder(ctx context.Context, order Order) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("inizio tx: %w", err)
}
defer tx.Rollback()
if _, err := tx.ExecContext(ctx, `
INSERT INTO orders (id, customer_id, total, created_at)
VALUES ($1, $2, $3, NOW())
`, order.ID, order.CustomerID, order.Total); err != nil {
return fmt.Errorf("inserimento ordine: %w", err)
}
payload, err := json.Marshal(map[string]any{
"order_id": order.ID,
"customer_id": order.CustomerID,
"total": order.Total,
})
if err != nil {
return fmt.Errorf("serializzazione payload: %w", err)
}
if _, err := tx.ExecContext(ctx, `
INSERT INTO outbox_events
(aggregate_type, aggregate_id, event_type, payload)
VALUES ($1, $2, $3, $4)
`, "order", order.ID, "order.created", payload); err != nil {
return fmt.Errorf("inserimento evento outbox: %w", err)
}
return tx.Commit()
}
Se tx.Commit() fallisce, né la riga dell’ordine né quella dell’outbox vengono persistite. Se ha successo, entrambi sono garantiti nel database. Il relay può pubblicare l’evento in qualsiasi momento dopo questo – immediatamente, in un secondo, o dopo che il relay si riavvia following un crash.
Questo è l’unico cambiamento di codice richiesto nel tuo livello di business. Il resto del pattern vive nel relay.
Implementazione del relay in Go
Il relay è un worker di background che effettua polling sulla tabella outbox su un timer. Recupera un batch di righe non elaborate, pubblica ciascuna e la segna come completata. Tienila nello stesso binario della tua applicazione o eseguila come processo separato – entrambi funzionano, ma lo stesso binario è più semplice da gestire.
type OutboxRelay struct {
db *sql.DB
publisher Publisher
logger *slog.Logger
batchSize int
pollInterval time.Duration
maxAttempts int
}
func (r *OutboxRelay) Run(ctx context.Context) error {
ticker := time.NewTicker(r.pollInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := r.processBatch(ctx); err != nil {
r.logger.Error("batch relay outbox fallito", "err", err)
}
}
}
}
Il relay rispetta l’annullamento del contesto, il che lo rende facile da integrare con lo shutdown graceful. Per un trattamento dettagliato della durata del contesto e dei pattern di cancellazione, vedi Go context.Context Fatto Bene.
FOR UPDATE SKIP LOCKED: il pattern worker concorrente
La funzione processBatch usa FOR UPDATE SKIP LOCKED per gestire in modo sicuro i worker relay concorrenti:
func (r *OutboxRelay) processBatch(ctx context.Context) error {
tx, err := r.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("inizio tx: %w", err)
}
defer tx.Rollback()
rows, err := tx.QueryContext(ctx, `
SELECT id, aggregate_type, aggregate_id, event_type, payload
FROM outbox_events
WHERE processed_at IS NULL
AND attempts < $1
ORDER BY created_at
LIMIT $2
FOR UPDATE SKIP LOCKED
`, r.maxAttempts, r.batchSize)
if err != nil {
return fmt.Errorf("query outbox: %w", err)
}
defer rows.Close()
type row struct {
id string
aggregateType string
aggregateID string
eventType string
payload json.RawMessage
}
var batch []row
for rows.Next() {
var e row
if err := rows.Scan(
&e.id, &e.aggregateType, &e.aggregateID, &e.eventType, &e.payload,
); err != nil {
return fmt.Errorf("lettura riga: %w", err)
}
batch = append(batch, e)
}
if err := rows.Err(); err != nil {
return err
}
for _, e := range batch {
if err := r.publisher.Publish(ctx, e.eventType, e.aggregateID, e.payload); err != nil {
r.logger.Error("pubblicazione fallita", "event_id", e.id, "err", err)
if _, err := tx.ExecContext(ctx,
`UPDATE outbox_events SET attempts = attempts + 1 WHERE id = $1`, e.id,
); err != nil {
r.logger.Error("incremento tentativi fallito", "event_id", e.id, "err", err)
}
continue
}
if _, err := tx.ExecContext(ctx,
`UPDATE outbox_events SET processed_at = NOW() WHERE id = $1`, e.id,
); err != nil {
return fmt.Errorf("segna elaborato: %w", err)
}
}
return tx.Commit()
}
FOR UPDATE SKIP LOCKED fa due cose. Primo, FOR UPDATE blocca le righe selezionate per la durata della transazione, impedendo ad altre transazioni di selezionarle. Secondo, SKIP LOCKED significa che se una riga è già bloccata da un’altra transazione, la query la salta invece di aspettare. Il risultato è che più worker relay possono girare in parallelo e ciascuno prenderà un sottoinsieme non sovrapposto di righe.
Senza SKIP LOCKED, un secondo worker bloccherebbe fino al commit della prima transazione prima di vedere le stesse righe – a quel punto sarebbero già segnate come completate. Con SKIP LOCKED, il secondo worker prende immediatamente righe diverse invece di aspettare, offrendoti un scaling orizzontale sicuro.
Nota la separazione scan-then-publish nel codice sopra: tutte le righe vengono scansionate in una slice prima che inizi il ciclo di publish. Questo evita di tenere aperto un cursore *sql.Rows durante le chiamate di rete al broker, il quale manterrebbe la transazione aperta più a lungo del necessario.
Idempotenza e deduplicazione
Il relay pubblica almeno una volta. Se pubblica un evento e poi crasha prima di committare l’aggiornamento processed_at, pubblicherà lo stesso evento di nuovo al riavvio. Questo è inevitabile – la consegna esattamente-una-volta tra un database e un message broker senza un coordinatore di transazioni distribuite richiede questo compromesso.
I consumatori devono essere idempotenti. L’approccio più semplice è tenere traccia degli ID eventi elaborati in una tabella processed_events:
CREATE TABLE processed_events (
event_id UUID PRIMARY KEY,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
func (h *OrderHandler) HandleOrderCreated(ctx context.Context, eventID string, payload []byte) error {
// Deduplica usando l'ID evento come chiave naturale
_, err := h.db.ExecContext(ctx, `
INSERT INTO processed_events (event_id) VALUES ($1)
ON CONFLICT (event_id) DO NOTHING
`, eventID)
if err != nil {
return fmt.Errorf("controllo dedup: %w", err)
}
// Controlla se l'inserimento è effettivamente avvenuto (1 riga) o è stato un no-op (0 righe)
// Un approccio più semplice: usa RETURNING o controlla le righe interessate
// Se 0 righe interessate, è un duplicato -- saltalo
...
}
In pratica, molti team si affidano agli header di deduplicazione del broker stesso (come il campo key di Kafka per topic log-compacted, o l’header message-id di RabbitMQ) e trattano la deduplicazione a livello di database come fallback. Entrambi sono livelli validi da applicare.
Includi l’id dell’evento outbox (un UUID) nel messaggio pubblicato come chiave di deduplicazione. I consumatori possono quindi usarlo indipendentemente dal meccanismo di deduplicazione preferito.
Policy di retry e messaggi velenosi
La colonna attempts guida la policy di retry. Il relay salta le righe dove attempts >= maxAttempts e tratta quelle righe come dead letter. Un processo separato o un allarme operativo le gestisce.
Una vista semplice per le dead letter:
CREATE VIEW outbox_dead_letters AS
SELECT *
FROM outbox_events
WHERE attempts >= 5
AND processed_at IS NULL
ORDER BY created_at;
Una buona policy di retry per produzione:
- Imposta
maxAttemptsa 5-10 in base a quanto sono costosi i retry. - Considera il backoff esponenziale: includi una colonna
retry_aftere salta le righe doveretry_after > NOW(). - Allarma se
COUNT(*) FROM outbox_dead_letterssupera una soglia. - Fornisci un percorso di retry manuale: un endpoint admin o script che resetta
attempts = 0eretry_after = NULLper righe specifiche.
I messaggi velenosi – righe che falliscono costantemente a causa di un bug nel consumer o di un mismatch dello schema – non dovrebbero bloccare i messaggi sani. Poiché il relay elabora un batch per tick e segna i fallimenti con un incremento del tentativo piuttosto che rimuoverli dalla coda, le righe sane procedono normalmente mentre quelle velenose accumulano tentativi fino a raggiungere la soglia delle dead letter.
Ordinamento degli eventi e partizionamento
La query di polling ordina per created_at, il che dà un ordinamento first-in-first-out all’interno di un batch. Per la maggior parte dei casi d’uso è sufficiente. Quando l’ordinamento stretto per entità è importante – ad esempio, assicurando che order.updated non venga mai pubblicato prima di order.created per lo stesso ordine – hai bisogno di ordinamento per aggregato.
Aggiungi aggregate_id alla clausola ORDER BY e usalo come chiave del messaggio quando pubblichi su un topic partizionato come Apache Kafka. Kafka instrada tutti i messaggi con la stessa chiave alla stessa partizione, e le partizioni sono consumate in ordine. Questo ti dà garanzie di ordinamento per aggregato senza ordinamento globale, il quale richiederebbe un’unica istanza relay.
ORDER BY aggregate_id, created_at
Per broker che non supportano l’ordinamento partizionato (come code AMQP di base), un relay a singola istanza o controlli di ordinamento a livello di applicazione nel consumer sono le alternative pratiche.
Riduci la latenza di polling con LISTEN/NOTIFY
Un intervallo di polling di un secondo significa una latenza media dell’evento di 500 millisecondi. Per la maggior parte dei carichi di lavoro va bene. Per casi in cui hai bisogno di latenza quasi zero, il meccanismo LISTEN/NOTIFY di PostgreSQL permette al relay di svegliarsi immediatamente quando viene inserito un nuovo record outbox.
Aggiungi un trigger alla tabella outbox:
CREATE OR REPLACE FUNCTION notify_outbox_insert() RETURNS trigger AS $$
BEGIN
PERFORM pg_notify('outbox_event', NEW.id::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER outbox_insert_notify
AFTER INSERT ON outbox_events
FOR EACH ROW EXECUTE FUNCTION notify_outbox_insert();
Nel relay, ascolta sul canale e svegliati alle notifiche mantenendo comunque il fallback al polling periodico:
func (r *OutboxRelay) Run(ctx context.Context) error {
listener := pq.NewListener(r.dsn, 10*time.Second, time.Minute, nil)
defer listener.Close()
if err := listener.Listen("outbox_event"); err != nil {
return fmt.Errorf("ascolto: %w", err)
}
ticker := time.NewTicker(5 * time.Second) // polling fallback
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-listener.Notify:
if err := r.processBatch(ctx); err != nil {
r.logger.Error("batch outbox fallito (notify)", "err", err)
}
case <-ticker.C:
if err := r.processBatch(ctx); err != nil {
r.logger.Error("batch outbox fallito (poll)", "err", err)
}
}
}
}
Il ticker fallback gestisce eventuali notifiche perse durante un riavvio del relay o un’interruzione di rete. Mantieni l’intervallo fallback di alcuni secondi piuttosto che millisecondi – il suo compito è il recupero, non la bassa latenza.
Osservabilità: metriche, log e allarmi
L’outbox è infrastruttura. Trattala come infrastruttura e strumentala di conseguenza.
Metriche chiave:
var (
outboxPublished = prometheus.NewCounter(prometheus.CounterOpts{
Name: "outbox_events_published_total",
Help: "Totale eventi outbox pubblicati con successo.",
})
outboxFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "outbox_events_failed_total",
Help: "Totale fallimenti di pubblicazione outbox per tipo di evento.",
}, []string{"event_type"})
outboxPending = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "outbox_events_pending",
Help: "Numero corrente di eventi outbox non elaborati.",
})
outboxBatchDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "outbox_batch_duration_seconds",
Help: "Durata di ogni batch di elaborazione outbox.",
Buckets: prometheus.DefBuckets,
})
)
Aggiornamento Gauge: esegui una query periodica per mantenere outbox_events_pending accurato:
SELECT COUNT(*) FROM outbox_events WHERE processed_at IS NULL;
Soglie di allarme da considerare:
outbox_events_pending > 1000per più di due minuti: il relay sta rimanendo indietro o è bloccato.outbox_events_pendingin crescita monotona: il broker è down o il relay ha crashato.- Conteggio dead-letter non nullo: bug di schema o consumer da investigare.
outbox_batch_duration_seconds p95 > 5s: il database è lento o la dimensione del batch è troppo grande.
Campi log strutturati: includi event_id, event_type, aggregate_id e attempt in ogni riga di log dal relay. Questi campi ti permettono di correlare una pubblicazione fallita con la specifica riga outbox e il trace del consumer a valle.
Outbox vs coda diretta vs saga
Il pattern outbox non è lo strumento giusto per ogni problema di coordinamento. Ecco il confronto:
| Approccio | Atomicità | Complessità | Quando usare |
|---|---|---|---|
| Pubblicazione diretta | Nessuna | Bassa | Accettabile perdere occasionalmente eventi |
| Outbox transazionale | Forte | Media | Consegna affidabile di eventi da un singolo servizio |
| Pattern Saga | Eventuale | Alta | Transazioni multi-servizio che attraversano più database |
| Due-phase commit | Forte | Molto alta | Raramente pratico; evitato nella maggior parte dei sistemi distribuiti |
Il pattern outbox garantisce che un singolo servizio emetta in modo affidabile eventi che riflettono i propri cambiamenti di stato. Non coordina i cambiamenti di stato tra più servizi – quello è il compito del Pattern Saga. La scelta del broker – sia RabbitMQ, SQS, o Kafka – è indipendente dal pattern outbox stesso; il relay pubblica su qualsiasi broker usi il tuo sistema.
Se stai costruendo una saga, il pattern outbox è comunque utile: ogni partecipante alla saga scrive il suo cambiamento di stato locale e il suo evento saga in una transazione usando l’outbox, poi l’orchestratore o la coreografia della saga legge quegli eventi in modo affidabile.
WAL-based CDC come relay alternativo
Invece di effettuare polling, puoi fare tailing sul Write-Ahead Log (WAL) di PostgreSQL e leggere gli inserimenti outbox direttamente dallo stream di replicazione. Strumenti come Debezium fanno questo. I vantaggi sono latenza più bassa e nessuna pressione di lock sulla tabella outbox. Gli svantaggi sono complessità operativa, uno slot di replicazione PostgreSQL dedicato e un servizio esterno da eseguire e monitorare.
Per la maggior parte dei team, il relay di polling descritto sopra è il punto di partenza giusto. Il tailing WAL ha senso quando hai tassi di inserimento outbox alti (decine di migliaia al secondo), hai bisogno di latenza evento sub-100ms, o stai già eseguendo Debezium per altri bisogni di change-capture.
Integrazione sqlc
Se usi sqlc per codice database Go type-safe, le query outbox si adattano naturalmente:
-- name: InsertOutboxEvent :exec
INSERT INTO outbox_events (aggregate_type, aggregate_id, event_type, payload)
VALUES (@aggregate_type, @aggregate_id, @event_type, @payload);
-- name: FetchOutboxBatch :many
SELECT id, aggregate_type, aggregate_id, event_type, payload
FROM outbox_events
WHERE processed_at IS NULL
AND attempts < @max_attempts
ORDER BY created_at
LIMIT @batch_size
FOR UPDATE SKIP LOCKED;
-- name: MarkOutboxProcessed :exec
UPDATE outbox_events SET processed_at = NOW() WHERE id = @id;
-- name: IncrementOutboxAttempts :exec
UPDATE outbox_events SET attempts = attempts + 1 WHERE id = @id;
-- name: OutboxPendingCount :one
SELECT COUNT(*) FROM outbox_events WHERE processed_at IS NULL;
sqlc genera funzioni type-safe per ogni query, il che evita errori di interpolazione stringa e mantiene la logica delle query outbox co-localizzata con il resto del tuo livello di accesso al database.
Checklist per la produzione
Usa questa prima di rilasciare un’implementazione outbox:
Database
- La tabella outbox ha l’indice parziale su
created_at WHERE processed_at IS NULL - Colonna
attemptspresente con default 0 - Vista o query per le dead letter definita
- Le righe elaborate vecchie vengono archiviate o eliminate periodicamente (un job di pulizia notturna è sufficiente)
Relay
-
FOR UPDATE SKIP LOCKEDusato nella query di polling - Il relay gira dentro una transazione (begin prima della query, commit dopo tutti gli update)
- La dimensione del batch è limitata (50-200 righe è tipico)
- Il relay rispetta l’annullamento del contesto per shutdown graceful
- Le pubblicazioni fallite incrementano
attemptsinvece di causare l’abort del batch
Idempotenza
- Il messaggio pubblicato include l’
idoutbox come chiave di deduplicazione - I consumatori sono idempotenti o il broker fornisce deduplicazione
- Vedi Idempotenza nei Sistemi Distribuiti per pattern di deduplicazione
Osservabilità
- Il gauge
outbox_events_pendingè monitorato e allarmato - Il conteggio delle dead letter è allarmato
- La durata del batch relay è tracciata
- I log strutturati includono
event_id,event_typeeaggregate_id
Operazioni
- Esiste un percorso di retry manuale per le righe dead letter
- Il comportamento di riavvio del relay è testato (ripubblica correttamente?)
- Il comportamento in caso di outage del broker è testato (l’outbox cresce e si svuota correttamente?)
Pensieri finali
Il problema della scrittura doppia è facile da liquidare come un caso limite finché non causa un incidente. Il pattern outbox transazionale lo risolve con strumenti che già hai: una transazione PostgreSQL, una goroutine di background e una tabella in più. Il relay è semplice da costruire, semplice da gestire e semplice da analizzare.
Il costo è che i consumatori devono essere progettati per la consegna almeno-una-volta. È un compromesso ragionevole. La consegna esattamente-una-volta tra un database e un broker senza transazioni distribuite non è realizzabile nella pratica – e fingere il contrario porta a sistemi che silenziosamente scartano o processano due volte eventi in condizioni di fallimento.
Scrivi l’evento con i dati. Inoltrailo in modo affidabile. Rendi i consumatori idempotenti. Questo è l’intero pattern.
Questo articolo fa parte del cluster Architettura App in Produzione.