Шаблон Transactional Outbox на Go с PostgreSQL

Запишите событие с данными. Никогда не разделяйте их.

Содержимое страницы

Две операции, которые должны завершиться успешно вместе, в конечном итоге провалятся по отдельности. Ваш сервис заказов сохраняет заказ в базу данных, а затем публикует событие order.created в брокере сообщений.

Эти две операции выполняются последовательно.

Однако между ними могут возникнуть проблемы: брокер недоступен, истекло время ожидания сети, процесс перезагружается или контейнер принудительно завершается. Запись в базу данных прошла успешно. Публикация события — нет. Нижний сервис, которому необходимо знать о новом заказе, об этом не узнает. Никто не замечал проблемы, пока клиент не позвонил.

Это проблема двойной записи (dual-write problem), и это один из самых распространенных источников тихой потери данных в распределенных системах. Паттерн транзакционного почтового ящика (transactional outbox pattern) является стандартным решением этой проблемы.

Паттерн транзакционного почтового ящика – событие и данные записываются вместе

Проблема двойной записи

Механизм отказа легко понять, как только вы его увидите:

BEGIN;
  INSERT INTO orders ...   -- успешно
COMMIT;

PUBLISH order.created ...  -- ошибка, сбой или вызов никогда не происходит

База данных и брокер сообщений не имеют общей границы транзакции. Нет отката (rollback), который охватывал бы обе системы. Каждая служба, выполняющая последовательность save -> publish (сохранение -> публикация), имеет этот пробел. Этот паттерн проявляется во многих формах:

  • db.Save(order) с последующим events.Publish(OrderCreated{...})
  • HTTP-обработчик, который фиксирует транзакцию, а затем вызывает внешний вебхук
  • Воркер, который обрабатывает запись из одной очереди и записывает результаты в другую

Итог во всех случаях одинаков: одна сторона завершается успешно, а другая — с ошибкой, и система оказывается в состоянии, невидимом для мониторинга, потому что обе отдельные операции когда-либо возвращали успешный результат.

Петля повторных попыток (retry loop) не исправляет эту ситуацию. Повторная попытка публикации после фиксации транзакции в базе данных работает только в том случае, если сама повторная попытка надежна — что требует именно той гарантии сохранности данных, которой у вас нет.

Что делает паттерн транзакционного почтового ящика

Паттерн почтового ящика устраняет этот разрыв, полностью убирая прямую публикацию. Вместо вызова брокера изнутри вашей бизнес-логики, вы записываете запись события в таблицу outbox в той же транзакции базы данных, что и бизнес-данные. Отдельный фоновый процесс — ретранслятор (relay) — читает из таблицы почтового ящика и публикует сообщения в брокере.

BEGIN;
  INSERT INTO orders ...         -- бизнес-данные
  INSERT INTO outbox_events ...  -- запись события
COMMIT;

-- Процесс ретрансляции (отдельно):
SELECT ... FROM outbox_events FOR UPDATE SKIP LOCKED;
PUBLISH order.created ...
UPDATE outbox_events SET processed_at = NOW() WHERE id = $1;

Обе записи либо завершаются успешно, либо обе завершаются с ошибкой. Гарантия транзакции, которую вы уже имеете от PostgreSQL, теперь охватывает также и запись события. Ретранслятор может повторять попытки публикации столько раз, сколько необходимо, поскольку событие находится в надежном хранилище. Если ретранслятор аварийно завершает работу, он перезапускается и повторяет попытку. Худший исход заключается в том, что событие будет опубликовано более одного раза — что обрабатывается путем обеспечения идемпотентности потребителей (см. Идемпотентность в распределенных системах).

Схема PostgreSQL для таблицы почтового ящика

Схема намеренно проста:

CREATE TABLE outbox_events (
    id             UUID         PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id   VARCHAR(100) NOT NULL,
    event_type     VARCHAR(100) NOT NULL,
    payload        JSONB        NOT NULL,
    attempts       INT          NOT NULL DEFAULT 0,
    created_at     TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    processed_at   TIMESTAMPTZ
);

