Wzorzec Outbox transakcyjny w Go z PostgreSQL

Zapisz zdarzenie z danymi. Nigdy ich nie dziel.

Page content

Dwie operacje zapisu, które powinny się udać razem, w końcu przestaną działać osobno. Twoja usługa zamówień zapisuje zamówienie do bazy danych, a następnie publikuje zdarzenie order.created w brokerze wiadomości.

Te dwie operacje wykonują się jedna po drugiej.

Pomiędzy nimi coś może pójść nie tak: broker jest niedostępny, połączenie sieciowe przekracza limit czasu, proces jest restartowany lub kontener jest usuwany. Zapis do bazy danych się udał. Publikacja nie. Usługa dółstrumieniowa, która musi wiedzieć o nowym zamówieniu, nigdy się o tym nie dowie. Nikt nie zauważył problemu, dopóki klient nie zadzwonił.

Jest to problem podwójnego zapisu (dual-write problem) i stanowi jedną z najczęstszych przyczyn cichej utraty danych w systemach rozproszonych. Standardowym rozwiązaniem jest wzorzec transakcyjnej skrzynki wychodzącej (transactional outbox pattern).

Wzorzec transakcyjnej skrzynki wychodzącej – zdarzenie i dane zapisywane razem

Problem podwójnego zapisu

Tryb awarii jest łatwy do zrozumienia, gdy go się zobaczy:

BEGIN;
  INSERT INTO orders ...   -- się udaje
COMMIT;

PUBLISH order.created ...  -- się nie udaje, crashuje lub nigdy nie zostaje wykonana

Baza danych i broker wiadomości nie współdzielą granicy transakcji. Nie ma rollbacku, który obejmowałby obie operacje. Każda usługa, która wykonuje sekwencję save -> publish, ma tę lukę. Wzorzec ten pojawia się w wielu formach:

  • db.Save(order) po którym następuje events.Publish(OrderCreated{...})
  • Obsługa HTTP, która zatwierdza transakcję, a następnie wywołuje zewnętrzny webhook
  • Worker, który przetwarza rekord z jednej kolejki i zapisuje wyniki do innej

Wynik we wszystkich przypadkach jest ten sam: jedna strona się udaje, a druga nie, a system kończy w stanie niewidocznym dla monitoringu, ponieważ obie pojedyncze operacje zwróciły sukces w pewnym momencie.

Pętla ponawiania (retry loop) tego nie naprawia. Ponawianie publikacji po zatwierdzeniu transakcji w bazie danych działa tylko wtedy, gdy samo ponawianie jest niezawodne – co wymaga dokładnie tej gwarancji trwałości, której nie posiadasz.

Co robi wzorzec transakcyjnej skrzynki wychodzącej

Wzorzec outbox eliminuje lukę, całkowicie usuwając bezpośrednią publikację. Zamiast wołać brokera z wnętrza logiki biznesowej, zapisujesz rekord zdarzenia do tabeli outbox w tej samej transakcji bazy danych co dane biznesowe. Odrębny proces w tle – relay (przekazywacz) – odczytuje z tabeli outbox i publikuje do brokera.

BEGIN;
  INSERT INTO orders ...         -- dane biznesowe
  INSERT INTO outbox_events ...  -- rekord zdarzenia
COMMIT;

-- Proces Relay (osobno):
SELECT ... FROM outbox_events FOR UPDATE SKIP LOCKED;
PUBLISH order.created ...
UPDATE outbox_events SET processed_at = NOW() WHERE id = $1;

Obie operacje zapisu muszą się udać lub obie nie muszą. Gwarancja transakcji, którą już masz z PostgreSQL, teraz obejmuje również rekord zdarzenia. Relay może ponawiać publikację tak wiele razy, jak jest potrzebne, ponieważ zdarzenie znajduje się w trwałej pamięci. Jeśli relay zawiedzie w trakcie działania, uruchomi się ponownie i spróbuje jeszcze raz. Najgorszym wynikiem jest opublikowanie zdarzenia więcej niż raz – co jest obsługiwane przez sprawienie, aby konsumenty były idempotentne (zobacz Idempotentność w systemach rozproszonych).

