Padrão de Caixas de Saída Transacional em Go com PostgreSQL

Escreva o evento com os dados. Nunca os separe.

Conteúdo da página

Duas escritas que deveriam ter sucesso juntas eventualmente falharão separadamente. Seu serviço de pedidos salva o pedido no banco de dados e, em seguida, publica um evento order.created em um broker de mensagens.

Essas duas operações são executadas uma após a outra.

Entre elas, algo dá errado: o broker está fora do ar, a rede fica com tempo limite (timeout), o processo é reiniciado ou o container é expulso. A escrita no banco de dados teve sucesso. A publicação não. O serviço downstream que precisa saber sobre o novo pedido nunca fica sabendo. Ninguém percebeu até que um cliente ligou.

Este é o problema da escrita dupla (dual-write), e é uma das fontes mais comuns de perda silenciosa de dados em sistemas distribuídos. O padrão de caixa de saída transacional (transactional outbox) é a correção padrão.

Padrão de caixa de saída transacional – evento e dados escritos juntos

O problema da escrita dupla

O modo de falha é fácil de entender uma vez que você o vê:

BEGIN;
  INSERT INTO orders ...   -- tem sucesso
COMMIT;

PUBLISH order.created ...  -- falha, trava ou nunca é alcançado

O banco de dados e o broker de mensagens não compartilham uma fronteira de transação. Não há nenhuma rollback que cubra ambos. Cada serviço que faz save -> publish em sequência tem essa lacuna. O padrão aparece em muitas formas:

  • db.Save(order) seguido por events.Publish(OrderCreated{...})
  • Handler HTTP que confirma uma transação e então chama um webhook externo
  • Worker que processa um registro de uma fila e escreve resultados em outra

O resultado em todos os casos é o mesmo: um lado tem sucesso enquanto o outro falha, e o sistema termina em um estado que é invisível ao monitoramento porque ambas as operações individuais retornaram sucesso em algum momento.

Um loop de retry não corrige isso. Refazer a tentativa de publicação após o commit do banco de dados só funciona se o retry em si for confiável – o que requer exatamente a garantia de durabilidade que você não tem.

O que o padrão de caixa de saída transacional faz

O padrão outbox elimina a lacuna removendo a publicação direta inteiramente. Em vez de chamar o broker dentro da sua lógica de negócios, você escreve um registro de evento em uma tabela outbox na mesma transação de banco de dados que os dados de negócios. Um processo de fundo separado – o relay (retransmissor) – lê da tabela outbox e publica no broker.

BEGIN;
  INSERT INTO orders ...         -- dados de negócios
  INSERT INTO outbox_events ...  -- registro de evento
COMMIT;

-- Processo relay (separadamente):
SELECT ... FROM outbox_events FOR UPDATE SKIP LOCKED;
PUBLISH order.created ...
UPDATE outbox_events SET processed_at = NOW() WHERE id = $1;

Ambas as escritas têm sucesso ou ambas falham. A garantia de transação que você já tem do PostgreSQL agora também cobre o registro de evento. O relay pode refazer a tentativa de publicação quantas vezes forem necessárias porque o evento fica em armazenamento durável. Se o relay travar no meio do caminho, ele reinicia e refaz a tentativa. O pior cenário é que o evento seja publicado mais de uma vez – o que é tratado tornando os consumidores idempotentes (veja Idempotência em Sistemas Distribuídos).

Esquema PostgreSQL para a tabela outbox

O esquema é intencionalmente simples:

CREATE TABLE outbox_events (
    id             UUID         PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id   VARCHAR(100) NOT NULL,
    event_type     VARCHAR(100) NOT NULL,
    payload        JSONB        NOT NULL,
    attempts       INT          NOT NULL DEFAULT 0,
    created_at     TIMESTAMPTZ  NOT NULL DEFAULT NOW(),
    processed_at   TIMESTAMPTZ
);

-- Índice parcial: indexa apenas linhas não processadas, permanece pequeno conforme as linhas são marcadas como concluídas
CREATE INDEX idx_outbox_unprocessed
    ON outbox_events (created_at)
    WHERE processed_at IS NULL;

