Модуль: Распределённые системы · Уровень: Senior+

TL;DR#

  • Dual write — запись в БД и публикация в брокер выполняются как две независимые операции; между ними возможен сбой → одна сторона зафиксирована, другая нет → потеря или дублирование событий. Распределённой транзакции «БД + Kafka» в общем случае нет (а 2PC дорог и неотказоустойчив).
  • Transactional outbox решает это: событие пишется в таблицу outbox в той же локальной ACID-транзакции, что и бизнес-данные. Атомарность гарантируется БД. Отдельный процесс (relay) асинхронно вычитывает outbox и публикует в брокер.
  • Доставка из outbox — at-least-once (relay может упасть после публикации, но до фиксации факта отправки → повторная публикация). Дубликаты неизбежны → консьюмер обязан быть идемпотентным (inbox pattern / dedup-таблица).
  • Два способа читать outbox: polling publisher (relay SELECT-ит таблицу) vs CDC (Debezium читает WAL/binlog транзакционного лога — нет нагрузки SELECT-ами, ниже latency).
  • Ordering не бесплатен: глобального порядка нет, обычно гарантируют порядок в рамках партиционного ключа (aggregate_id).

Теория#

Проблема dual write#

Типичный сценарий: сервис заказов создаёт заказ и должен опубликовать событие OrderCreated.

// АНТИПАТТЕРН: dual write
func CreateOrder(ctx context.Context, o Order) error {
    if err := db.Insert(ctx, o); err != nil {     // (1) commit в БД
        return err
    }
    if err := broker.Publish(ctx, OrderCreated{o.ID}); err != nil { // (2) публикация
        return err // заказ уже в БД, но события НЕТ → inconsistency
    }
    return nil
}

Четыре класса отказов между (1) и (2):

            +-------------------+        +-------------------+
   request  |  (1) DB commit OK |  --->  | (2) Publish ...   |
            +-------------------+        +-------------------+

  A. crash после (1), до (2)      -> данные есть, события нет (LOST event)
  B. publish OK, crash до ack БД   -> событие есть, но retry создаст дубль
  C. broker недоступен в (2)       -> либо откатить заказ (нельзя, уже commit),
                                       либо потерять событие
  D. меняем порядок: publish->commit, и тогда rollback БД оставляет
     "событие о заказе, которого нет" (PHANTOM event)

Главное: нет единого атомарного коммита для двух разных систем хранения (БД и брокер). Любой порядок операций даёт окно несогласованности.

Решение: outbox таблица в той же транзакции#

Идея: брокер из критического пути убирается. Событие записывается в таблицу той же БД, в той же транзакции, что и изменение бизнес-данных. БД гарантирует, что либо зафиксировано и то, и другое, либо ничего.

   +-------------------------------------------------------------+
   |  BEGIN                                                       |
   |     INSERT INTO orders (...)            -- бизнес-данные     |
   |     INSERT INTO outbox  (...)           -- событие          |
   |  COMMIT                                  (атомарно, ACID)    |
   +-------------------------------------------------------------+
                              |
                              v
              +-----------------------------+
              |  Relay / CDC  (асинхронно)  |
              |  читает outbox -> публикует |
              +-----------------------------+
                              |
                              v
                        +----------+
                        |  Broker  |  (Kafka/Rabbit/NATS)
                        +----------+

Схема таблицы#

CREATE TABLE outbox (
    id             UUID         PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type TEXT         NOT NULL,          -- "order"
    aggregate_id   TEXT         NOT NULL,          -- ключ партиционирования / ordering
    event_type     TEXT         NOT NULL,          -- "OrderCreated"
    payload        JSONB        NOT NULL,          -- тело события
    headers        JSONB,                          -- trace_id, schema version, ...
    created_at     TIMESTAMPTZ  NOT NULL DEFAULT now(),
    -- поля для polling-publisher (для CDC не нужны):
    published_at   TIMESTAMPTZ,                    -- NULL = ещё не отправлено
    attempts       INT          NOT NULL DEFAULT 0
);

-- индекс для polling: быстро находить неотправленные в порядке создания
CREATE INDEX idx_outbox_unpublished
    ON outbox (created_at)
    WHERE published_at IS NULL;

Замечания:

  • id события используется консьюмером как дедупликационный ключ (см. inbox).
  • aggregate_id определяет порядок и партицию в Kafka.
  • Для CDC поля published_at/attempts не нужны — Debezium отслеживает оффсет в логе сам.