Schemat PostgreSQL dla tabeli outbox

Schemat jest celowo prosty:

CREATE TABLE outbox_events (
    id             UUID         PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id   VARCHAR(100) NOT NULL,
    event_type     VARCHAR(100) NOT NULL,
    payload        JSONB        NOT NULL,
    attempts       INT          NOT NULL DEFAULT 0,
    created_at     TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    processed_at   TIMESTAMPTZ
);

-- Indeks cząstkowy: indeksuje tylko nieprzetworzone wiersze, pozostaje mały, gdy wiersze są oznaczone jako zakończone
CREATE INDEX idx_outbox_unprocessed
    ON outbox_events (created_at)
    WHERE processed_at IS NULL;

Cząstkowy indeks na created_at WHERE processed_at IS NULL jest ważny. Bez niego indeks rośnie z każdym kiedykolwiek zapisanym zdarzeniem, a zapytanie pollingowe relayu staje się coraz wolniejsze z czasem. Dzięki niemu indeks obejmuje tylko oczekujące wiersze, co w stanie ustalmionym jest małym, ograniczonym zbiorem, niezależnie od liczby opublikowanych zdarzeń.

Kluczowe wybory pól:

  • aggregate_type i aggregate_id opisują, do którego encji należy zdarzenie. Przydatne do gwarancji kolejności i routingu.
  • event_type to nazwa zdarzenia, której oczekują konsumenci.
  • payload JSONB przechowuje treść zdarzenia. Użyj JSONB zamiast TEXT, aby móc go zapytywać w razie potrzeby.
  • attempts śledzi, ile razy relay próbował opublikować ten wiersz. Używane do limitów ponawiania i obsługi wiadomości martwych (dead-letter).
  • processed_at ma wartość NULL dla wierszy oczekujących i jest ustawiane, gdy relay pomyślnie publikuje.

Zapis danych biznesowych i zdarzenia outbox w jednej transakcji

Logika biznesowa zapisuje oba rekordy wewnątrz jednego wywołania BeginTx / Commit. Nie ma tutaj wywołania publikacji – tylko zapisy do bazy danych.

type OrderService struct {
    db *sql.DB
}

func (s *OrderService) CreateOrder(ctx context.Context, order Order) error {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback()

    if _, err := tx.ExecContext(ctx, `
        INSERT INTO orders (id, customer_id, total, created_at)
        VALUES ($1, $2, $3, NOW())
    `, order.ID, order.CustomerID, order.Total); err != nil {
        return fmt.Errorf("insert order: %w", err)
    }

    payload, err := json.Marshal(map[string]any{
        "order_id":    order.ID,
        "customer_id": order.CustomerID,
        "total":       order.Total,
    })
    if err != nil {
        return fmt.Errorf("marshal payload: %w", err)
    }

    if _, err := tx.ExecContext(ctx, `
        INSERT INTO outbox_events
            (aggregate_type, aggregate_id, event_type, payload)
        VALUES ($1, $2, $3, $4)
    `, "order", order.ID, "order.created", payload); err != nil {
        return fmt.Errorf("insert outbox event: %w", err)
    }

    return tx.Commit()
}

Jeśli tx.Commit() się nie powiedzie, ani wiersz zamówienia, ani wiersz outbox nie są utrwalane. Jeśli się powiedzie, oba są gwarantowane w bazie danych. Relay może opublikować zdarzenie w dowolnym momencie po tym – natychmiast, po jednej sekundzie lub po restartu relayu po awarii.

Jest to jedyna zmiana kodu wymagana w warstwie biznesowej. Reszta wzorca znajduje się w relayu.

Implementacja relayu w Go