O índice parcial em created_at WHERE processed_at IS NULL é importante. Sem ele, o índice cresce com cada evento já escrito e a consulta de polling do relay fica mais lenta com o tempo. Com ele, o índice cobre apenas as linhas pendentes, que em estado estacionário são um conjunto pequeno e limitado, independentemente de quantos eventos tenham sido publicados.

Escolhas de campos chave:

  • aggregate_type e aggregate_id descrevem a qual entidade o evento pertence. Útil para garantias de ordenação e roteamento.
  • event_type é o nome do evento que seus consumidores esperam.
  • payload JSONB armazena o corpo do evento. Use JSONB em vez de TEXT para que você possa consultá-lo se necessário.
  • attempts rastreia quantas vezes o relay tentou publicar esta linha. Usado para limites de retry e tratamento de mensagens mortas (dead-letter).
  • processed_at é NULL para linhas pendentes e definido quando o relay publica com sucesso.

Escrevendo dados de negócios e evento outbox em uma única transação

A lógica de negócios escreve ambos os registros dentro de uma única chamada BeginTx / Commit. Não há chamada de publicação aqui – apenas escritas no banco de dados.

type OrderService struct {
    db *sql.DB
}

func (s *OrderService) CreateOrder(ctx context.Context, order Order) error {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback()

    if _, err := tx.ExecContext(ctx, `
        INSERT INTO orders (id, customer_id, total, created_at)
        VALUES ($1, $2, $3, NOW())
    `, order.ID, order.CustomerID, order.Total); err != nil {
        return fmt.Errorf("insert order: %w", err)
    }

    payload, err := json.Marshal(map[string]any{
        "order_id":    order.ID,
        "customer_id": order.CustomerID,
        "total":       order.Total,
    })
    if err != nil {
        return fmt.Errorf("marshal payload: %w", err)
    }

    if _, err := tx.ExecContext(ctx, `
        INSERT INTO outbox_events
            (aggregate_type, aggregate_id, event_type, payload)
        VALUES ($1, $2, $3, $4)
    `, "order", order.ID, "order.created", payload); err != nil {
        return fmt.Errorf("insert outbox event: %w", err)
    }

    return tx.Commit()
}

Se tx.Commit() falhar, nem a linha do pedido nem a linha do outbox são persistidas. Se tiver sucesso, ambas são garantidas de estar no banco de dados. O relay pode publicar o evento em qualquer ponto após isso – imediatamente, em um segundo, ou após o relay reiniciar seguindo uma falha.

Esta é a única alteração de código necessária em sua camada de negócios. O resto do padrão vive no relay.

Implementação do relay em Go

O relay é um worker de fundo que faz polling na tabela outbox em um timer. Ele busca um lote de linhas não processadas, publica cada uma e as marca como concluídas. Mantenha-o no mesmo binário que sua aplicação ou execute-o como um processo separado – ambos funcionam, mas o mesmo binário é mais simples de operar.

type OutboxRelay struct {
    db          *sql.DB
    publisher   Publisher
    logger      *slog.Logger
    batchSize   int
    pollInterval time.Duration
    maxAttempts  int
}

func (r *OutboxRelay) Run(ctx context.Context) error {
    ticker := time.NewTicker(r.pollInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            if err := r.processBatch(ctx); err != nil {
                r.logger.Error("outbox relay batch failed", "err", err)
            }
        }
    }
}

O relay respeita o cancelamento de contexto, o que facilita a integração com encerramento gracioso (graceful shutdown). Para um tratamento detalhado do ciclo de vida do contexto e padrões de cancelamento, veja Go context.Context Feito Corretamente.

FOR UPDATE SKIP LOCKED: o padrão de worker concorrente

A função processBatch usa FOR UPDATE SKIP LOCKED para lidar com segurança com workers de relay concorrentes:

func (r *OutboxRelay) processBatch(ctx context.Context) error {
    tx, err := r.db.BeginTx(ctx, nil)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback()

    rows, err := tx.QueryContext(ctx, `
        SELECT id, aggregate_type, aggregate_id, event_type, payload
        FROM outbox_events
        WHERE processed_at IS NULL
          AND attempts < $1
        ORDER BY created_at
        LIMIT $2
        FOR UPDATE SKIP LOCKED
    `, r.maxAttempts, r.batchSize)
    if err != nil {
        return fmt.Errorf("query outbox: %w", err)
    }
    defer rows.Close()

    type row struct {
        id            string
        aggregateType string
        aggregateID   string
        eventType     string
        payload       json.RawMessage
    }

    var batch []row
    for rows.Next() {
        var e row
        if err := rows.Scan(
            &e.id, &e.aggregateType, &e.aggregateID, &e.eventType, &e.payload,
        ); err != nil {
            return fmt.Errorf("scan row: %w", err)
        }
        batch = append(batch, e)
    }
    if err := rows.Err(); err != nil {
        return err
    }

    for _, e := range batch {
        if err := r.publisher.Publish(ctx, e.eventType, e.aggregateID, e.payload); err != nil {
            r.logger.Error("publish failed", "event_id", e.id, "err", err)
            if _, err := tx.ExecContext(ctx,
                `UPDATE outbox_events SET attempts = attempts + 1 WHERE id = $1`, e.id,
            ); err != nil {
                r.logger.Error("increment attempts failed", "event_id", e.id, "err", err)
            }
            continue
        }

        if _, err := tx.ExecContext(ctx,
            `UPDATE outbox_events SET processed_at = NOW() WHERE id = $1`, e.id,
        ); err != nil {
            return fmt.Errorf("mark processed: %w", err)
        }
    }

    return tx.Commit()
}

FOR UPDATE SKIP LOCKED faz duas coisas. Primeiro, FOR UPDATE trava as linhas selecionadas pela duração da transação, impedindo que qualquer outra transação as selecione. Segundo, SKIP LOCKED significa que se uma linha já estiver travada por outra transação, a consulta a ignora em vez de esperar. O resultado é que múltiplos workers de relay podem rodar em paralelo e cada um pegará um subconjunto não sobreposto de linhas.

Sem SKIP LOCKED, um segundo worker bloquearia até que a primeira transação confirmasse antes de ver as mesmas linhas – ponto em que elas já estariam marcadas como concluídas. Com SKIP LOCKED, o segundo worker imediatamente pega linhas diferentes em vez de esperar, dando a você escalabilidade horizontal segura.

Note a separação de escanear-então-publicar no código acima: todas as linhas são escaneadas para um slice antes que o loop de publicação comece. Isso evita segurar um cursor *sql.Rows aberto através de chamadas de rede para o broker, o que manteria a transação aberta por mais tempo do que o necessário.

Idempotência e deduplicação

O relay publica pelo menos uma vez. Se ele publicar um evento e depois travar antes de confirmar a atualização de processed_at, ele publicará o mesmo evento novamente na reinicialização. Isso é inevitável – entrega exatamente-uma-vez entre um banco de dados e um broker de mensagens sem um coordenador de transação distribuída requer essa compensação.

Os consumidores devem ser idempotentes. A abordagem mais simples é rastrear IDs de eventos processados em uma tabela processed_events:

CREATE TABLE processed_events (
    event_id   UUID PRIMARY KEY,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
func (h *OrderHandler) HandleOrderCreated(ctx context.Context, eventID string, payload []byte) error {
    // Deduplicar usando o ID do evento como chave natural
    _, err := h.db.ExecContext(ctx, `
        INSERT INTO processed_events (event_id) VALUES ($1)
        ON CONFLICT (event_id) DO NOTHING
    `, eventID)
    if err != nil {
        return fmt.Errorf("dedup check: %w", err)
    }

    // Verificar se o insert realmente aconteceu (1 linha) ou foi um no-op (0 linhas)
    // Uma abordagem mais simples: usar RETURNING ou verificar linhas afetadas
    // Se 0 linhas afetadas, é um duplicado -- pular
    ...
}

Na prática, muitas equipes dependem dos próprios cabeçalhos de deduplicação do broker (como o campo key do Kafka para tópicos compactados por log, ou o cabeçalho message-id do RabbitMQ) e tratam a deduplicação no nível do banco de dados como um fallback. Ambos são camadas válidas para aplicar.

Inclua o id do evento outbox (um UUID) na mensagem publicada como chave de deduplicação. Os consumidores podem então usá-la independentemente do mecanismo de deduplicação que preferirem.

Política de retry e mensagens venenosas (poison messages)

A coluna attempts dirige a política de retry. O relay ignora linhas onde attempts >= maxAttempts e trata essas linhas como letras mortas (dead letters). Um processo separado ou alerta de operador lida com elas.

Uma visão simples de dead-letter:

CREATE VIEW outbox_dead_letters AS
SELECT *
FROM outbox_events
WHERE attempts >= 5
  AND processed_at IS NULL
ORDER BY created_at;

Uma boa política de retry para produção:

  • Defina maxAttempts para 5-10 dependendo de quão caras são as tentativas.
  • Considere backoff exponencial: inclua uma coluna retry_after e ignore linhas onde retry_after > NOW().
  • Alertar quando COUNT(*) FROM outbox_dead_letters exceder um limite.
  • Fornecer um caminho de retry manual: um endpoint de admin ou script que reseta attempts = 0 e retry_after = NULL para linhas específicas.

Mensagens venenosas – linhas que consistentemente falham devido a um bug no consumidor ou incompatibilidade de esquema – não devem bloquear mensagens saudáveis. Como o relay processa um lote por tick e marca falhas com um incremento de tentativa em vez de removê-los da fila, linhas saudáveis prosseguem normalmente enquanto as venenosas acumulam tentativas até atingirem o limite de dead-letter.

Ordenação de eventos e particionamento

A consulta de polling ordena por created_at, o que dá ordenação primeiro-entrar-praio-sair dentro de um lote. Para a maioria dos casos de uso isso é suficiente. Quando a ordenação estrita por entidade importa – por exemplo, garantindo que order.updated nunca seja publicado antes de order.created para o mesmo pedido – você precisa de ordenação por agregado.

Adicione aggregate_id à cláusula ORDER BY e use-o como a chave da mensagem ao publicar em um tópico particionado como Apache Kafka. Kafka roteia todas as mensagens com a mesma chave para a mesma partição, e partições são consumidas em ordem. Isso dá a você garantias de ordenação por agregado sem ordenação global, o que exigiria uma única instância de relay.

ORDER BY aggregate_id, created_at

Para brokers que não suportam ordenação particionada (como filas AMQP básicas), relay de instância única ou verificações de ordenação no nível da aplicação no consumidor são as alternativas práticas.

Reduzir latência de polling com LISTEN/NOTIFY

Um intervalo de polling de um segundo significa latência média de evento de 500 milissegundos. Para a maioria das cargas de trabalho isso é aceitável. Para casos onde você precisa de latência quase zero, o mecanismo LISTEN/NOTIFY do PostgreSQL permite que o relay acorde imediatamente quando uma nova linha outbox é inserida.

Adicione um trigger à tabela outbox:

CREATE OR REPLACE FUNCTION notify_outbox_insert() RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('outbox_event', NEW.id::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER outbox_insert_notify
AFTER INSERT ON outbox_events
FOR EACH ROW EXECUTE FUNCTION notify_outbox_insert();

No relay, ouça o canal e acorde nas notificações enquanto ainda recorre ao polling periódico:

func (r *OutboxRelay) Run(ctx context.Context) error {
    listener := pq.NewListener(r.dsn, 10*time.Second, time.Minute, nil)
    defer listener.Close()

    if err := listener.Listen("outbox_event"); err != nil {
        return fmt.Errorf("listen: %w", err)
    }

    ticker := time.NewTicker(5 * time.Second) // fallback poll
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-listener.Notify:
            if err := r.processBatch(ctx); err != nil {
                r.logger.Error("outbox batch failed (notify)", "err", err)
            }
        case <-ticker.C:
            if err := r.processBatch(ctx); err != nil {
                r.logger.Error("outbox batch failed (poll)", "err", err)
            }
        }
    }
}