-- Частичный индекс: индексирует только необработанные строки, остается небольшим по мере отметки строк как выполненных
CREATE INDEX idx_outbox_unprocessed
    ON outbox_events (created_at)
    WHERE processed_at IS NULL;

Частичный индекс по created_at WHERE processed_at IS NULL имеет важное значение. Без него индекс растет с каждым написанным событием, и запрос на опрос ретранслятора замедляется со временем. С ним индекс охватывает только ожидающие строки, которые в стабильном состоянии представляют собой небольшой, ограниченный набор, независимо от того, сколько событий было опубликовано.

Ключевые выборки полей:

  • aggregate_type и aggregate_id описывают, к какой сущности относится событие. Полезно для гарантий упорядочивания и маршрутизации.
  • event_type — это имя события, которое ожидают ваши потребители.
  • payload JSONB хранит тело события. Используйте JSONB, а не TEXT, чтобы при необходимости можно было выполнять запросы к нему.
  • attempts отслеживает, сколько раз ретранслятор пытался опубликовать эту строку. Используется для лимитов повторных попыток и обработки «мертвых писем» (dead-letter).
  • processed_at имеет значение NULL для ожидающих строк и устанавливается, когда ретранслятор успешно публикует сообщение.

Запись бизнес-данных и события почтового ящика в одной транзакции

Бизнес-логика записывает обе записи внутри одного вызова BeginTx / Commit. Здесь нет вызова публикации — только записи в базу данных.

type OrderService struct {
    db *sql.DB
}

func (s *OrderService) CreateOrder(ctx context.Context, order Order) error {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback()

    if _, err := tx.ExecContext(ctx, `
        INSERT INTO orders (id, customer_id, total, created_at)
        VALUES ($1, $2, $3, NOW())
    `, order.ID, order.CustomerID, order.Total); err != nil {
        return fmt.Errorf("insert order: %w", err)
    }

    payload, err := json.Marshal(map[string]any{
        "order_id":    order.ID,
        "customer_id": order.CustomerID,
        "total":       order.Total,
    })
    if err != nil {
        return fmt.Errorf("marshal payload: %w", err)
    }

    if _, err := tx.ExecContext(ctx, `
        INSERT INTO outbox_events
            (aggregate_type, aggregate_id, event_type, payload)
        VALUES ($1, $2, $3, $4)
    `, "order", order.ID, "order.created", payload); err != nil {
        return fmt.Errorf("insert outbox event: %w", err)
    }

    return tx.Commit()
}

Если tx.Commit() завершается с ошибкой, ни строка заказа, ни строка почтового ящика не сохраняются. Если она успешна, обе записи гарантированно находятся в базе данных. Ретранслятор может опубликовать событие в любой момент после этого — немедленно, через секунду или после перезапуска ретранслятора после сбоя.

Это единственное изменение кода, необходимое в вашем бизнес-слое. Остальная часть паттерна находится в ретрансляторе.

Реализация ретранслятора на Go

Ретранслятор — это фоновый воркер, который опрашивает таблицу почтового ящика по таймеру. Он извлекает пакет необработанных строк, публикует каждую из них и помечает как выполненную. Держите его в том же бинарном файле, что и ваше приложение, или запускайте как отдельный процесс — оба варианта работают, но один бинарный файл проще в эксплуатации.

type OutboxRelay struct {
    db          *sql.DB
    publisher   Publisher
    logger      *slog.Logger
    batchSize   int
    pollInterval time.Duration
    maxAttempts  int
}

func (r *OutboxRelay) Run(ctx context.Context) error {
    ticker := time.NewTicker(r.pollInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            if err := r.processBatch(ctx); err != nil {
                r.logger.Error("outbox relay batch failed", "err", err)
            }
        }
    }
}

Ретранслятор учитывает отмену контекста, что упрощает интеграцию с graceful shutdown (плавным завершением работы). Для подробного рассмотрения жизненного цикла контекста и паттернов отмены см. Go context.Context Done Right.

FOR UPDATE SKIP LOCKED: паттерн конкурентных воркеров

Функция processBatch использует FOR UPDATE SKIP LOCKED для безопасной обработки конкурентных воркеров ретрансляции:

func (r *OutboxRelay) processBatch(ctx context.Context) error {
    tx, err := r.db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback()

    rows, err := tx.QueryContext(ctx, `
        SELECT id, aggregate_type, aggregate_id, event_type, payload
        FROM outbox_events
        WHERE processed_at IS NULL
          AND attempts < $1
        ORDER BY created_at
        LIMIT $2
        FOR UPDATE SKIP LOCKED
    `, r.maxAttempts, r.batchSize)
    if err != nil {
        return fmt.Errorf("query outbox: %w", err)
    }
    defer rows.Close()

    type row struct {
        id            string
        aggregateType string
        aggregateID   string
        eventType     string
        payload       json.RawMessage
    }

    var batch []row
    for rows.Next() {
        var e row
        if err := rows.Scan(
            &e.id, &e.aggregateType, &e.aggregateID, &e.eventType, &e.payload,
        ); err != nil {
            return fmt.Errorf("scan row: %w", err)
        }
        batch = append(batch, e)
    }
    if err := rows.Err(); err != nil {
        return err
    }

    for _, e := range batch {
        if err := r.publisher.Publish(ctx, e.eventType, e.aggregateID, e.payload); err != nil {
            r.logger.Error("publish failed", "event_id", e.id, "err", err)
            if _, err := tx.ExecContext(ctx,
                `UPDATE outbox_events SET attempts = attempts + 1 WHERE id = $1`, e.id,
            ); err != nil {
                r.logger.Error("increment attempts failed", "event_id", e.id, "err", err)
            }
            continue
        }

        if _, err := tx.ExecContext(ctx,
            `UPDATE outbox_events SET processed_at = NOW() WHERE id = $1`, e.id,
        ); err != nil {
            return fmt.Errorf("mark processed: %w", err)
        }
    }

    return tx.Commit()
}

FOR UPDATE SKIP LOCKED делает две вещи. Во-первых, FOR UPDATE блокирует выбранные строки на время транзакции, предотвращая их выбор любой другой транзакцией. Во-вторых, SKIP LOCKED означает, что если строка уже заблокирована другой транзакцией, запрос пропускает ее, вместо того чтобы ждать. В результате несколько воркеров ретрансляции могут работать параллельно, и каждый из них будет брать на себя непересекающийся подмножество строк.

Без SKIP LOCKED второй воркер блокировался бы до тех пор, пока первая транзакция не зафиксирует изменения, прежде чем увидеть те же строки — в этот момент они уже будут помечены как выполненные. С SKIP LOCKED второй воркер немедленно берет другие строки, вместо того чтобы ждать, что обеспечивает безопасное горизонтальное масштабирование.

Обратите внимание на разделение сканирования и публикации в приведенном выше коде: все строки сканируются в срез (slice) перед началом цикла публикации. Это предотвращает удержание открытого курсора *sql.Rows во время сетевых вызовов к брокеру, что удерживало бы транзакцию открытой дольше, чем необходимо.

Идемпотентность и дедупликация

Ретранслятор публикует сообщения как минимум один раз. Если он публикует событие, а затем аварийно завершает работу до фиксации обновления processed_at, он опубликует то же самое событие снова при перезапуске. Это неизбежно — доставка «ровно один раз» (exactly-once) через базу данных и брокер сообщений без координатора распределенных транзакций требует этого компромисса.

Потребители должны быть идемпотентными. Самый простой подход — отслеживать идентификаторы обработанных событий в таблице processed_events:

CREATE TABLE processed_events (
    event_id   UUID PRIMARY KEY,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
func (h *OrderHandler) HandleOrderCreated(ctx context.Context, eventID string, payload []byte) error {
    // Дедупликация, использующая идентификатор события в качестве естественного ключа
    _, err := h.db.ExecContext(ctx, `
        INSERT INTO processed_events (event_id) VALUES ($1)
        ON CONFLICT (event_id) DO NOTHING
    `, eventID)
    if err != nil {
        return fmt.Errorf("dedup check: %w", err)
    }

    // Проверка, действительно ли произошла вставка (1 строка) или это была операция no-op (0 строк)
    // Более простой подход: использовать RETURNING или проверять количество затронутых строк
    // Если затронуто 0 строк, это дубликат — пропустите его
    ...
}

На практике многие команды полагаются на собственные заголовки дедупликации брокера (например, поле key Kafka для топик с логической компакцией или заголовок message-id RabbitMQ) и рассматривают дедупликацию на уровне базы данных как резервный вариант. Оба являются валидными слоями, которые можно применять.

Включите идентификатор события почтового ящика id (UUID) в опубликованное сообщение в качестве ключа дедупликации. Затем потребители могут использовать его независимо от предпочитаемого ими механизма дедупликации.

Политика повторных попыток и «ядовитые» сообщения

Столбец attempts управляет политикой повторных попыток. Ретранслятор пропускает строки, где attempts >= maxAttempts, и рассматривает эти строки как «мертвые письма». Отдельный процесс или оповещение оператора обрабатывает их.

Простой вид для «мертвых писем»:

CREATE VIEW outbox_dead_letters AS
SELECT *
FROM outbox_events
WHERE attempts >= 5
  AND processed_at IS NULL
ORDER BY created_at;

Хорошая политика повторных попыток для production:

  • Установите maxAttempts на 5-10 в зависимости от того, насколько дороги повторные попытки.
  • Рассмотрите экспоненциальное ожидание (exponential backoff): включите столбец retry_after и пропускайте строки, где retry_after > NOW().
  • Оповещайте, если COUNT(*) FROM outbox_dead_letters превышает пороговое значение.
  • Предоставьте ручной путь для повторных попыток: административный endpoint или скрипт, который сбрасывает attempts = 0 и retry_after = NULL для конкретных строк.

«Ядовитые» сообщения — строки, которые постоянно завершаются с ошибкой из-за ошибки в потребителе или несовпадения схемы — не должны блокировать здоровые сообщения. Поскольку ретранслятор обрабатывает пакет за такт и помечает неудачи увеличением счетчика попыток, а не удалением их из очереди, здоровые строки продолжают обрабатываться нормально, тогда как «ядовитые» накапливают попытки, пока не достигнут порога «мертвых писем».

Упорядочивание событий и партиционирование

Запрос опроса сортируется по created_at, что обеспечивает упорядочивание «первый вошел — первый вышел» (FIFO) в пределах пакета. Для большинства случаев использования этого достаточно. Когда важно строгое упорядочивание по сущности (entity) — например, чтобы гарантировать, что order.updated никогда не публикуется раньше order.created для одного и того же заказа — вам необходимо упорядочивание по агрегату (per-aggregate ordering).

Добавьте aggregate_id в предложение ORDER BY и используйте его как ключ сообщения при публикации в партиционированный топик, такой как Apache Kafka. Kafka направляет все сообщения с одним и тем же ключом в один и тот же раздел, и разделы потребляются в определенном порядке. Это дает вам гарантии упорядочивания по агрегату без глобального упорядочивания, которое потребовало бы одного экземпляра ретранслятора.

ORDER BY aggregate_id, created_at

Для брокеров, которые не поддерживают упорядочивание по разделам (например, базовые очереди AMQP), ретранслятор с одним экземпляром или проверки упорядочивания на уровне приложения в потребителе являются практическими альтернативами.

Снижение задержки опроса с помощью LISTEN/NOTIFY

Интервал опроса в одну секунду означает среднюю задержку события в 500 миллисекунд. Для большинства рабочих нагрузок это приемлемо. Для случаев, когда вам нужна задержка, близкая к нулю, механизм LISTEN/NOTIFY PostgreSQL позволяет ретранслятору просыпаться немедленно при вставке новой строки в почтовый ящик.

Добавьте триггер в таблицу почтового ящика:

CREATE OR REPLACE FUNCTION notify_outbox_insert() RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('outbox_event', NEW.id::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER outbox_insert_notify
AFTER INSERT ON outbox_events
FOR EACH ROW EXECUTE FUNCTION notify_outbox_insert();

В ретрансляторе слушайте канал и просыпайтесь по уведомлениям, одновременно сохраняя резервный периодический опрос:

func (r *OutboxRelay) Run(ctx context.Context) error {
    listener := pq.NewListener(r.dsn, 10*time.Second, time.Minute, nil)
    defer listener.Close()

    if err := listener.Listen("outbox_event"); err != nil {
        return fmt.Errorf("listen: %w", err)
    }

    ticker := time.NewTicker(5 * time.Second) // резервный опрос
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-listener.Notify:
            if err := r.processBatch(ctx); err != nil {
                r.logger.Error("outbox batch failed (notify)", "err", err)
            }
        case <-ticker.C:
            if err := r.processBatch(ctx); err != nil {
                r.logger.Error("outbox batch failed (poll)", "err", err)
            }
        }
    }
}

