Det transaktionella utboks mönstret i Go med PostgreSQL

Skapa händelsen med datan. Dela aldrig upp dem.

Sidinnehåll

Två skrivningar som bör lyckas tillsammans, kommer med största sannolikhet att misslyckas separat. Din orderservice sparar ordern i databasen och publicerar sedan ett order.created-händelsemeddelande till en meddelandebroker.

Dessa två operationer körs efter varandra.

Mellan dem kan något gå fel: brokern är nere, nätverket tidsgränsas, processen startas om eller behållaren evikteras. Databasskrivningen lyckades. Publikationen gjorde det inte. Den nedströms placerade service som behöver veta om den nya ordern får aldrig veta. Ingen märkte av det förrän en kund ringde.

Detta är dual-write-problemet, och det är en av de vanligaste källorna till tyst datatapp i distribuerade system. Det transaktionella utlåsingsmönstret (transactional outbox pattern) är den etablerade lösningen.

Transactional outbox pattern – event and data written together

Dual-write-problemet

Misslyckandemodet är lätt att förstå när man väl ser det:

BEGIN;
  INSERT INTO orders ...   -- lyckas
COMMIT;

PUBLISH order.created ...  -- misslyckas, kraschar eller nås aldrig

Databasen och meddelandebrokern delar inte en transaktionsgräns. Det finns ingen rollback som täcker båda. Varje service som utför save -> publish i sekvens har detta gap. Mönstret dyker upp i många former:

  • db.Save(order) följd av events.Publish(OrderCreated{...})
  • HTTP-handlare som bekräftar en transaktion och sedan anropar en extern webhook
  • Worker som bearbetar en post från en kö och skriver resultat till en annan

Resultatet är detsamma i alla fall: den ena sidan lyckas medan den andra misslyckas, och systemet hamnar i ett tillstånd som är osynligt för övervakning eftersom båda individuella operationerna returnerade framgång vid något tillfälle.

En återförsöksloop löser inte detta. Att försöka publicera igen efter databasbekräftelsen fungerar bara om återförsöket självt är pålitligt – vilket kräver exakt den uthållighetsgaranti du inte har.

Vad det transaktionella utlåsingsmönstret gör

Utlåsingsmönstret (outbox pattern) eliminerar gapet genom att helt ta bort den direkta publikationen. Istället för att anropa brokern från din affärslogik, skriver du en händelsepost i en outbox-tabell i samma databastransaktion som affärsdatan. En separat bakgrundsprocess – relayen – läser från utlåsningstabellen och publicerar till brokern.

BEGIN;
  INSERT INTO orders ...         -- affärsdata
  INSERT INTO outbox_events ...  -- händelsepost
COMMIT;

-- Relayprocess (separat):
SELECT ... FROM outbox_events FOR UPDATE SKIP LOCKED;
PUBLISH order.created ...
UPDATE outbox_events SET processed_at = NOW() WHERE id = $1;

Båda skrivningarna lyckas eller båda misslyckas. Transaktionsgarantin du redan har från PostgreSQL täcker nu även händelseposten. Relayen kan försöka publicera så många gånger som nödvändigt eftersom händelsen finns i uthållig lagring. Om relayen kraschar under körningen startar den om och försöker igen. Den värsta utkomsten är att händelsen publiceras mer än en gång – vilket hanteras genom att göra konsumenter idempotenta (se Idempotens i distribuerade system).

PostgreSQL-scheman för utlåsningstabellen

Schemat är avsiktligt enkelt:

CREATE TABLE outbox_events (
    id             UUID         PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id   VARCHAR(100) NOT NULL,
    event_type     VARCHAR(100) NOT NULL,
    payload        JSONB        NOT NULL,
    attempts       INT          NOT NULL DEFAULT 0,
    created_at     TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    processed_at   TIMESTAMPTZ
);

-- Partiell index: indexerar endast obearbetade rader, håller sig liten när rader markeras som klara
CREATE INDEX idx_outbox_unprocessed
    ON outbox_events (created_at)
    WHERE processed_at IS NULL;