O ticker de fallback lida com quaisquer notificações perdidas durante uma reinicialização do relay ou uma oscilação de rede. Mantenha o intervalo de fallback em alguns segundos em vez de milissegundos – seu trabalho é recuperação, não baixa latência.

Observabilidade: métricas, logs e alertas

O outbox é infraestrutura. Trate-o como infraestrutura e instrumente-o adequadamente.

Métricas chave:

var (
    outboxPublished = prometheus.NewCounter(prometheus.CounterOpts{
        Name: "outbox_events_published_total",
        Help: "Total outbox events successfully published.",
    })
    outboxFailed = prometheus.NewCounterVec(prometheus.CounterOpts{
        Name: "outbox_events_failed_total",
        Help: "Total outbox publish failures by event type.",
    }, []string{"event_type"})
    outboxPending = prometheus.NewGauge(prometheus.GaugeOpts{
        Name: "outbox_events_pending",
        Help: "Current number of unprocessed outbox events.",
    })
    outboxBatchDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
        Name:    "outbox_batch_duration_seconds",
        Help:    "Duration of each outbox processing batch.",
        Buckets: prometheus.DefBuckets,
    })
)

Atualização de Gauge: execute uma consulta periódica para manter outbox_events_pending preciso:

SELECT COUNT(*) FROM outbox_events WHERE processed_at IS NULL;

Limites de alerta a considerar:

  • outbox_events_pending > 1000 por mais de dois minutos: o relay está ficando para trás ou travado.
  • outbox_events_pending crescendo monotonicamente: o broker está fora do ar ou o relay travou.
  • Contagem de dead-letter diferente de zero: bug de esquema ou consumidor precisa de investigação.
  • outbox_batch_duration_seconds p95 > 5s: o banco de dados está lento ou o tamanho do lote é muito grande.

Campos de log estruturados: inclua event_id, event_type, aggregate_id e attempt em cada linha de log do relay. Esses campos permitem que você correlacione uma publicação falha com a linha específica do outbox e o trace do consumidor downstream.

Outbox vs. fila direta vs. saga

O padrão outbox não é a ferramenta certa para todo problema de coordenação. Aqui está a comparação:

Abordagem Atomicidade Complexidade Quando usar
Publicação direta Nenhuma Baixa Aceitável perder eventos ocasionalmente
Caixa de saída transacional Forte Média Entrega confiável de eventos de um único serviço
Padrão Saga Eventual Alta Transações multi-serviço que abrangem múltiplos bancos de dados
Commit em duas fases Forte Muito alta Raramente prático; evitado na maioria dos sistemas distribuídos

O padrão outbox garante que um único serviço emita eventos que refletem suas próprias mudanças de estado de forma confiável. Ele não coordena mudanças de estado entre múltiplos serviços – isso é para o que o padrão Saga serve. A escolha do broker – seja RabbitMQ, SQS, ou Kafka – é independente do próprio padrão outbox; o relay publica para qualquer broker que seu sistema use.

Se você está construindo uma saga, o padrão outbox ainda é útil: cada participante na saga escreve sua mudança de estado local e seu evento de saga em uma única transação usando o outbox, e então o orquestrador ou coreografia da saga lê esses eventos de forma confiável.

CDC baseado em WAL como um relay alternativo

Em vez de polling, você pode seguir o Write-Ahead Log (WAL) do PostgreSQL e ler inserts do outbox diretamente do stream de replicação. Ferramentas como Debezium fazem isso. As vantagens são menor latência e nenhuma pressão de trava na tabela outbox. As desvantagens são complexidade operacional, um slot de replicação dedicado do PostgreSQL e um serviço externo para executar e monitorar.

