Det transaktionella utboks mönstret i Go med PostgreSQL
Skapa händelsen med datan. Dela aldrig upp dem.
Två skrivningar som bör lyckas tillsammans, kommer med största sannolikhet att misslyckas separat.
Din orderservice sparar ordern i databasen och publicerar sedan ett order.created-händelsemeddelande till en meddelandebroker.
Dessa två operationer körs efter varandra.
Mellan dem kan något gå fel: brokern är nere, nätverket tidsgränsas, processen startas om eller behållaren evikteras. Databasskrivningen lyckades. Publikationen gjorde det inte. Den nedströms placerade service som behöver veta om den nya ordern får aldrig veta. Ingen märkte av det förrän en kund ringde.
Detta är dual-write-problemet, och det är en av de vanligaste källorna till tyst datatapp i distribuerade system. Det transaktionella utlåsingsmönstret (transactional outbox pattern) är den etablerade lösningen.

Dual-write-problemet
Misslyckandemodet är lätt att förstå när man väl ser det:
BEGIN;
INSERT INTO orders ... -- lyckas
COMMIT;
PUBLISH order.created ... -- misslyckas, kraschar eller nås aldrig
Databasen och meddelandebrokern delar inte en transaktionsgräns. Det finns ingen rollback som täcker båda. Varje service som utför save -> publish i sekvens har detta gap. Mönstret dyker upp i många former:
db.Save(order)följd avevents.Publish(OrderCreated{...})- HTTP-handlare som bekräftar en transaktion och sedan anropar en extern webhook
- Worker som bearbetar en post från en kö och skriver resultat till en annan
Resultatet är detsamma i alla fall: den ena sidan lyckas medan den andra misslyckas, och systemet hamnar i ett tillstånd som är osynligt för övervakning eftersom båda individuella operationerna returnerade framgång vid något tillfälle.
En återförsöksloop löser inte detta. Att försöka publicera igen efter databasbekräftelsen fungerar bara om återförsöket självt är pålitligt – vilket kräver exakt den uthållighetsgaranti du inte har.
Vad det transaktionella utlåsingsmönstret gör
Utlåsingsmönstret (outbox pattern) eliminerar gapet genom att helt ta bort den direkta publikationen. Istället för att anropa brokern från din affärslogik, skriver du en händelsepost i en outbox-tabell i samma databastransaktion som affärsdatan. En separat bakgrundsprocess – relayen – läser från utlåsningstabellen och publicerar till brokern.
BEGIN;
INSERT INTO orders ... -- affärsdata
INSERT INTO outbox_events ... -- händelsepost
COMMIT;
-- Relayprocess (separat):
SELECT ... FROM outbox_events FOR UPDATE SKIP LOCKED;
PUBLISH order.created ...
UPDATE outbox_events SET processed_at = NOW() WHERE id = $1;
Båda skrivningarna lyckas eller båda misslyckas. Transaktionsgarantin du redan har från PostgreSQL täcker nu även händelseposten. Relayen kan försöka publicera så många gånger som nödvändigt eftersom händelsen finns i uthållig lagring. Om relayen kraschar under körningen startar den om och försöker igen. Den värsta utkomsten är att händelsen publiceras mer än en gång – vilket hanteras genom att göra konsumenter idempotenta (se Idempotens i distribuerade system).
PostgreSQL-scheman för utlåsningstabellen
Schemat är avsiktligt enkelt:
CREATE TABLE outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(100) NOT NULL,
aggregate_id VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
attempts INT NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
processed_at TIMESTAMPTZ
);
-- Partiell index: indexerar endast obearbetade rader, håller sig liten när rader markeras som klara
CREATE INDEX idx_outbox_unprocessed
ON outbox_events (created_at)
WHERE processed_at IS NULL;
Den partiella indexeringen på created_at WHERE processed_at IS NULL är viktig. Utan den växer indexet med varje händelse som någonsin skrivits och relayens pollfråga blir långsammare över tid. Med den täcker indexet endast de väntande raderna, vilket i ett stabilt tillstånd är en liten, begränsad mängd oavsett hur många händelser som har publicerats.
Viktiga val av fält:
aggregate_typeochaggregate_idbeskriver vilken entitet händelsen tillhör. Användbart för ordningsgarantier och routing.event_typeär händelsenamnet dina konsumenter förväntar sig.payload JSONBlagrar händelsekroppen. AnvändJSONBsnarare änTEXTså att du kan fråga om den vid behov.attemptshåller reda på hur många gånger relayen har försökt publicera denna rad. Används för återförsöksgränser och hantering av dead letters.processed_atärNULLför väntande rader och sätts när relayen har publicerat framgångsrikt.
Skrivning av affärsdata och utlåsningshändelse i en transaktion
Affärslogiken skriver båda posterna inuti ett enda BeginTx/Commit-anrop. Det finns inget publikationsanrop här – endast databasskrivningar.
type OrderService struct {
db *sql.DB
}
func (s *OrderService) CreateOrder(ctx context.Context, order Order) error {
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback()
if _, err := tx.ExecContext(ctx, `
INSERT INTO orders (id, customer_id, total, created_at)
VALUES ($1, $2, $3, NOW())
`, order.ID, order.CustomerID, order.Total); err != nil {
return fmt.Errorf("insert order: %w", err)
}
payload, err := json.Marshal(map[string]any{
"order_id": order.ID,
"customer_id": order.CustomerID,
"total": order.Total,
})
if err != nil {
return fmt.Errorf("marshal payload: %w", err)
}
if _, err := tx.ExecContext(ctx, `
INSERT INTO outbox_events
(aggregate_type, aggregate_id, event_type, payload)
VALUES ($1, $2, $3, $4)
`, "order", order.ID, "order.created", payload); err != nil {
return fmt.Errorf("insert outbox event: %w", err)
}
return tx.Commit()
}
Om tx.Commit() misslyckas, varken ordern eller utlåsningposten sparas. Om den lyckas, garanteras att båda finns i databasen. Relayen kan publicera händelsen när som helst efter det – omedelbart, efter en sekund, eller efter att relayen startar om efter en krasch.
Detta är den enda kodändring som krävs i din affärslag. Resten av mönstret finns i relayen.
Go-relayimplementation
Relayen är en bakgrundsarbetare som pollar utlåsningstabellen på en timer. Den hämtar en grupp av obearbetade rader, publicerar var och en och markerar den som klar. Håll den i samma binär som din applikation eller kör den som en separat process – båda fungerar, men samma binär är enklare att drifta.
type OutboxRelay struct {
db *sql.DB
publisher Publisher
logger *slog.Logger
batchSize int
pollInterval time.Duration
maxAttempts int
}
func (r *OutboxRelay) Run(ctx context.Context) error {
ticker := time.NewTicker(r.pollInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := r.processBatch(ctx); err != nil {
r.logger.Error("outbox relay batch failed", "err", err)
}
}
}
}
Relayen respekterar kontextavbrott, vilket gör det enkelt att integrera med graciös avstängning. För en detaljerad genomgång av kontextlivslängd och avbrottsmönster, se Go context.Context Done Right.
FOR UPDATE SKIP LOCKED: det konkurrenta worker-mönstret
Funktionen processBatch använder FOR UPDATE SKIP LOCKED för att säkert hantera parallella relay-workers:
func (r *OutboxRelay) processBatch(ctx context.Context) error {
tx, err := r.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback()
rows, err := tx.QueryContext(ctx, `
SELECT id, aggregate_type, aggregate_id, event_type, payload
FROM outbox_events
WHERE processed_at IS NULL
AND attempts < $1
ORDER BY created_at
LIMIT $2
FOR UPDATE SKIP LOCKED
`, r.maxAttempts, r.batchSize)
if err != nil {
return fmt.Errorf("query outbox: %w", err)
}
defer rows.Close()
type row struct {
id string
aggregateType string
aggregateID string
eventType string
payload json.RawMessage
}
var batch []row
for rows.Next() {
var e row
if err := rows.Scan(
&e.id, &e.aggregateType, &e.aggregateID, &e.eventType, &e.payload,
); err != nil {
return fmt.Errorf("scan row: %w", err)
}
batch = append(batch, e)
}
if err := rows.Err(); err != nil {
return err
}
for _, e := range batch {
if err := r.publisher.Publish(ctx, e.eventType, e.aggregateID, e.payload); err != nil {
r.logger.Error("publish failed", "event_id", e.id, "err", err)
if _, err := tx.ExecContext(ctx,
`UPDATE outbox_events SET attempts = attempts + 1 WHERE id = $1`, e.id,
); err != nil {
r.logger.Error("increment attempts failed", "event_id", e.id, "err", err)
}
continue
}
if _, err := tx.ExecContext(ctx,
`UPDATE outbox_events SET processed_at = NOW() WHERE id = $1`, e.id,
); err != nil {
return fmt.Errorf("mark processed: %w", err)
}
}
return tx.Commit()
}
FOR UPDATE SKIP LOCKED gör två saker. Först, FOR UPDATE låser de valda raderna under transaktionens varaktighet, vilket förhindrar att någon annan transaktion väljer dem. För det andra, SKIP LOCKED betyder att om en rad redan är låst av en annan transaktion, hoppas frågan över den istället för att vänta. Resultatet är att flera relay-workers kan köras parallellt och varje worker plockar upp en icke-överlappande delmängd av rader.
Utan SKIP LOCKED skulle en andra worker blockeras tills den första transaktionen bekräftas innan den ser samma rader – vid vilket tillfälle de redan skulle vara markerade som klara. Med SKIP LOCKED plockar den andra workern omedelbart upp olika rader istället för att vänta, vilket ger dig säker horisontell skalning.
Observera separationen mellan skanning och publicering i koden ovan: alla rader skannas in i en slice innan publiceringsloopen startar. Detta undviker att hålla ett öppet *sql.Rows-cursor över nätverksanrop till brokern, vilket skulle hålla transaktionen öppen längre än nödvändigt.
Idempotens och avdubling
Relayen publicerar minst en gång. Om den publicerar en händelse och sedan kraschar innan den bekräftar uppdateringen av processed_at, kommer den att publicera samma händelse igen vid start. Detta är oundvikligt – exakt-ens-gång-leverans över en databas och en meddelandebroker utan en distribuerad transaktionskoordinator kräver denna avvägning.
Konsumenter måste vara idempotenta. Det enklaste tillvägagångssättet är att spåra bearbetade händelse-ID:n i en processed_events-tabell:
CREATE TABLE processed_events (
event_id UUID PRIMARY KEY,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
func (h *OrderHandler) HandleOrderCreated(ctx context.Context, eventID string, payload []byte) error {
// Avdubling med händelse-ID som nyckel
_, err := h.db.ExecContext(ctx, `
INSERT INTO processed_events (event_id) VALUES ($1)
ON CONFLICT (event_id) DO NOTHING
`, eventID)
if err != nil {
return fmt.Errorf("dedup check: %w", err)
}
// Kontrollera om infogningen faktiskt skedde (1 rad) eller var en no-op (0 rader)
// Ett enklare tillvägagångssätt: använd RETURNING eller kontrollera påverkade rader
// Om 0 rader påverkade, är detta en duplikat – hoppa över den
...
}
I praktiken förlitar sig många team på brokerns egna avdublingshuvuden (såsom Kafkas key-fält för log-kompakterade ämnen, eller RabbitMQ:s message-id-huvud) och behandlar databasnivåavdubling som en fallback. Båda är giltiga lager att tillämpa.
Inkludera utlåsningshändelsens id (en UUID) i det publicerade meddelandet som avdublingsnyckel. Konsumenter kan sedan använda den oavsett vilken avdublingsmekanism de föredrar.
Återförsökspolicy och giftmeddelanden
Kolumnen attempts styr återförsökspolicyn. Relayen hoppar över rader där attempts >= maxAttempts och behandlar dessa rader som dead letters. En separat process eller operatörsvarning hanterar dem.
En enkel dead-letter-vy:
CREATE VIEW outbox_dead_letters AS
SELECT *
FROM outbox_events
WHERE attempts >= 5
AND processed_at IS NULL
ORDER BY created_at;
En bra produktionsåterförsökspolicy:
- Sätt
maxAttemptstill 5-10 beroende på hur kostsamma återförsök är. - Överväg exponentiell backoff: inkludera en
retry_after-kolumn och hoppa över rader därretry_after > NOW(). - Varning vid
COUNT(*) FROM outbox_dead_letterssom överskrider en tröskel. - Erbjud en manuell återförsökspår: en admin-slutpunkt eller skript som återställer
attempts = 0ochretry_after = NULLför specifika rader.
Giftmeddelanden – rader som konsekvent misslyckas på grund av ett bug i konsumenten eller en schemakrock – bör inte blockera friska meddelanden. Eftersom relayen bearbetar en grupp per tick och markerar misslyckanden med en ökning av försök snarare än att ta bort dem från kön, fortsätter friska rader normalt medan förgiftade ackumulerar försök tills de når dead-letter-tröskeln.
Händelseordning och partitionering
Pollfrågan ordnas efter created_at, vilket ger först-in-först-ut-ordning inom en grupp. För de flesta användningsområden räcker detta. När strikt per-entitet-ordning är viktig – till exempel för att säkerställa att order.updated aldrig publiceras före order.created för samma order – behöver du per-aggretat-ordning.
Lägg till aggregate_id i ORDER BY-klausulen och använd den som meddelandenyckel vid publicering till ett partitionerat ämne som Apache Kafka. Kafka ruttar alla meddelanden med samma nyckel till samma partition, och partitioner konsumeras i ordning. Detta ger dig per-aggretat-ordningsgarantier utan global ordning, vilket skulle kräva en enda relayinstans.
ORDER BY aggregate_id, created_at
För broker som inte stöder partitionerad ordning (såsom grundläggande AMQP-köer), är single-instance relay eller applikationsnivåordningskontroller i konsumenten de praktiska alternativen.
Minska pollningsslutlägenhet med LISTEN/NOTIFY
En pollintervall på en sekund betyder genomsnittlig händenselatens på 500 millisekunder. För de flesta arbetsbelastningar är detta bra. För fall där du behöver nära noll latens, låter PostgreSQLs LISTEN/NOTIFY-mekanism relayen vakna omedelbart när en ny utlåsningssrad infogas.
Lägg till en trigger på utlåsningstabellen:
CREATE OR REPLACE FUNCTION notify_outbox_insert() RETURNS trigger AS $$
BEGIN
PERFORM pg_notify('outbox_event', NEW.id::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER outbox_insert_notify
AFTER INSERT ON outbox_events
FOR EACH ROW EXECUTE FUNCTION notify_outbox_insert();
I relayen, lyssna på kanalen och vakna vid aviseringar medan du fortfarande faller tillbaka till periodisk pollning:
func (r *OutboxRelay) Run(ctx context.Context) error {
listener := pq.NewListener(r.dsn, 10*time.Second, time.Minute, nil)
defer listener.Close()
if err := listener.Listen("outbox_event"); err != nil {
return fmt.Errorf("listen: %w", err)
}
ticker := time.NewTicker(5 * time.Second) // fallback poll
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-listener.Notify:
if err := r.processBatch(ctx); err != nil {
r.logger.Error("outbox batch failed (notify)", "err", err)
}
case <-ticker.C:
if err := r.processBatch(ctx); err != nil {
r.logger.Error("outbox batch failed (poll)", "err", err)
}
}
}
}
Fallback-tickern hanterar aviseringar som missades under en relayomstart eller nätverksstörning. Håll fallback-intervallet på några sekunder snarare än millisekunder – dess jobb är återhämtning, inte låg latens.
Observerbarhet: metriker, loggar och varningar
Utlåsningen är infrastruktur. Behandla den som infrastruktur och instrumentera den därefter.
Viktiga metriker:
var (
outboxPublished = prometheus.NewCounter(prometheus.CounterOpts{
Name: "outbox_events_published_total",
Help: "Total outbox events successfully published.",
})
outboxFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "outbox_events_failed_total",
Help: "Total outbox publish failures by event type.",
}, []string{"event_type"})
outboxPending = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "outbox_events_pending",
Help: "Current number of unprocessed outbox events.",
})
outboxBatchDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "outbox_batch_duration_seconds",
Help: "Duration of each outbox processing batch.",
Buckets: prometheus.DefBuckets,
})
)
Gauge-uppdatering: kör en periodisk fråga för att hålla outbox_events_pending korrekt:
SELECT COUNT(*) FROM outbox_events WHERE processed_at IS NULL;
Varningströsklar att överväga:
outbox_events_pending > 1000i mer än två minuter: relayen hinner inte ikapp eller är fast.outbox_events_pendingväxer monotont: brokern är nere eller relayen har kraschat.- Dead-letter-räkning är noll: schema- eller konsumentbug behöver undersökas.
outbox_batch_duration_seconds p95 > 5s: databasen är långsam eller batchstorleken är för stor.
Strukturerade loggfält: inkludera event_id, event_type, aggregate_id och attempt i varje logglinje från relayen. Dessa fält låter dig korrelera en misslyckad publikation med den specifika utlåsningssraden och den nedströms placerade konsumentens spår.
Outbox vs. direkt kö vs. saga
Utlåsingsmönstret är inte rätt verktyg för varje koordineringsproblem. Här är jämförelsen:
| Metod | Atomicitet | Komplexitet | När att använda |
|---|---|---|---|
| Direkt publikation | Ingen | Låg | Acceptabelt att ibland tappa händelser |
| Transaktionell outbox | Stark | Medel | Pålitlig händelseleverans från en enda service |
| Saga-mönstret | Eventuell | Hög | Multi-service-transaktioner som sträcker sig över flera databaser |
| Två-fas-kommitt | Stark | Mycket hög | Sällan praktiskt; undviks i de flesta distribuerade system |
Utlåsingsmönstret garanterar att en enda service pålitligt emitterar händelser som reflekterar sina egna tillståndsändringar. Det koordinerar inte tillståndsändringar över flera tjänster – det är vad Saga-mönstret är för. Valet av broker – oavsett om det är RabbitMQ, SQS eller Kafka – är oberoende av utlåsingsmönstret i sig; relayen publicerar till vilken broker ditt system använder.
Om du bygger en saga, är utlåsingsmönstret fortfarande användbart: varje deltagare i sagan skriver sin lokala tillståndsändring och sin saga-händelse i en transaktion med utlåsningen, och sedan läser saga-orchestratorn eller koreografin dessa händelser pålitligt.
WAL-baserad CDC som ett alternativt relay
Istället för pollning kan du taila PostgreSQLs Write-Ahead Log (WAL) och läsa utlåsningssinfogningar direkt från replikeringsströmmen. Verktyg som Debezium gör detta. Fördelarna är lägre latens och inget låstryck på utlåsningstabellen. Nackdelarna är driftkomplexitet, en dedikerad PostgreSQL-replikeringsplats och en extern service att köra och övervaka.
För de flesta team är den pollande relay som beskrivits ovan rätt startpunkt. WAL-tailing gör mening när du har höga utlåsningssinfogningstal (tiotusentals per sekund), behöver sub-100ms händenselatens, eller redan kör Debezium för andra change-capture-behov.
sqlc-integration
Om du använder sqlc för typsäker Go-databaskod, passar utlåsningssfrågorna naturligt:
-- name: InsertOutboxEvent :exec
INSERT INTO outbox_events (aggregate_type, aggregate_id, event_type, payload)
VALUES (@aggregate_type, @aggregate_id, @event_type, @payload);
-- name: FetchOutboxBatch :many
SELECT id, aggregate_type, aggregate_id, event_type, payload
FROM outbox_events
WHERE processed_at IS NULL
AND attempts < @max_attempts
ORDER BY created_at
LIMIT @batch_size
FOR UPDATE SKIP LOCKED;
-- name: MarkOutboxProcessed :exec
UPDATE outbox_events SET processed_at = NOW() WHERE id = @id;
-- name: IncrementOutboxAttempts :exec
UPDATE outbox_events SET attempts = attempts + 1 WHERE id = @id;
-- name: OutboxPendingCount :one
SELECT COUNT(*) FROM outbox_events WHERE processed_at IS NULL;
sqlc genererar typsäkra funktioner för varje fråga, vilket undviker stränginterpoleringsfel och håller utlåsningssfrågelogiken sammanhållen med resten av din databasåtkomstlager.
Produktionschecklista
Använd detta innan du skickar en utlåsningssimplementation:
Databas
- Utlåsningstabellen har det partiella indexet på
created_at WHERE processed_at IS NULL -
attempts-kolumnen finns med standardvärdet 0 - Dead-letter-vy eller fråga definierad
- Gamla bearbetade rader arkiveras eller raderas periodiskt (en nattlig rensningsjobb räcker)
Relay
-
FOR UPDATE SKIP LOCKEDanvänds i pollfrågan - Relayen körs inuti en transaktion (starta före fråga, bekräfta efter alla uppdateringar)
- Batchstorleken är begränsad (50-200 rader är typiskt)
- Relayen respekterar kontextavbrott för graciös avstängning
- Misslyckade publikationer ökar
attemptssnarare än att få batchen att avbrytas
Idempotens
- Publicerat meddelande inkluderar utlåsningss
idsom avdublingsnyckel - Konsumenter är idempotenta eller brokern tillhandahåller avdubling
- Se Idempotens i distribuerade system för avdublingsmönster
Observerbarhet
-
outbox_events_pendinggauge övervakas och varningar aktiveras - Dead-letter-räkning varningar aktiveras
- Relay batchduration spåras
- Strukturerade loggar inkluderar
event_id,event_typeochaggregate_id
Drift
- Manuell återförsökspår finns för dead-letter-rader
- Relay omstartbeteende är testat (publicerar den korrekt igen?)
- Broker outage-beteende är testat (vuxer utlåsningen och dräneras korrekt?)
Avslutande tankar
Dual-write-problemet är lätt att avfärda som en bristningsfall tills det orsakar en incident. Det transaktionella utlåsingsmönstret löser det med verktyg du redan har: en PostgreSQL-transaktion, en bakgrundsgoroutine och en extra tabell. Relayen är enkel att bygga, enkel att drifta och enkel att resonera kring.
Kostnaden är att konsumenter måste designas för minst-ens-gång-leverans. Det är en rimlig avvägning. Exakt-ens-gång-leverans över en databas och en broker utan distribuerade transaktioner är inte uppnåbar i praktiken – och att göra annat leder till system som tyst droppar eller dubbelbearbetar händelser vid misslyckanden.
Skriv händelsen med datan. Relay den pålitligt. Gör konsumenter idempotenta. Det är hela mönstret.
Denna artikel är en del av App Architecture in Production-klustret.