Relay to worker tła, który odpytuje tabelę outbox na timerze. Pobiera partię nieprzetworzonych wierszy, publikuje każdą z nich i oznacza jako zakończoną. Trzymaj go w tym samym binarnym pliku co Twoja aplikacja lub uruchamiaj jako osobny proces – oba sposoby działają, ale ten sam binarny plik jest prostszy w obsłudze.

type OutboxRelay struct {
    db          *sql.DB
    publisher   Publisher
    logger      *slog.Logger
    batchSize   int
    pollInterval time.Duration
    maxAttempts  int
}

func (r *OutboxRelay) Run(ctx context.Context) error {
    ticker := time.NewTicker(r.pollInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            if err := r.processBatch(ctx); err != nil {
                r.logger.Error("outbox relay batch failed", "err", err)
            }
        }
    }
}

Relay respektuje anulowanie kontekstu, co ułatwia integrację z graceful shutdown (grzeczny shutdown). Aby uzyskać szczegółowe omówienie czasu życia kontekstu i wzorców anulowania, zobacz Go context.Context Done Right.

FOR UPDATE SKIP LOCKED: wzorzec współbieżnych workerów

Funkcja processBatch używa FOR UPDATE SKIP LOCKED do bezpiecznej obsługi współbieżnych workerów relayu:

func (r *OutboxRelay) processBatch(ctx context.Context) error {
    tx, err := r.db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback()

    rows, err := tx.QueryContext(ctx, `
        SELECT id, aggregate_type, aggregate_id, event_type, payload
        FROM outbox_events
        WHERE processed_at IS NULL
          AND attempts < $1
        ORDER BY created_at
        LIMIT $2
        FOR UPDATE SKIP LOCKED
    `, r.maxAttempts, r.batchSize)
    if err != nil {
        return fmt.Errorf("query outbox: %w", err)
    }
    defer rows.Close()

    type row struct {
        id            string
        aggregateType string
        aggregateID   string
        eventType     string
        payload       json.RawMessage
    }

    var batch []row
    for rows.Next() {
        var e row
        if err := rows.Scan(
            &e.id, &e.aggregateType, &e.aggregateID, &e.eventType, &e.payload,
        ); err != nil {
            return fmt.Errorf("scan row: %w", err)
        }
        batch = append(batch, e)
    }
    if err := rows.Err(); err != nil {
        return err
    }

    for _, e := range batch {
        if err := r.publisher.Publish(ctx, e.eventType, e.aggregateID, e.payload); err != nil {
            r.logger.Error("publish failed", "event_id", e.id, "err", err)
            if _, err := tx.ExecContext(ctx,
                `UPDATE outbox_events SET attempts = attempts + 1 WHERE id = $1`, e.id,
            ); err != nil {
                r.logger.Error("increment attempts failed", "event_id", e.id, "err", err)
            }
            continue
        }

        if _, err := tx.ExecContext(ctx,
            `UPDATE outbox_events SET processed_at = NOW() WHERE id = $1`, e.id,
        ); err != nil {
            return fmt.Errorf("mark processed: %w", err)
        }
    }

    return tx.Commit()
}

FOR UPDATE SKIP LOCKED robi dwie rzeczy. Po pierwsze, FOR UPDATE blokuje wybrane wiersze przez czas trwania transakcji, zapobiegając innym transakcjom ich wybór. Po drugie, SKIP LOCKED oznacza, że jeśli wiersz jest już zablokowany przez inną transakcję, zapytanie go pomija zamiast czekać. Wynikiem jest to, że kilku workerów relayu może działać równolegle, a każdy pobierze nieprzekrywający się podzbiór wierszy.

Bez SKIP LOCKED, drugi worker blokowałby się, dopóki pierwsza transakcja nie zatwierdzi, zanim zobaczy te same wiersze – w tym momencie byłyby one już oznaczone jako zakończone. Z SKIP LOCKED, drugi worker natychmiast wybiera inne wiersze zamiast czekać, dając Ci bezpieczne skalowanie poziome.