Para a maioria das equipes, o relay de polling descrito acima é o ponto de partida certo. O tailing de WAL faz sentido quando você tem altas taxas de insert no outbox (dezenas de milhares por segundo), precisa de latência de evento sub-100ms, ou já está executando Debezium para outras necessidades de captura de mudanças.

Integração com sqlc

Se você usa sqlc para código de banco de dados Go com tipo seguro, as consultas do outbox se encaixam naturalmente:

-- name: InsertOutboxEvent :exec
INSERT INTO outbox_events (aggregate_type, aggregate_id, event_type, payload)
VALUES (@aggregate_type, @aggregate_id, @event_type, @payload);

-- name: FetchOutboxBatch :many
SELECT id, aggregate_type, aggregate_id, event_type, payload
FROM outbox_events
WHERE processed_at IS NULL
  AND attempts < @max_attempts
ORDER BY created_at
LIMIT @batch_size
FOR UPDATE SKIP LOCKED;

-- name: MarkOutboxProcessed :exec
UPDATE outbox_events SET processed_at = NOW() WHERE id = @id;

-- name: IncrementOutboxAttempts :exec
UPDATE outbox_events SET attempts = attempts + 1 WHERE id = @id;

-- name: OutboxPendingCount :one
SELECT COUNT(*) FROM outbox_events WHERE processed_at IS NULL;

O sqlc gera funções com tipo seguro para cada consulta, o que evita erros de interpolação de string e mantém a lógica de consulta do outbox co-localizada com o resto da sua camada de acesso ao banco de dados.

Checklist de produção

Use isto antes de entregar uma implementação outbox:

Banco de Dados

  • Tabela outbox tem o índice parcial em created_at WHERE processed_at IS NULL
  • Coluna attempts presente com um padrão de 0
  • Visão ou consulta de dead-letter definida
  • Linhas processadas antigas são arquivadas ou deletadas periodicamente (um trabalho de limpeza noturna é suficiente)

Relay

  • FOR UPDATE SKIP LOCKED usado na consulta de polling
  • Relay roda dentro de uma transação (iniciar antes da consulta, confirmar após todas as atualizações)
  • Tamanho do lote é limitado (50-200 linhas é típico)
  • Relay respeita cancelamento de contexto para encerramento gracioso
  • Publicações falhas incrementam attempts em vez de causar o aborto do lote

Idempotência

  • Mensagem publicada inclui o id do outbox como chave de deduplicação
  • Consumidores são idempotentes ou o broker fornece deduplicação
  • Veja Idempotência em Sistemas Distribuídos para padrões de deduplicação

Observabilidade

  • Gauge outbox_events_pending é monitorado e alertado
  • Contagem de dead-letter é alertada
  • Duração do lote do relay é rastreada
  • Logs estruturados incluem event_id, event_type e aggregate_id

Operações

  • Caminho de retry manual existe para linhas de dead-letter
  • Comportamento de reinicialização do relay é testado (ele republisha corretamente?)
  • Comportamento de falha do broker é testado (o outbox cresce e drena corretamente?)

Pensamentos finais

O problema da escrita dupla é fácil de descartar como um caso de borda até que cause um incidente. O padrão de caixa de saída transacional o resolve com ferramentas que você já tem: uma transação PostgreSQL, uma goroutine de fundo e uma tabela extra. O relay é simples de construir, simples de operar e simples de raciocinar.

O custo é que os consumidores devem ser projetados para entrega pelo-menos-uma-vez. Essa é uma compensação razoável. Entrega exatamente-uma-vez entre um banco de dados e um broker sem transações distribuídas não é alcançável na prática – e fingir o contrário leva a sistemas que silenciosamente descartam ou processam duplicadamente eventos sob condições de falha.

Escreva o evento com os dados. Retransmita-o de forma confiável. Torne os consumidores idempotentes. Este é o padrão inteiro.

Este artigo faz parte do cluster Arquitetura de Aplicação em Produção.

Fontes

Assinar

Receba novos artigos sobre sistemas, infraestrutura e engenharia de IA.