Den partiella indexeringen på created_at WHERE processed_at IS NULL är viktig. Utan den växer indexet med varje händelse som någonsin skrivits och relayens pollfråga blir långsammare över tid. Med den täcker indexet endast de väntande raderna, vilket i ett stabilt tillstånd är en liten, begränsad mängd oavsett hur många händelser som har publicerats.

Viktiga val av fält:

  • aggregate_type och aggregate_id beskriver vilken entitet händelsen tillhör. Användbart för ordningsgarantier och routing.
  • event_type är händelsenamnet dina konsumenter förväntar sig.
  • payload JSONB lagrar händelsekroppen. Använd JSONB snarare än TEXT så att du kan fråga om den vid behov.
  • attempts håller reda på hur många gånger relayen har försökt publicera denna rad. Används för återförsöksgränser och hantering av dead letters.
  • processed_at är NULL för väntande rader och sätts när relayen har publicerat framgångsrikt.

Skrivning av affärsdata och utlåsningshändelse i en transaktion

Affärslogiken skriver båda posterna inuti ett enda BeginTx/Commit-anrop. Det finns inget publikationsanrop här – endast databasskrivningar.

type OrderService struct {
    db *sql.DB
}

func (s *OrderService) CreateOrder(ctx context.Context, order Order) error {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback()

    if _, err := tx.ExecContext(ctx, `
        INSERT INTO orders (id, customer_id, total, created_at)
        VALUES ($1, $2, $3, NOW())
    `, order.ID, order.CustomerID, order.Total); err != nil {
        return fmt.Errorf("insert order: %w", err)
    }

    payload, err := json.Marshal(map[string]any{
        "order_id":    order.ID,
        "customer_id": order.CustomerID,
        "total":       order.Total,
    })
    if err != nil {
        return fmt.Errorf("marshal payload: %w", err)
    }

    if _, err := tx.ExecContext(ctx, `
        INSERT INTO outbox_events
            (aggregate_type, aggregate_id, event_type, payload)
        VALUES ($1, $2, $3, $4)
    `, "order", order.ID, "order.created", payload); err != nil {
        return fmt.Errorf("insert outbox event: %w", err)
    }

    return tx.Commit()
}

Om tx.Commit() misslyckas, varken ordern eller utlåsningposten sparas. Om den lyckas, garanteras att båda finns i databasen. Relayen kan publicera händelsen när som helst efter det – omedelbart, efter en sekund, eller efter att relayen startar om efter en krasch.

Detta är den enda kodändring som krävs i din affärslag. Resten av mönstret finns i relayen.

Go-relayimplementation

Relayen är en bakgrundsarbetare som pollar utlåsningstabellen på en timer. Den hämtar en grupp av obearbetade rader, publicerar var och en och markerar den som klar. Håll den i samma binär som din applikation eller kör den som en separat process – båda fungerar, men samma binär är enklare att drifta.

type OutboxRelay struct {
    db          *sql.DB
    publisher   Publisher
    logger      *slog.Logger
    batchSize   int
    pollInterval time.Duration
    maxAttempts  int
}

func (r *OutboxRelay) Run(ctx context.Context) error {
    ticker := time.NewTicker(r.pollInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            if err := r.processBatch(ctx); err != nil {
                r.logger.Error("outbox relay batch failed", "err", err)
            }
        }
    }
}

Relayen respekterar kontextavbrott, vilket gör det enkelt att integrera med graciös avstängning. För en detaljerad genomgång av kontextlivslängd och avbrottsmönster, se Go context.Context Done Right.

FOR UPDATE SKIP LOCKED: det konkurrenta worker-mönstret

Funktionen processBatch använder FOR UPDATE SKIP LOCKED för att säkert hantera parallella relay-workers:

func (r *OutboxRelay) processBatch(ctx context.Context) error {
    tx, err := r.db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback()

    rows, err := tx.QueryContext(ctx, `
        SELECT id, aggregate_type, aggregate_id, event_type, payload
        FROM outbox_events
        WHERE processed_at IS NULL
          AND attempts < $1
        ORDER BY created_at
        LIMIT $2
        FOR UPDATE SKIP LOCKED
    `, r.maxAttempts, r.batchSize)
    if err != nil {
        return fmt.Errorf("query outbox: %w", err)
    }
    defer rows.Close()

    type row struct {
        id            string
        aggregateType string
        aggregateID   string
        eventType     string
        payload       json.RawMessage
    }

    var batch []row
    for rows.Next() {
        var e row
        if err := rows.Scan(
            &e.id, &e.aggregateType, &e.aggregateID, &e.eventType, &e.payload,
        ); err != nil {
            return fmt.Errorf("scan row: %w", err)
        }
        batch = append(batch, e)
    }
    if err := rows.Err(); err != nil {
        return err
    }

    for _, e := range batch {
        if err := r.publisher.Publish(ctx, e.eventType, e.aggregateID, e.payload); err != nil {
            r.logger.Error("publish failed", "event_id", e.id, "err", err)
            if _, err := tx.ExecContext(ctx,
                `UPDATE outbox_events SET attempts = attempts + 1 WHERE id = $1`, e.id,
            ); err != nil {
                r.logger.Error("increment attempts failed", "event_id", e.id, "err", err)
            }
            continue
        }

        if _, err := tx.ExecContext(ctx,
            `UPDATE outbox_events SET processed_at = NOW() WHERE id = $1`, e.id,
        ); err != nil {
            return fmt.Errorf("mark processed: %w", err)
        }
    }

    return tx.Commit()
}

FOR UPDATE SKIP LOCKED gör två saker. Först, FOR UPDATE låser de valda raderna under transaktionens varaktighet, vilket förhindrar att någon annan transaktion väljer dem. För det andra, SKIP LOCKED betyder att om en rad redan är låst av en annan transaktion, hoppas frågan över den istället för att vänta. Resultatet är att flera relay-workers kan köras parallellt och varje worker plockar upp en icke-överlappande delmängd av rader.

Utan SKIP LOCKED skulle en andra worker blockeras tills den första transaktionen bekräftas innan den ser samma rader – vid vilket tillfälle de redan skulle vara markerade som klara. Med SKIP LOCKED plockar den andra workern omedelbart upp olika rader istället för att vänta, vilket ger dig säker horisontell skalning.

Observera separationen mellan skanning och publicering i koden ovan: alla rader skannas in i en slice innan publiceringsloopen startar. Detta undviker att hålla ett öppet *sql.Rows-cursor över nätverksanrop till brokern, vilket skulle hålla transaktionen öppen längre än nödvändigt.

Idempotens och avdubling

Relayen publicerar minst en gång. Om den publicerar en händelse och sedan kraschar innan den bekräftar uppdateringen av processed_at, kommer den att publicera samma händelse igen vid start. Detta är oundvikligt – exakt-ens-gång-leverans över en databas och en meddelandebroker utan en distribuerad transaktionskoordinator kräver denna avvägning.

Konsumenter måste vara idempotenta. Det enklaste tillvägagångssättet är att spåra bearbetade händelse-ID:n i en processed_events-tabell:

CREATE TABLE processed_events (
    event_id   UUID PRIMARY KEY,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
func (h *OrderHandler) HandleOrderCreated(ctx context.Context, eventID string, payload []byte) error {
    // Avdubling med händelse-ID som nyckel
    _, err := h.db.ExecContext(ctx, `
        INSERT INTO processed_events (event_id) VALUES ($1)
        ON CONFLICT (event_id) DO NOTHING
    `, eventID)
    if err != nil {
        return fmt.Errorf("dedup check: %w", err)
    }

    // Kontrollera om infogningen faktiskt skedde (1 rad) eller var en no-op (0 rader)
    // Ett enklare tillvägagångssätt: använd RETURNING eller kontrollera påverkade rader
    // Om 0 rader påverkade, är detta en duplikat – hoppa över den
    ...
}

I praktiken förlitar sig många team på brokerns egna avdublingshuvuden (såsom Kafkas key-fält för log-kompakterade ämnen, eller RabbitMQ:s message-id-huvud) och behandlar databasnivåavdubling som en fallback. Båda är giltiga lager att tillämpa.

Inkludera utlåsningshändelsens id (en UUID) i det publicerade meddelandet som avdublingsnyckel. Konsumenter kan sedan använda den oavsett vilken avdublingsmekanism de föredrar.

Återförsökspolicy och giftmeddelanden

Kolumnen attempts styr återförsökspolicyn. Relayen hoppar över rader där attempts >= maxAttempts och behandlar dessa rader som dead letters. En separat process eller operatörsvarning hanterar dem.

En enkel dead-letter-vy:

CREATE VIEW outbox_dead_letters AS
SELECT *
FROM outbox_events
WHERE attempts >= 5
  AND processed_at IS NULL
ORDER BY created_at;

En bra produktionsåterförsökspolicy:

  • Sätt maxAttempts till 5-10 beroende på hur kostsamma återförsök är.
  • Överväg exponentiell backoff: inkludera en retry_after-kolumn och hoppa över rader där retry_after > NOW().
  • Varning vid COUNT(*) FROM outbox_dead_letters som överskrider en tröskel.
  • Erbjud en manuell återförsökspår: en admin-slutpunkt eller skript som återställer attempts = 0 och retry_after = NULL för specifika rader.

Giftmeddelanden – rader som konsekvent misslyckas på grund av ett bug i konsumenten eller en schemakrock – bör inte blockera friska meddelanden. Eftersom relayen bearbetar en grupp per tick och markerar misslyckanden med en ökning av försök snarare än att ta bort dem från kön, fortsätter friska rader normalt medan förgiftade ackumulerar försök tills de når dead-letter-tröskeln.

Händelseordning och partitionering

Pollfrågan ordnas efter created_at, vilket ger först-in-först-ut-ordning inom en grupp. För de flesta användningsområden räcker detta. När strikt per-entitet-ordning är viktig – till exempel för att säkerställa att order.updated aldrig publiceras före order.created för samma order – behöver du per-aggretat-ordning.

Lägg till aggregate_id i ORDER BY-klausulen och använd den som meddelandenyckel vid publicering till ett partitionerat ämne som Apache Kafka. Kafka ruttar alla meddelanden med samma nyckel till samma partition, och partitioner konsumeras i ordning. Detta ger dig per-aggretat-ordningsgarantier utan global ordning, vilket skulle kräva en enda relayinstans.

ORDER BY aggregate_id, created_at

För broker som inte stöder partitionerad ordning (såsom grundläggande AMQP-köer), är single-instance relay eller applikationsnivåordningskontroller i konsumenten de praktiska alternativen.

Minska pollningsslutlägenhet med LISTEN/NOTIFY

En pollintervall på en sekund betyder genomsnittlig händenselatens på 500 millisekunder. För de flesta arbetsbelastningar är detta bra. För fall där du behöver nära noll latens, låter PostgreSQLs LISTEN/NOTIFY-mekanism relayen vakna omedelbart när en ny utlåsningssrad infogas.

Lägg till en trigger på utlåsningstabellen:

CREATE OR REPLACE FUNCTION notify_outbox_insert() RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('outbox_event', NEW.id::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER outbox_insert_notify
AFTER INSERT ON outbox_events
FOR EACH ROW EXECUTE FUNCTION notify_outbox_insert();

I relayen, lyssna på kanalen och vakna vid aviseringar medan du fortfarande faller tillbaka till periodisk pollning:

func (r *OutboxRelay) Run(ctx context.Context) error {
    listener := pq.NewListener(r.dsn, 10*time.Second, time.Minute, nil)
    defer listener.Close()

    if err := listener.Listen("outbox_event"); err != nil {
        return fmt.Errorf("listen: %w", err)
    }

    ticker := time.NewTicker(5 * time.Second) // fallback poll
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-listener.Notify:
            if err := r.processBatch(ctx); err != nil {
                r.logger.Error("outbox batch failed (notify)", "err", err)
            }
        case <-ticker.C:
            if err := r.processBatch(ctx); err != nil {
                r.logger.Error("outbox batch failed (poll)", "err", err)
            }
        }
    }
}