Zwróć uwagę na separację skanowania, a następnie publikacji w powyższym kodzie: wszystkie wiersze są skanowane do pliku (slice) przed rozpoczęciem pętli publikacji. Pozwala to uniknąć trzymania otwartego kursora *sql.Rows przez wywołania sieciowe do brokera, co utrzymywałoby transakcję otwartą dłużej niż jest to konieczne.

Idempotentność i usuwanie duplikatów

Relay publikuje przynajmniej raz (at-least-once). Jeśli opublikuje zdarzenie, a następnie zawiedzie przed zatwierdzeniem aktualizacji processed_at, opublikuje to samo zdarzenie ponownie po restartie. Jest to nieuniknione – dostarczenie dokładnie raz (exactly-once) przez bazę danych i brokera wiadomości bez koordynatora transakcji rozproszonej wymaga tego kompromisu.

Konsumenci muszą być idempotentni. Najprostsze podejście polega na śledzeniu przetworzonych identyfikatorów zdarzeń w tabeli processed_events:

CREATE TABLE processed_events (
    event_id   UUID PRIMARY KEY,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
func (h *OrderHandler) HandleOrderCreated(ctx context.Context, eventID string, payload []byte) error {
    // Usuwanie duplikatów przy użyciu ID zdarzenia jako klucza naturalnego
    _, err := h.db.ExecContext(ctx, `
        INSERT INTO processed_events (event_id) VALUES ($1)
        ON CONFLICT (event_id) DO NOTHING
    `, eventID)
    if err != nil {
        return fmt.Errorf("dedup check: %w", err)
    }

    // Sprawdź, czy insert faktycznie się odbył (1 wiersz) czy był operacją pustą (0 wierszy)
    // Prostsze podejście: użyj RETURNING lub sprawdź liczby wierszy
    // Jeśli 0 wierszy zostało zmienionych, jest to duplikat -- pominij go
    ...
}

W praktyce wiele zespołów polega na własnych nagłówkach usuwania duplikatów brokera (takich jak pole key Kafka dla tematów skompresowanych dziennikiem lub nagłówek message-id RabbitMQ) i traktuje usuwanie duplikatów na poziomie bazy danych jako rozwiązanie awaryjne. Oba są ważnymi warstwami do zastosowania.

Dołącz outbox event id (UUID) w opublikowanej wiadomości jako klucz usuwania duplikatów. Konsumenci mogą go wtedy używać, niezależnie od preferowanego mechanizmu usuwania duplikatów.

Polityka ponawiania i wiadomości trujące (poison messages)

Kolumna attempts steruje polityką ponawiania. Relay pomija wiersze, gdzie attempts >= maxAttempts i traktuje te wiersze jako wiadomości martwe (dead letters). Odrębny proces lub alert operatora je obsługuje.

Prosty widok wiadomości martwych:

CREATE VIEW outbox_dead_letters AS
SELECT *
FROM outbox_events
WHERE attempts >= 5
  AND processed_at IS NULL
ORDER BY created_at;

Dobra polityka ponawiania w produkcji:

  • Ustaw maxAttempts na 5-10 w zależności od tego, jak kosztowne są ponawiania.
  • Rozważ wykładnicze cofanie (exponential backoff): dołącz kolumnę retry_after i pomijaj wiersze, gdzie retry_after > NOW().
  • Alertuj, gdy COUNT(*) FROM outbox_dead_letters przekroczy próg.
  • Zapewnij ścieżkę ręcznego ponawiania: punkt końcowy administracyjny lub skrypt, który resetuje attempts = 0 i retry_after = NULL dla konkretnych wierszy.

Wiadomości trujące (poison messages) – wiersze, które konsekwentnie zawiedzą z powodu błędu w konsumentcie lub niedopasowania schematu – nie powinny blokować zdrowych wiadomości. Ponieważ relay przetwarza partię na tick i oznacza awarie przyrostem próby zamiast usuwania ich z kolejki, zdrowe wiersze postępują normalnie, podczas gdy zepsute akumulują próby, aż osiągną próg wiadomości martwych.

Kolejność zdarzeń i partycjonowanie

Zapytanie pollingowe sortuje według created_at, co daje kolejność FIFO (first-in-first-out) wewnątrz partii. Dla większości przypadków użycia wystarczy. Kiedy ścisła kolejność per encji ma znaczenie – na przykład, zapewniając, że order.updated nigdy nie zostanie opublikowane przed order.created dla tego samego zamówienia – potrzebujesz kolejności per agregacie.

Dodaj aggregate_id do klauzuli ORDER BY i użyj go jako klucza wiadomości przy publikacji do partycjonowanego tematu, takiego jak Apache Kafka. Kafka kieruje wszystkie wiadomości z tym samym kluczem do tej samej partycji, a partycje są konsumowane w kolejności. Daje Ci to gwarancje kolejności per agregacie bez globalnej kolejności, co wymagałoby pojedynczej instancji relayu.

ORDER BY aggregate_id, created_at

Dla brokerów, które nie obsługują partycjonowanej kolejności (takich jak podstawowe kolejki AMQP), pojedyncza instancja relayu lub kontrole kolejności na poziomie aplikacji w konsumentcie są praktycznymi alternatywami.

Redukcja opóźnienia pollingowego z LISTEN/NOTIFY

Interwał pollingowy jednej sekundy oznacza średnie opóźnienie zdarzenia 500 milisekund. Dla większości obciążeń to w porządku. Dla przypadków, gdzie potrzebujesz opóźnienia bliskiego zeru, mechanizm LISTEN/NOTIFY PostgreSQL pozwala relayowi obudzić się natychmiast, gdy nowy wiersz outbox zostanie wstawiony.

Dodaj wyzwalacz (trigger) do tabeli outbox:

CREATE OR REPLACE FUNCTION notify_outbox_insert() RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('outbox_event', NEW.id::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER outbox_insert_notify
AFTER INSERT ON outbox_events
FOR EACH ROW EXECUTE FUNCTION notify_outbox_insert();

W relayu, słuchaj kanału i obudź się na powiadomienia, jednocześnie utrzymując awaryjne odpytowanie okresowe:

func (r *OutboxRelay) Run(ctx context.Context) error {
    listener := pq.NewListener(r.dsn, 10*time.Second, time.Minute, nil)
    defer listener.Close()

    if err := listener.Listen("outbox_event"); err != nil {
        return fmt.Errorf("listen: %w", err)
    }

    ticker := time.NewTicker(5 * time.Second) // fallback poll
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-listener.Notify:
            if err := r.processBatch(ctx); err != nil {
                r.logger.Error("outbox batch failed (notify)", "err", err)
            }
        case <-ticker.C:
            if err := r.processBatch(ctx); err != nil {
                r.logger.Error("outbox batch failed (poll)", "err", err)
            }
        }
    }
}