Резервный таймер обрабатывает любые пропущенные уведомления во время перезапуска ретранслятора или сетевых сбоев. Держите интервал резервного опроса в несколько секунд, а не миллисекунд — его задача — восстановление, а не низкая задержка.

Наблюдаемость: метрики, логи и оповещения

Почтовый ящик — это инфраструктура. Относитесь к нему как к инфраструктуре и соответствующим образом инструментализируйте его.

Ключевые метрики:

var (
    outboxPublished = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "outbox_events_published_total",
        Help: "Total outbox events successfully published.",
    })
    outboxFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "outbox_events_failed_total",
        Help: "Total outbox publish failures by event type.",
    }, []string{"event_type"})
    outboxPending = prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "outbox_events_pending",
        Help: "Current number of unprocessed outbox events.",
    })
    outboxBatchDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
        Name:    "outbox_batch_duration_seconds",
        Help:    "Duration of each outbox processing batch.",
        Buckets: prometheus.DefBuckets,
    })
)

Обновление Gauge: запускайте периодический запрос, чтобы поддерживать точность outbox_events_pending:

SELECT COUNT(*) FROM outbox_events WHERE processed_at IS NULL;

Пороговые значения оповещений, которые следует рассмотреть:

  • outbox_events_pending > 1000 более двух минут: ретранслятор отстает или застрял.
  • outbox_events_pending монотонно растет: брокер недоступен или ретранслятор аварийно завершился.
  • Количество «мертвых писем» не равно нулю: требуется расследование ошибки схемы или потребителя.
  • outbox_batch_duration_seconds p95 > 5s: база данных работает медленно или размер пакета слишком велик.

Структурированные поля логов: включайте event_id, event_type, aggregate_id и attempt в каждую строку лога от ретранслятора. Эти поля позволяют вам сопоставить неудачную публикацию с конкретной строкой почтового ящика и трассировкой нижнего потребителя.

Почтовый ящик vs. прямая очередь vs. saga

Паттерн почтового ящика — не всегда подходящий инструмент для каждой задачи координации. Вот сравнение:

Подход Атомарность Сложность Когда использовать
Прямая публикация Отсутствует Низкая Приемлемо периодическое потеряние событий
Транзакционный почтовый ящик Высокая Средняя Надежная доставка событий из одной службы
Паттерн Saga Eventual (конечная) Высокая Транзакции нескольких служб, охватывающие несколько баз данных
Двухфазный коммит Высокая Очень высокая Редко практично; избегается в большинстве распределенных систем

Паттерн почтового ящика гарантирует, что одна служба надежно излучает события, отражающие изменения ее собственного состояния. Он не координирует изменения состояния между несколькими службами — для этого предназначен Паттерн Saga. Выбор брокера — будь то RabbitMQ, SQS или Kafka — независим от самого паттерна почтового ящика; ретранслятор публикует сообщения в том брокере, который использует ваша система.

Если вы создаете saga, паттерн почтового ящика все еще полезен: каждый участник saga записывает локальное изменение состояния и свое событие saga в одной транзакции, используя почтовый ящик, затем оркестратор или хореография saga надежно считывает эти события.

WAL-based CDC как альтернативный ретранслятор

Вместо опроса вы можете хвостить журнал предварительной записи (Write-Ahead Log, WAL) PostgreSQL и читать вставки в почтовый ящик напрямую из потока репликации. Инструменты, такие как Debezium, делают это. Преимущества заключаются в более низкой задержке и отсутствии давления блокировок на таблицу почтового ящика. Недостатки — операционная сложность, выделенный слот репликации PostgreSQL и внешняя служба, которую нужно запускать и мониторить.