Fallback-tickern hanterar aviseringar som missades under en relayomstart eller nätverksstörning. Håll fallback-intervallet på några sekunder snarare än millisekunder – dess jobb är återhämtning, inte låg latens.

Observerbarhet: metriker, loggar och varningar

Utlåsningen är infrastruktur. Behandla den som infrastruktur och instrumentera den därefter.

Viktiga metriker:

var (
    outboxPublished = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "outbox_events_published_total",
        Help: "Total outbox events successfully published.",
    })
    outboxFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "outbox_events_failed_total",
        Help: "Total outbox publish failures by event type.",
    }, []string{"event_type"})
    outboxPending = prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "outbox_events_pending",
        Help: "Current number of unprocessed outbox events.",
    })
    outboxBatchDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
        Name:    "outbox_batch_duration_seconds",
        Help:    "Duration of each outbox processing batch.",
        Buckets: prometheus.DefBuckets,
    })
)

Gauge-uppdatering: kör en periodisk fråga för att hålla outbox_events_pending korrekt:

SELECT COUNT(*) FROM outbox_events WHERE processed_at IS NULL;

Varningströsklar att överväga:

  • outbox_events_pending > 1000 i mer än två minuter: relayen hinner inte ikapp eller är fast.
  • outbox_events_pending växer monotont: brokern är nere eller relayen har kraschat.
  • Dead-letter-räkning är noll: schema- eller konsumentbug behöver undersökas.
  • outbox_batch_duration_seconds p95 > 5s: databasen är långsam eller batchstorleken är för stor.

Strukturerade loggfält: inkludera event_id, event_type, aggregate_id och attempt i varje logglinje från relayen. Dessa fält låter dig korrelera en misslyckad publikation med den specifika utlåsningssraden och den nedströms placerade konsumentens spår.

Outbox vs. direkt kö vs. saga

Utlåsingsmönstret är inte rätt verktyg för varje koordineringsproblem. Här är jämförelsen:

Metod Atomicitet Komplexitet När att använda
Direkt publikation Ingen Låg Acceptabelt att ibland tappa händelser
Transaktionell outbox Stark Medel Pålitlig händelseleverans från en enda service
Saga-mönstret Eventuell Hög Multi-service-transaktioner som sträcker sig över flera databaser
Två-fas-kommitt Stark Mycket hög Sällan praktiskt; undviks i de flesta distribuerade system

Utlåsingsmönstret garanterar att en enda service pålitligt emitterar händelser som reflekterar sina egna tillståndsändringar. Det koordinerar inte tillståndsändringar över flera tjänster – det är vad Saga-mönstret är för. Valet av broker – oavsett om det är RabbitMQ, SQS eller Kafka – är oberoende av utlåsingsmönstret i sig; relayen publicerar till vilken broker ditt system använder.

Om du bygger en saga, är utlåsingsmönstret fortfarande användbart: varje deltagare i sagan skriver sin lokala tillståndsändring och sin saga-händelse i en transaktion med utlåsningen, och sedan läser saga-orchestratorn eller koreografin dessa händelser pålitligt.

WAL-baserad CDC som ett alternativt relay

Istället för pollning kan du taila PostgreSQLs Write-Ahead Log (WAL) och läsa utlåsningssinfogningar direkt från replikeringsströmmen. Verktyg som Debezium gör detta. Fördelarna är lägre latens och inget låstryck på utlåsningstabellen. Nackdelarna är driftkomplexitet, en dedikerad PostgreSQL-replikeringsplats och en extern service att köra och övervaka.