Awaryjny timer obsługuje wszelkie pominięte powiadomienia podczas restartu relayu lub potknięcia sieciowego. Utrzymuj awaryjny interwał na poziomie kilku sekund, a nie milisekund – jego zadaniem jest odzyskiwanie, a nie nisko-opóźnieniowe działanie.

Obserwowalność: metryki, logi i alerty

Outbox to infrastruktura. Traktuj go jak infrastrukturę i instrumentuj go odpowiednio.

Kluczowe metryki:

var (
    outboxPublished = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "outbox_events_published_total",
        Help: "Total outbox events successfully published.",
    })
    outboxFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "outbox_events_failed_total",
        Help: "Total outbox publish failures by event type.",
    }, []string{"event_type"})
    outboxPending = prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "outbox_events_pending",
        Help: "Current number of unprocessed outbox events.",
    })
    outboxBatchDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
        Name:    "outbox_batch_duration_seconds",
        Help:    "Duration of each outbox processing batch.",
        Buckets: prometheus.DefBuckets,
    })
)

Odświeżanie Gauge: uruchom okresowe zapytanie, aby utrzymać outbox_events_pending dokładny:

SELECT COUNT(*) FROM outbox_events WHERE processed_at IS NULL;

Progi alertów do rozważenia:

  • outbox_events_pending > 1000 przez więcej niż dwie minuty: relay opóźnia się lub utknął.
  • outbox_events_pending rośnie monotonicznie: broker jest niedostępny lub relay zawiedziony.
  • Liczba wiadomości martwych (dead-letter) większa od zera: błąd schematu lub konsumenta wymaga zbadania.
  • outbox_batch_duration_seconds p95 > 5s: baza danych jest wolna lub rozmiar partii jest zbyt duży.