Запись бизнес-операции + события атомарно (Go)#

func (r *Repo) CreateOrder(ctx context.Context, o Order) error {
    return r.withTx(ctx, func(tx pgx.Tx) error {
        if _, err := tx.Exec(ctx,
            `INSERT INTO orders (id, customer_id, total, status)
             VALUES ($1,$2,$3,$4)`,
            o.ID, o.CustomerID, o.Total, o.Status); err != nil {
            return err
        }

        evt := OrderCreated{OrderID: o.ID, Total: o.Total}
        payload, _ := json.Marshal(evt)

        _, err := tx.Exec(ctx,
            `INSERT INTO outbox
               (aggregate_type, aggregate_id, event_type, payload, headers)
             VALUES ($1,$2,$3,$4,$5)`,
            "order", o.ID.String(), "OrderCreated", payload,
            map[string]string{"trace_id": traceID(ctx)})
        return err
    }) // COMMIT здесь: orders и outbox фиксируются вместе
}

Relay / polling publisher#

Отдельный воркер периодически вычитывает неопубликованные строки и шлёт их в брокер.

func (p *Poller) tick(ctx context.Context) error {
    return p.withTx(ctx, func(tx pgx.Tx) error {
        // FOR UPDATE SKIP LOCKED -> несколько инстансов relay не возьмут
        // одну и ту же строку (горизонтальное масштабирование).
        rows, err := tx.Query(ctx, `
            SELECT id, aggregate_id, event_type, payload, headers
            FROM outbox
            WHERE published_at IS NULL
            ORDER BY created_at          -- порядок!
            LIMIT 100
            FOR UPDATE SKIP LOCKED`)
        if err != nil {
            return err
        }
        batch, err := scanOutbox(rows)
        if err != nil {
            return err
        }

        for _, e := range batch {
            // ключ = aggregate_id -> порядок в рамках агрегата в Kafka
            if err := p.broker.Publish(ctx, e.AggregateID, e.toMessage()); err != nil {
                return err // транзакция откатится, published_at не выставится -> retry
            }
            if _, err := tx.Exec(ctx,
                `UPDATE outbox SET published_at = now(), attempts = attempts + 1
                 WHERE id = $1`, e.ID); err != nil {
                return err
            }
        }
        return nil
    })
}

Почему at-least-once: между broker.Publish() и COMMIT (фиксацией published_at) есть окно. Если relay упадёт после успешной публикации, но до коммита, на следующем тике строка снова считается неопубликованной → событие уйдёт второй раз. Это сознательный выбор: лучше дубль, чем потеря.

Тонкость с SKIP LOCKED и ordering: при параллельных воркерах строки могут публиковаться не строго по created_at. Если нужен порядок в рамках агрегата — либо один воркер на партицию, либо шардировать FOR UPDATE по hash(aggregate_id).

Очистка: опубликованные строки удаляют отдельным процессом (DELETE WHERE published_at < now() - interval '7 days') или партиционируют таблицу по дате и дропают старые партиции.

CDC (Change Data Capture) — Debezium#

Альтернатива polling: вместо SELECT-ов читать транзакционный лог БД (Postgres WAL через logical replication / MySQL binlog). Debezium подключается как replication slot, видит каждый INSERT в outbox и публикует в Kafka через Kafka Connect.

   App --tx--> Postgres ===WAL===> Debezium ---> Kafka topic
                  ^                  (читает лог,
                  |                   не нагружает
            INSERT в outbox          таблицу SELECT-ами)
АспектPolling publisherCDC (Debezium)
Нагрузка на БДSELECT каждые N мсчтение лога, почти бесплатно
Latencyзависит от интервала pollnear-real-time
Инфраструктуратолько код приложенияKafka Connect + Debezium + конфиг
Сложность opsнизкаявысокая (slot lag, рестарты коннектора)
Порядоккак написал relayпорядок WAL (по транзакциям)
Где упадётприложениеоператор/коннектор

Outbox + CDC иногда называют Outbox Event Router (Debezium SMT перекладывает строку outbox в правильный топик по aggregate_type, ключует по aggregate_id).

Важно про CDC и Postgres: незакоммиченные транзакции в WAL не видны логической репликации до COMMIT — поэтому Debezium не опубликует «фантом» от откаченной транзакции. Также replication slot держит WAL, пока консьюмер не продвинул оффсет — лагающий Debezium может раздуть диск под WAL (классический инцидент).