För de flesta team är den pollande relay som beskrivits ovan rätt startpunkt. WAL-tailing gör mening när du har höga utlåsningssinfogningstal (tiotusentals per sekund), behöver sub-100ms händenselatens, eller redan kör Debezium för andra change-capture-behov.

sqlc-integration

Om du använder sqlc för typsäker Go-databaskod, passar utlåsningssfrågorna naturligt:

-- name: InsertOutboxEvent :exec
INSERT INTO outbox_events (aggregate_type, aggregate_id, event_type, payload)
VALUES (@aggregate_type, @aggregate_id, @event_type, @payload);

-- name: FetchOutboxBatch :many
SELECT id, aggregate_type, aggregate_id, event_type, payload
FROM outbox_events
WHERE processed_at IS NULL
  AND attempts < @max_attempts
ORDER BY created_at
LIMIT @batch_size
FOR UPDATE SKIP LOCKED;

-- name: MarkOutboxProcessed :exec
UPDATE outbox_events SET processed_at = NOW() WHERE id = @id;

-- name: IncrementOutboxAttempts :exec
UPDATE outbox_events SET attempts = attempts + 1 WHERE id = @id;

-- name: OutboxPendingCount :one
SELECT COUNT(*) FROM outbox_events WHERE processed_at IS NULL;

sqlc genererar typsäkra funktioner för varje fråga, vilket undviker stränginterpoleringsfel och håller utlåsningssfrågelogiken sammanhållen med resten av din databasåtkomstlager.

Produktionschecklista

Använd detta innan du skickar en utlåsningssimplementation:

Databas

  • Utlåsningstabellen har det partiella indexet på created_at WHERE processed_at IS NULL
  • attempts-kolumnen finns med standardvärdet 0
  • Dead-letter-vy eller fråga definierad
  • Gamla bearbetade rader arkiveras eller raderas periodiskt (en nattlig rensningsjobb räcker)

Relay

  • FOR UPDATE SKIP LOCKED används i pollfrågan
  • Relayen körs inuti en transaktion (starta före fråga, bekräfta efter alla uppdateringar)
  • Batchstorleken är begränsad (50-200 rader är typiskt)
  • Relayen respekterar kontextavbrott för graciös avstängning
  • Misslyckade publikationer ökar attempts snarare än att få batchen att avbrytas

Idempotens

  • Publicerat meddelande inkluderar utlåsningssid som avdublingsnyckel
  • Konsumenter är idempotenta eller brokern tillhandahåller avdubling
  • Se Idempotens i distribuerade system för avdublingsmönster

Observerbarhet

  • outbox_events_pending gauge övervakas och varningar aktiveras
  • Dead-letter-räkning varningar aktiveras
  • Relay batchduration spåras
  • Strukturerade loggar inkluderar event_id, event_type och aggregate_id

Drift

  • Manuell återförsökspår finns för dead-letter-rader
  • Relay omstartbeteende är testat (publicerar den korrekt igen?)
  • Broker outage-beteende är testat (vuxer utlåsningen och dräneras korrekt?)

Avslutande tankar

Dual-write-problemet är lätt att avfärda som en bristningsfall tills det orsakar en incident. Det transaktionella utlåsingsmönstret löser det med verktyg du redan har: en PostgreSQL-transaktion, en bakgrundsgoroutine och en extra tabell. Relayen är enkel att bygga, enkel att drifta och enkel att resonera kring.

Kostnaden är att konsumenter måste designas för minst-ens-gång-leverans. Det är en rimlig avvägning. Exakt-ens-gång-leverans över en databas och en broker utan distribuerade transaktioner är inte uppnåbar i praktiken – och att göra annat leder till system som tyst droppar eller dubbelbearbetar händelser vid misslyckanden.

Skriv händelsen med datan. Relay den pålitligt. Gör konsumenter idempotenta. Det är hela mönstret.

Denna artikel är en del av App Architecture in Production-klustret.

Källor

Prenumerera

Få nya inlägg om system, infrastruktur och AI-ingenjörskonst.