Strukturalne pola logów: dołącz event_id, event_type, aggregate_id i attempt w każdej linii logu z relayu. Te pola pozwalają skorelować nieudaną publikację z konkretnym wierszem outbox i śladem konsumenta dółstrumieniowego.

Outbox vs. bezpośrednia kolejka vs. saga

Wzorzec outbox nie jest odpowiednim narzędziem do każdego problemu koordynacji. Oto porównanie:

Podejście Atomowość Złożoność Kiedy używać
Bezpośrednia publikacja Brak Niska Akceptowalna okazjonalna utrata zdarzeń
Transakcyjny outbox Silna Średnia Niezawodna dostawa zdarzeń z pojedynczej usługi
Wzorzec Saga Ostateczna Wysoka Transakcje wielousługowe obejmujące wiele baz danych
Dwufazowe zatwierdzenie (2PC) Silna Bardzo wysoka Rzadko praktyczne; omijane w większości systemów rozproszonych

Wzorzec outbox gwarantuje, że pojedyncza usługa niezawodnie emituje zdarzenia odzwierciedlające jej własne zmiany stanu. Nie koordynuje zmian stanu przez wiele usług – to jest zadanie wzorca Saga. Wybór brokera – czy RabbitMQ, SQS, czy Kafka – jest niezależny od samego wzorca outbox; relay publikuje do brokera, którego używa Twój system.

Jeśli budujesz sagę, wzorzec outbox jest nadal przydatny: każdy uczestnik sagi zapisuje swoją lokalną zmianę stanu i zdarzenie sagi w jednej transakcji używając outbox, a następnie orkiestrator lub choreografia sagi odczytuje te zdarzenia niezawodnie.

CDC oparte na WAL jako alternatywny relay

Zamiast odpytywania, możesz śledzić Dziennik Wsteczny (Write-Ahead Log, WAL) PostgreSQL i odczytywać wstawienia outbox bezpośrednio ze strumienia replikacji. Narzędzia takie jak Debezium robią to. Zaletami są niższe opóźnienie i brak presji blokowania na tabeli outbox. Wadami są złożoność operacyjna, dedykowany slot replikacji PostgreSQL i zewnętrzna usługa do uruchomienia i monitorowania.

Dla większości zespołów, opisany powyżej relay pollingowy jest właściwym punktem startowym. Śledzenie WAL ma sens, gdy masz wysokie stawki wstawień outbox (dziesiątki tysięcy na sekundę), potrzebujesz opóźnienia zdarzeń poniżej 100 ms lub już uruchamiasz Debezium dla innych potrzeb przechwytywania zmian.

Integracja sqlc

Jeśli używasz sqlc dla typobezpiecznego kodu bazy danych w Go, zapytania outbox pasują naturalnie:

-- name: InsertOutboxEvent :exec
INSERT INTO outbox_events (aggregate_type, aggregate_id, event_type, payload)
VALUES (@aggregate_type, @aggregate_id, @event_type, @payload);