At-least-once семантика outbox#

Outbox даёт гарантию: каждое зафиксированное событие будет опубликовано минимум один раз. Не «ровно один раз». Источники дублей:

  • relay упал между publish и commit published_at;
  • CDC коннектор перечитал кусок лога после рестарта (at-least-once у Kafka Connect);
  • ретраи самого продюсера в брокер.

Поэтому ответственность за «эффект ровно один раз» (effectively-once) переносится на консьюмера через идемпотентность.

Inbox pattern (дедупликация на стороне консьюмера)#

Зеркальная идея: консьюмер ведёт таблицу обработанных сообщений и в той же транзакции, что и применение эффекта, отмечает message_id как обработанный. Повторная доставка отбрасывается по PK-конфликту.

CREATE TABLE inbox (
    message_id   UUID        PRIMARY KEY,   -- = outbox.id, дедуп-ключ
    consumer     TEXT        NOT NULL,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
func (c *Consumer) Handle(ctx context.Context, msg Message) error {
    return c.withTx(ctx, func(tx pgx.Tx) error {
        // claim сообщения; конфликт = уже обработано -> idempotent skip
        ct, err := tx.Exec(ctx,
            `INSERT INTO inbox (message_id, consumer) VALUES ($1, $2)
             ON CONFLICT (message_id) DO NOTHING`,
            msg.ID, c.name)
        if err != nil {
            return err
        }
        if ct.RowsAffected() == 0 {
            return nil // дубликат — ничего не делаем
        }

        // бизнес-эффект в ТОЙ ЖЕ транзакции, что и отметка inbox
        return c.applyEffect(ctx, tx, msg)
    })
}

Ключевое: запись в inbox и применение эффекта атомарны. Если бы мы сначала применяли эффект, потом отдельно писали inbox — снова dual write на стороне консьюмера. Поэтому inbox-таблица должна жить в той же БД, что и эффект (иначе нужна снова идемпотентность по бизнес-ключу / upsert).

Альтернатива inbox-таблице — естественная идемпотентность: операция сама по себе идемпотентна (SET status='paid', upsert по бизнес-ключу), тогда отдельный dedup не нужен.

Ordering#

  • Глобального порядка нет и обычно не нужен. Гарантируют порядок в рамках ключа партиционирования (aggregate_id) — все события одного заказа идут по одной партиции Kafka в порядке записи.
  • Polling с одним воркером + ORDER BY created_at сохраняет порядок; с параллельными воркерами и SKIP LOCKED — нет (нужно шардировать по ключу).
  • Каверзный момент с auto-increment id и порядком: при конкурентных транзакциях бóльший id может закоммититься раньше меньшего → polling по id > last_seen может пропустить строку, которая закоммитится позже. Поэтому для polling безопаснее ориентироваться на published_at IS NULL (а не на «курсор по id»), либо использовать монотонный по коммиту маркер. CDC порядок берёт из WAL по факту коммита — этой проблемы нет.

Подводные камни / gotchas#

  • «Outbox = exactly-once» — нет. Outbox = at-least-once публикация + перенос exactly-once-эффекта на идемпотентного консьюмера.
  • Курсор по auto-increment id в polling пропускает события из-за переупорядочивания коммитов конкурентных транзакций. Используйте флаг published_at/SKIP LOCKED, не «WHERE id > cursor».
  • Несколько relay без SKIP LOCKED → одна строка публикуется несколькими воркерами (лишние дубли) или дедлоки.
  • Рост таблицы outbox: без очистки/партиционирования таблица и индекс распухают, polling замедляется. Чистите опубликованные строки.
  • CDC и replication slot: лагающий/упавший Debezium держит WAL → переполнение диска у Postgres. Мониторьте pg_replication_slots.confirmed_flush_lsn и slot lag.
  • Транзакция держит publish внутри себя (как в наивном relay): сетевой вызов в брокер внутри открытой БД-транзакции удлиняет её и держит locks. Лучше: прочитать батч, закоммитить «claim», публиковать вне транзакции, отдельно отметить — но это снова окно at-least-once (ожидаемо).
  • Размер payload: большие JSONB в outbox раздувают WAL и реплику. Иногда кладут только ссылку/ключ, тело — отдельно (event-carried vs notification).
  • Порядок при ретраях: если событие N зафейлилось, а N+1 ушло — порядок нарушен. Для строгого порядка в рамках агрегата при ошибке нужно блокировать продвижение по этому aggregate_id (head-of-line blocking).
  • Inbox в другой БД, чем эффект не даёт атомарности — это псевдо-решение. Либо общая БД, либо идемпотентность по бизнес-ключу.

Вопросы на собеседовании#

В: Что такое проблема dual write и почему её нельзя решить просто «сначала БД, потом брокер»? О: Это попытка атомарно изменить две разные системы хранения без распределённой транзакции. Любой порядок даёт окно сбоя: commit→publish теряет событие при падении между ними; publish→commit создаёт фантом при откате БД. Ретраи дают дубли. Нет единого атомарного коммита → нужен outbox (свести к одной локальной транзакции) либо 2PC (дорого и неотказоустойчиво).

В: Почему outbox даёт только at-least-once, а не exactly-once? О: Между публикацией события из outbox в брокер и фиксацией факта публикации (published_at/оффсет) есть окно. Падение в этом окне приводит к повторной публикации. Плюс ретраи продюсера и перечитывание лога CDC после рестарта. Exactly-once delivery недостижим; добиваемся effectively-once = at-least-once + идемпотентный консьюмер.

В: Polling publisher vs CDC — когда что? О: Polling прост (только код, никакой доп. инфры), но нагружает БД SELECT-ами и имеет latency интервала опроса. CDC (Debezium читает WAL/binlog) — near-real-time, не нагружает таблицу, но требует Kafka Connect, операционно сложнее, есть риск раздувания WAL через replication slot. Стартапу/моносервису — polling; высоконагруженной платформе с Kafka — CDC.

В: Как обеспечить идемпотентность консьюмера? О: Inbox pattern: в одной транзакции INSERT message_id ON CONFLICT DO NOTHING + применение эффекта; при конфликте — skip. Либо естественная идемпотентность операции (upsert/SET по бизнес-ключу). Главное — отметка обработки и эффект должны быть атомарны, иначе на консьюмере снова dual write.

В: Почему опасно вести polling по WHERE id > last_id с auto-increment id? О: Порядок присвоения id не равен порядку коммита. Транзакция с бóльшим id может закоммититься раньше, чем с меньшим. Курсор перепрыгнет «меньшую» строку, которая закоммитится позже, и она будет потеряна. Нужно полить по флагу published_at IS NULL + SKIP LOCKED, а не по числовому курсору; либо использовать монотонный по факту коммита маркер (LSN).

В: Как масштабировать relay горизонтально, не порождая дубли и дедлоки? О: SELECT ... FOR UPDATE SKIP LOCKED LIMIT N — каждый воркер берёт свой непересекающийся батч. Для сохранения порядка в рамках агрегата — шардировать выборку по hash(aggregate_id) % N так, чтобы один aggregate_id всегда обслуживал один воркер.

В: Как outbox взаимодействует с ordering в Kafka? О: Глобального порядка нет; гарантируем порядок в рамках ключа = aggregate_id (одна партиция). Один воркер на партицию или шардирование по ключу. При ошибке публикации события надо блокировать продвижение по этому ключу (head-of-line), иначе порядок нарушится.

В: Чем CDC-outbox защищён от «фантомных» событий от откаченных транзакций? О: Логическая репликация Postgres отдаёт изменения только после COMMIT транзакции; INSERT в outbox внутри откаченной транзакции в логический поток не попадёт. То есть атомарность «бизнес-данные + outbox» сохраняется и на уровне того, что CDC увидит.

На что копают на senior+#

  • Понимание, что outbox — это сведение распределённой атомарности к локальной ACID-транзакции, а не магия «гарантированной доставки».
  • Чёткое разделение delivery (at-least-once) vs effect (effectively-once) и куда переносится ответственность за идемпотентность.
  • Тонкости ordering: переупорядочивание коммитов, курсор по id, head-of-line blocking, партиционный ключ.
  • Операционные риски CDC: replication slot lag → WAL bloat; рестарты коннектора → повторная доставка; схема эволюции payload.
  • Trade-off «publish внутри транзакции» (короткое окно дублей, но длинные locks) vs «publish вне транзакции» (масштабируемо).
  • Жизненный цикл данных outbox: очистка, партиционирование, размер payload (event-carried state transfer vs notification event), влияние на WAL/реплики.
  • Связь с saga/process manager: outbox — транспорт для шагов саги; идемпотентность шагов и компенсаций обязательна.