Модуль: Распределённые системы · Уровень: Senior+
TL;DR#
- Dual write — запись в БД и публикация в брокер выполняются как две независимые операции; между ними возможен сбой → одна сторона зафиксирована, другая нет → потеря или дублирование событий. Распределённой транзакции «БД + Kafka» в общем случае нет (а 2PC дорог и неотказоустойчив).
- Transactional outbox решает это: событие пишется в таблицу
outboxв той же локальной ACID-транзакции, что и бизнес-данные. Атомарность гарантируется БД. Отдельный процесс (relay) асинхронно вычитывает outbox и публикует в брокер. - Доставка из outbox — at-least-once (relay может упасть после публикации, но до фиксации факта отправки → повторная публикация). Дубликаты неизбежны → консьюмер обязан быть идемпотентным (inbox pattern / dedup-таблица).
- Два способа читать outbox: polling publisher (relay SELECT-ит таблицу) vs CDC (Debezium читает WAL/binlog транзакционного лога — нет нагрузки SELECT-ами, ниже latency).
- Ordering не бесплатен: глобального порядка нет, обычно гарантируют порядок в рамках партиционного ключа (aggregate_id).
Теория#
Проблема dual write#
Типичный сценарий: сервис заказов создаёт заказ и должен опубликовать событие OrderCreated.
// АНТИПАТТЕРН: dual write
func CreateOrder(ctx context.Context, o Order) error {
if err := db.Insert(ctx, o); err != nil { // (1) commit в БД
return err
}
if err := broker.Publish(ctx, OrderCreated{o.ID}); err != nil { // (2) публикация
return err // заказ уже в БД, но события НЕТ → inconsistency
}
return nil
}Четыре класса отказов между (1) и (2):
+-------------------+ +-------------------+
request | (1) DB commit OK | ---> | (2) Publish ... |
+-------------------+ +-------------------+
A. crash после (1), до (2) -> данные есть, события нет (LOST event)
B. publish OK, crash до ack БД -> событие есть, но retry создаст дубль
C. broker недоступен в (2) -> либо откатить заказ (нельзя, уже commit),
либо потерять событие
D. меняем порядок: publish->commit, и тогда rollback БД оставляет
"событие о заказе, которого нет" (PHANTOM event)Главное: нет единого атомарного коммита для двух разных систем хранения (БД и брокер). Любой порядок операций даёт окно несогласованности.
Решение: outbox таблица в той же транзакции#
Идея: брокер из критического пути убирается. Событие записывается в таблицу той же БД, в той же транзакции, что и изменение бизнес-данных. БД гарантирует, что либо зафиксировано и то, и другое, либо ничего.
+-------------------------------------------------------------+
| BEGIN |
| INSERT INTO orders (...) -- бизнес-данные |
| INSERT INTO outbox (...) -- событие |
| COMMIT (атомарно, ACID) |
+-------------------------------------------------------------+
|
v
+-----------------------------+
| Relay / CDC (асинхронно) |
| читает outbox -> публикует |
+-----------------------------+
|
v
+----------+
| Broker | (Kafka/Rabbit/NATS)
+----------+Схема таблицы#
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type TEXT NOT NULL, -- "order"
aggregate_id TEXT NOT NULL, -- ключ партиционирования / ordering
event_type TEXT NOT NULL, -- "OrderCreated"
payload JSONB NOT NULL, -- тело события
headers JSONB, -- trace_id, schema version, ...
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
-- поля для polling-publisher (для CDC не нужны):
published_at TIMESTAMPTZ, -- NULL = ещё не отправлено
attempts INT NOT NULL DEFAULT 0
);
-- индекс для polling: быстро находить неотправленные в порядке создания
CREATE INDEX idx_outbox_unpublished
ON outbox (created_at)
WHERE published_at IS NULL;Замечания:
idсобытия используется консьюмером как дедупликационный ключ (см. inbox).aggregate_idопределяет порядок и партицию в Kafka.- Для CDC поля
published_at/attemptsне нужны — Debezium отслеживает оффсет в логе сам.
Запись бизнес-операции + события атомарно (Go)#
func (r *Repo) CreateOrder(ctx context.Context, o Order) error {
return r.withTx(ctx, func(tx pgx.Tx) error {
if _, err := tx.Exec(ctx,
`INSERT INTO orders (id, customer_id, total, status)
VALUES ($1,$2,$3,$4)`,
o.ID, o.CustomerID, o.Total, o.Status); err != nil {
return err
}
evt := OrderCreated{OrderID: o.ID, Total: o.Total}
payload, _ := json.Marshal(evt)
_, err := tx.Exec(ctx,
`INSERT INTO outbox
(aggregate_type, aggregate_id, event_type, payload, headers)
VALUES ($1,$2,$3,$4,$5)`,
"order", o.ID.String(), "OrderCreated", payload,
map[string]string{"trace_id": traceID(ctx)})
return err
}) // COMMIT здесь: orders и outbox фиксируются вместе
}Relay / polling publisher#
Отдельный воркер периодически вычитывает неопубликованные строки и шлёт их в брокер.
func (p *Poller) tick(ctx context.Context) error {
return p.withTx(ctx, func(tx pgx.Tx) error {
// FOR UPDATE SKIP LOCKED -> несколько инстансов relay не возьмут
// одну и ту же строку (горизонтальное масштабирование).
rows, err := tx.Query(ctx, `
SELECT id, aggregate_id, event_type, payload, headers
FROM outbox
WHERE published_at IS NULL
ORDER BY created_at -- порядок!
LIMIT 100
FOR UPDATE SKIP LOCKED`)
if err != nil {
return err
}
batch, err := scanOutbox(rows)
if err != nil {
return err
}
for _, e := range batch {
// ключ = aggregate_id -> порядок в рамках агрегата в Kafka
if err := p.broker.Publish(ctx, e.AggregateID, e.toMessage()); err != nil {
return err // транзакция откатится, published_at не выставится -> retry
}
if _, err := tx.Exec(ctx,
`UPDATE outbox SET published_at = now(), attempts = attempts + 1
WHERE id = $1`, e.ID); err != nil {
return err
}
}
return nil
})
}Почему at-least-once: между broker.Publish() и COMMIT (фиксацией published_at) есть окно. Если relay упадёт после успешной публикации, но до коммита, на следующем тике строка снова считается неопубликованной → событие уйдёт второй раз. Это сознательный выбор: лучше дубль, чем потеря.
Тонкость с SKIP LOCKED и ordering: при параллельных воркерах строки могут публиковаться не строго по created_at. Если нужен порядок в рамках агрегата — либо один воркер на партицию, либо шардировать FOR UPDATE по hash(aggregate_id).
Очистка: опубликованные строки удаляют отдельным процессом (DELETE WHERE published_at < now() - interval '7 days') или партиционируют таблицу по дате и дропают старые партиции.
CDC (Change Data Capture) — Debezium#
Альтернатива polling: вместо SELECT-ов читать транзакционный лог БД (Postgres WAL через logical replication / MySQL binlog). Debezium подключается как replication slot, видит каждый INSERT в outbox и публикует в Kafka через Kafka Connect.
App --tx--> Postgres ===WAL===> Debezium ---> Kafka topic
^ (читает лог,
| не нагружает
INSERT в outbox таблицу SELECT-ами)| Аспект | Polling publisher | CDC (Debezium) |
|---|---|---|
| Нагрузка на БД | SELECT каждые N мс | чтение лога, почти бесплатно |
| Latency | зависит от интервала poll | near-real-time |
| Инфраструктура | только код приложения | Kafka Connect + Debezium + конфиг |
| Сложность ops | низкая | высокая (slot lag, рестарты коннектора) |
| Порядок | как написал relay | порядок WAL (по транзакциям) |
| Где упадёт | приложение | оператор/коннектор |
Outbox + CDC иногда называют Outbox Event Router (Debezium SMT перекладывает строку outbox в правильный топик по aggregate_type, ключует по aggregate_id).
Важно про CDC и Postgres: незакоммиченные транзакции в WAL не видны логической репликации до COMMIT — поэтому Debezium не опубликует «фантом» от откаченной транзакции. Также replication slot держит WAL, пока консьюмер не продвинул оффсет — лагающий Debezium может раздуть диск под WAL (классический инцидент).
At-least-once семантика outbox#
Outbox даёт гарантию: каждое зафиксированное событие будет опубликовано минимум один раз. Не «ровно один раз». Источники дублей:
- relay упал между publish и commit
published_at; - CDC коннектор перечитал кусок лога после рестарта (at-least-once у Kafka Connect);
- ретраи самого продюсера в брокер.
Поэтому ответственность за «эффект ровно один раз» (effectively-once) переносится на консьюмера через идемпотентность.
Inbox pattern (дедупликация на стороне консьюмера)#
Зеркальная идея: консьюмер ведёт таблицу обработанных сообщений и в той же транзакции, что и применение эффекта, отмечает message_id как обработанный. Повторная доставка отбрасывается по PK-конфликту.
CREATE TABLE inbox (
message_id UUID PRIMARY KEY, -- = outbox.id, дедуп-ключ
consumer TEXT NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);func (c *Consumer) Handle(ctx context.Context, msg Message) error {
return c.withTx(ctx, func(tx pgx.Tx) error {
// claim сообщения; конфликт = уже обработано -> idempotent skip
ct, err := tx.Exec(ctx,
`INSERT INTO inbox (message_id, consumer) VALUES ($1, $2)
ON CONFLICT (message_id) DO NOTHING`,
msg.ID, c.name)
if err != nil {
return err
}
if ct.RowsAffected() == 0 {
return nil // дубликат — ничего не делаем
}
// бизнес-эффект в ТОЙ ЖЕ транзакции, что и отметка inbox
return c.applyEffect(ctx, tx, msg)
})
}Ключевое: запись в inbox и применение эффекта атомарны. Если бы мы сначала применяли эффект, потом отдельно писали inbox — снова dual write на стороне консьюмера. Поэтому inbox-таблица должна жить в той же БД, что и эффект (иначе нужна снова идемпотентность по бизнес-ключу / upsert).
Альтернатива inbox-таблице — естественная идемпотентность: операция сама по себе идемпотентна (SET status='paid', upsert по бизнес-ключу), тогда отдельный dedup не нужен.
Ordering#
- Глобального порядка нет и обычно не нужен. Гарантируют порядок в рамках ключа партиционирования (
aggregate_id) — все события одного заказа идут по одной партиции Kafka в порядке записи. - Polling с одним воркером +
ORDER BY created_atсохраняет порядок; с параллельными воркерами иSKIP LOCKED— нет (нужно шардировать по ключу). - Каверзный момент с auto-increment
idи порядком: при конкурентных транзакциях бóльшийidможет закоммититься раньше меньшего → polling поid > last_seenможет пропустить строку, которая закоммитится позже. Поэтому для polling безопаснее ориентироваться наpublished_at IS NULL(а не на «курсор по id»), либо использовать монотонный по коммиту маркер. CDC порядок берёт из WAL по факту коммита — этой проблемы нет.
Подводные камни / gotchas#
- «Outbox = exactly-once» — нет. Outbox = at-least-once публикация + перенос exactly-once-эффекта на идемпотентного консьюмера.
- Курсор по auto-increment id в polling пропускает события из-за переупорядочивания коммитов конкурентных транзакций. Используйте флаг
published_at/SKIP LOCKED, не «WHERE id > cursor». - Несколько relay без
SKIP LOCKED→ одна строка публикуется несколькими воркерами (лишние дубли) или дедлоки. - Рост таблицы outbox: без очистки/партиционирования таблица и индекс распухают, polling замедляется. Чистите опубликованные строки.
- CDC и replication slot: лагающий/упавший Debezium держит WAL → переполнение диска у Postgres. Мониторьте
pg_replication_slots.confirmed_flush_lsnи slot lag. - Транзакция держит publish внутри себя (как в наивном relay): сетевой вызов в брокер внутри открытой БД-транзакции удлиняет её и держит locks. Лучше: прочитать батч, закоммитить «claim», публиковать вне транзакции, отдельно отметить — но это снова окно at-least-once (ожидаемо).
- Размер payload: большие JSONB в outbox раздувают WAL и реплику. Иногда кладут только ссылку/ключ, тело — отдельно (event-carried vs notification).
- Порядок при ретраях: если событие N зафейлилось, а N+1 ушло — порядок нарушен. Для строгого порядка в рамках агрегата при ошибке нужно блокировать продвижение по этому
aggregate_id(head-of-line blocking). - Inbox в другой БД, чем эффект не даёт атомарности — это псевдо-решение. Либо общая БД, либо идемпотентность по бизнес-ключу.
Вопросы на собеседовании#
В: Что такое проблема dual write и почему её нельзя решить просто «сначала БД, потом брокер»? О: Это попытка атомарно изменить две разные системы хранения без распределённой транзакции. Любой порядок даёт окно сбоя: commit→publish теряет событие при падении между ними; publish→commit создаёт фантом при откате БД. Ретраи дают дубли. Нет единого атомарного коммита → нужен outbox (свести к одной локальной транзакции) либо 2PC (дорого и неотказоустойчиво).
В: Почему outbox даёт только at-least-once, а не exactly-once?
О: Между публикацией события из outbox в брокер и фиксацией факта публикации (published_at/оффсет) есть окно. Падение в этом окне приводит к повторной публикации. Плюс ретраи продюсера и перечитывание лога CDC после рестарта. Exactly-once delivery недостижим; добиваемся effectively-once = at-least-once + идемпотентный консьюмер.
В: Polling publisher vs CDC — когда что? О: Polling прост (только код, никакой доп. инфры), но нагружает БД SELECT-ами и имеет latency интервала опроса. CDC (Debezium читает WAL/binlog) — near-real-time, не нагружает таблицу, но требует Kafka Connect, операционно сложнее, есть риск раздувания WAL через replication slot. Стартапу/моносервису — polling; высоконагруженной платформе с Kafka — CDC.
В: Как обеспечить идемпотентность консьюмера?
О: Inbox pattern: в одной транзакции INSERT message_id ON CONFLICT DO NOTHING + применение эффекта; при конфликте — skip. Либо естественная идемпотентность операции (upsert/SET по бизнес-ключу). Главное — отметка обработки и эффект должны быть атомарны, иначе на консьюмере снова dual write.
В: Почему опасно вести polling по WHERE id > last_id с auto-increment id?
О: Порядок присвоения id не равен порядку коммита. Транзакция с бóльшим id может закоммититься раньше, чем с меньшим. Курсор перепрыгнет «меньшую» строку, которая закоммитится позже, и она будет потеряна. Нужно полить по флагу published_at IS NULL + SKIP LOCKED, а не по числовому курсору; либо использовать монотонный по факту коммита маркер (LSN).
В: Как масштабировать relay горизонтально, не порождая дубли и дедлоки?
О: SELECT ... FOR UPDATE SKIP LOCKED LIMIT N — каждый воркер берёт свой непересекающийся батч. Для сохранения порядка в рамках агрегата — шардировать выборку по hash(aggregate_id) % N так, чтобы один aggregate_id всегда обслуживал один воркер.
В: Как outbox взаимодействует с ordering в Kafka?
О: Глобального порядка нет; гарантируем порядок в рамках ключа = aggregate_id (одна партиция). Один воркер на партицию или шардирование по ключу. При ошибке публикации события надо блокировать продвижение по этому ключу (head-of-line), иначе порядок нарушится.
В: Чем CDC-outbox защищён от «фантомных» событий от откаченных транзакций? О: Логическая репликация Postgres отдаёт изменения только после COMMIT транзакции; INSERT в outbox внутри откаченной транзакции в логический поток не попадёт. То есть атомарность «бизнес-данные + outbox» сохраняется и на уровне того, что CDC увидит.
На что копают на senior+#
- Понимание, что outbox — это сведение распределённой атомарности к локальной ACID-транзакции, а не магия «гарантированной доставки».
- Чёткое разделение delivery (at-least-once) vs effect (effectively-once) и куда переносится ответственность за идемпотентность.
- Тонкости ordering: переупорядочивание коммитов, курсор по id, head-of-line blocking, партиционный ключ.
- Операционные риски CDC: replication slot lag → WAL bloat; рестарты коннектора → повторная доставка; схема эволюции payload.
- Trade-off «publish внутри транзакции» (короткое окно дублей, но длинные locks) vs «publish вне транзакции» (масштабируемо).
- Жизненный цикл данных outbox: очистка, партиционирование, размер payload (event-carried state transfer vs notification event), влияние на WAL/реплики.
- Связь с saga/process manager: outbox — транспорт для шагов саги; идемпотентность шагов и компенсаций обязательна.