Для большинства команд ретранслятор опроса, описанный выше, является правильной отправной точкой. Хвостинг WAL имеет смысл, когда у вас высокие скорости вставки в почтовый ящик (десятки тысяч в секунду), требуется задержка событий менее 100 мс или вы уже используете Debezium для других задач захвата изменений.

Интеграция с sqlc

Если вы используете sqlc для типобезопасного кода базы данных на Go, запросы почтового ящика естественно вписываются:

-- name: InsertOutboxEvent :exec
INSERT INTO outbox_events (aggregate_type, aggregate_id, event_type, payload)
VALUES (@aggregate_type, @aggregate_id, @event_type, @payload);

-- name: FetchOutboxBatch :many
SELECT id, aggregate_type, aggregate_id, event_type, payload
FROM outbox_events
WHERE processed_at IS NULL
  AND attempts < @max_attempts
ORDER BY created_at
LIMIT @batch_size
FOR UPDATE SKIP LOCKED;

-- name: MarkOutboxProcessed :exec
UPDATE outbox_events SET processed_at = NOW() WHERE id = @id;

-- name: IncrementOutboxAttempts :exec
UPDATE outbox_events SET attempts = attempts + 1 WHERE id = @id;

-- name: OutboxPendingCount :one
SELECT COUNT(*) FROM outbox_events WHERE processed_at IS NULL;

sqlc генерирует типобезопасные функции для каждого запроса, что избегает ошибок интерполяции строк и сохраняет логику запросов почтового ящика вместе с остальным слоем доступа к базе данных.

Чек-лист для Production

Используйте это перед выпуском реализации почтового ящика:

База данных

  • Таблица почтового ящика имеет частичный индекс по created_at WHERE processed_at IS NULL
  • Столбец attempts присутствует со значением по умолчанию 0
  • Определен вид или запрос для «мертвых писем»
  • Старые обработанные строки периодически архивируются или удаляются (ночной скрипт очистки достаточен)

Ретранслятор

  • FOR UPDATE SKIP LOCKED используется в запросе опроса
  • Ретранслятор работает внутри транзакции (begin перед запросом, commit после всех обновлений)
  • Размер пакета ограничен (50-200 строк — типично)
  • Ретранслятор учитывает отмену контекста для плавного завершения работы
  • Неудачные публикации увеличивают attempts, а не приводят к прерыванию пакета

Идемпотентность

  • Опубликованное сообщение включает id почтового ящика в качестве ключа дедупликации
  • Потребители идемпотентны или брокер обеспечивает дедупликацию
  • См. Идемпотентность в распределенных системах для паттернов дедупликации

Наблюдаемость

  • Gauge outbox_events_pending мониторится и имеет оповещения
  • Оповещения настроены на количество «мертвых писем»
  • Длительность пакета ретрансляции отслеживается
  • Структурированные логи включают event_id, event_type и aggregate_id

Операции

  • Существует ручной путь для повторных попыток для строк «мертвых писем»
  • Поведение перезапуска ретранслятора протестировано (правильно ли он повторно публикует?)
  • Поведение при отказе брокера протестировано (правильно ли растет и очищается почтовый ящик?)

Финальные мысли

Проблема двойной записи легко списать на крайний случай, пока она не вызовет инцидент. Паттерн транзакционного почтового ящика решает ее с помощью инструментов, которые у вас уже есть: транзакция PostgreSQL, фоновая горутина и одна дополнительная таблица. Ретранслятор прост в построении, прост в эксплуатации и прост в понимании.

Цена заключается в том, что потребители должны быть спроектированы для доставки «хотя бы один раз». Это разумный компромисс. Доставка «ровно один раз» через базу данных и брокер без распределенных транзакций на практике недостижима — и притворство обратного приводит к системам, которые при сбоях тихо пропускают или дублируют обработку событий.

Записывайте событие вместе с данными. Надежно ретранслируйте его. Делайте потребителей идемпотентными. В этом весь паттерн.

Эта статья является частью кластера App Architecture in Production.

Источники

Подписаться

Получайте новые материалы про системы, инфраструктуру и AI engineering.