-- name: FetchOutboxBatch :many
SELECT id, aggregate_type, aggregate_id, event_type, payload
FROM outbox_events
WHERE processed_at IS NULL
  AND attempts < @max_attempts
ORDER BY created_at
LIMIT @batch_size
FOR UPDATE SKIP LOCKED;

-- name: MarkOutboxProcessed :exec
UPDATE outbox_events SET processed_at = NOW() WHERE id = @id;

-- name: IncrementOutboxAttempts :exec
UPDATE outbox_events SET attempts = attempts + 1 WHERE id = @id;

-- name: OutboxPendingCount :one
SELECT COUNT(*) FROM outbox_events WHERE processed_at IS NULL;

sqlc generuje typobezpieczne funkcje dla każdego zapytania, co unika błędów interpolacji ciągów i utrzymuje logikę zapytań outbox w tym samym miejscu co reszta warstwy dostępu do bazy danych.

Lista kontrolna produkcyjna

Użyj tej listy przed wdrożeniem implementacji outbox:

Baza danych

  • Tabela outbox ma cząstkowy indeks na created_at WHERE processed_at IS NULL
  • Kolumna attempts jest obecna z domyślną wartością 0
  • Widok lub zapytanie wiadomości martwych (dead-letter) jest zdefiniowane
  • Stare przetworzone wiersze są okresowo archiwizowane lub usuwane (nocne zadanie czyszczące wystarczy)

Relay

  • FOR UPDATE SKIP LOCKED użyte w zapytaniu pollingowym
  • Relay działa wewnątrz transakcji (begin przed zapytaniem, commit po wszystkich aktualizacjach)
  • Rozmiar partii jest ograniczony (50-200 wierszy jest typowe)
  • Relay respektuje anulowanie kontekstu dla graceful shutdown
  • Nieudane publikacje zwiększają attempts zamiast powodować awarię partii

Idempotentność

  • Opublikowana wiadomość zawiera outbox id jako klucz usuwania duplikatów
  • Konsumenci są idempotentni lub broker zapewnia usuwanie duplikatów
  • Zobacz Idempotentność w systemach rozproszonych po wzorce usuwania duplikatów

Obserwowalność

  • Gauge outbox_events_pending jest monitorowane i alertowane
  • Liczba wiadomości martwych jest alertowana
  • Czas trwania partii relayu jest śledzony
  • Strukturalne logi zawierają event_id, event_type i aggregate_id

Operacje

  • Istnieje ścieżka ręcznego ponawiania dla wierszy wiadomości martwych
  • Zachowanie restartu relayu jest testowane (czy poprawnie ponownie publikuje?)
  • Zachowanie awarii brokera jest testowane (czy outbox rośnie i opróżnia się poprawnie?)

Ostateczne myśli

Problem podwójnego zapisu jest łatwy do zbycia jako przypadek brzegowy, dopóki nie spowoduje incydentu. Wzorzec transakcyjnej skrzynki wychodzącej rozwiązuje go przy użyciu narzędzi, które już masz: transakcji PostgreSQL, gorutiny tła i jednej dodatkowej tabeli. Relay jest prosty do zbudowania, prosty w obsłudze i prosty do rozumienia.

Koszt polega na tym, że konsumenci muszą być zaprojektowani dla dostawy przynajmniej raz. Jest to rozsądny kompromis. Dostawa dokładnie raz przez bazę danych i brokera bez transakcji rozproszonych nie jest osiągalna w praktyce – i udawanie inaczej prowadzi do systemów, które cicho upuszczają lub podwójnie przetwarzają zdarzenia w warunkach awarii.

Zapisz zdarzenie z danymi. Przekazuj je niezawodnie. Spraw, aby konsumenci byli idempotentni. To jest cały wzorzec.

Ten artykuł jest częścią klastrowej Architektura Aplikacji w Produkcji.

Źródła

Subskrybuj

Otrzymuj nowe wpisy o systemach, infrastrukturze i inżynierii AI.