Модуль: Распределённые системы · Уровень: Senior+
TL;DR#
- Kafka — это распределённый, реплицируемый, persistent commit log, а не классическая очередь. Сообщения не удаляются после чтения; консьюмеры читают по своему оффсету (pull-модель).
- Топик → партиции. Партиция — единица параллелизма и единственная единица упорядочивания. Глобального порядка между партициями нет.
- Consumer group: одна партиция в конкретный момент обрабатывается ровно одним консьюмером группы. Параллелизм группы ограничен числом партиций.
- Гарантии порядка — только внутри партиции; ключ сообщения определяет партицию (
hash(key) % partitions), поэтому одинаковый ключ → одна партиция → порядок. - Delivery: по умолчанию at-least-once. Exactly-once (EOS) достигается idempotent producer + transactions +
read_committed. At-most-once — если коммитить оффсет до обработки. - Надёжность:
acks=all+min.insync.replicas>=2+replication.factor>=3. ISR — синхронные реплики. - Ребалансировка: eager (stop-the-world) vs cooperative/incremental (KIP-429). Ретеншн: по времени/размеру или log compaction (хранит последнее значение на ключ).
- Go-клиенты: franz-go (предпочтительно, нативный, поддержка свежих KIP, EOS) vs sarama (зрелый, но тяжелее и с историей багов).
Теория#
Лог как структура данных#
Партиция — это append-only лог сегментов на диске. Каждое сообщение получает монотонный offset (int64) — порядковый номер в партиции. Это центральная идея Kafka: брокер не отслеживает, кто что прочитал, он лишь хранит лог и текущие границы.
Партиция P0 (топик "orders"):
offset: 0 1 2 3 4 5 6 ... (log-end-offset)
[m0] [m1] [m2] [m3] [m4] [m5] [m6]
^ ^
consumer A producer пишет
committed=3 в конец (append)Сегменты: лог нарезается на segment-файлы (*.log + индексы *.index по offset, *.timeindex по времени). Ретеншн и compaction работают на уровне сегментов — удалять/чистить можно целыми файлами, что дёшево. Активный сегмент не трогается до ротации.
Запись последовательная (sequential I/O) + zero-copy (sendfile) при отдаче — отсюда высокий throughput: данные идут page cache → socket, минуя userspace.
Топики, партиции, оффсеты#
- Топик — логическое имя потока. Делится на N партиций (число задаётся при создании, увеличивать можно, уменьшать — нет).
- Партиция — упорядоченная неизменяемая последовательность. Единица: репликации, параллелизма, упорядочивания.
- Offset — позиция консьюмера. Хранится не у брокера-per-message, а коммитится консьюмером в служебный топик
__consumer_offsets(или вручную во внешнее хранилище).
Ключевые оффсеты партиции: log-start-offset (после ретеншна), high-watermark (HW — максимальный реплицированный во все ISR оффсет; консьюмер в read_committed/обычном режиме не видит дальше HW), log-end-offset (LEO — конец лога лидера).
Распределение по партициям и роль ключа#
Producer выбирает партицию:
- Есть ключ →
partition = murmur2(key) % numPartitions(детерминированно). Гарантия: все сообщения с одним ключом попадают в одну партицию → сохраняют относительный порядок. - Нет ключа → sticky partitioner (батчит в одну партицию пока батч не закроется), затем меняет — для равномерности и эффективности батчинга.
Senior-нюанс: число партиций менять опасно для семантики ключа — % numPartitions поменяется, и существующие ключи “переедут” в другие партиции, ломая порядок и locality. Поэтому партиционирование планируют заранее с запасом.
Consumer groups#
Группа определяется group.id. Брокер-координатор раздаёт партиции топика членам группы так, что каждая партиция назначена ровно одному консьюмеру в группе.
Топик orders: 4 партиции. Группа "billing" = 3 консьюмера.
P0 -> C1
P1 -> C1
P2 -> C2
P3 -> C3
C1 держит 2 партиции (партиций больше, чем консьюмеров)
Если консьюмеров 5 при 4 партициях -> один консьюмер простаивает.Следствия:
- Максимальный параллелизм группы = число партиций. Хотите больше воркеров — добавляйте партиции.
- Разные группы читают независимо, каждая со своим набором оффсетов (fan-out): один топик читают и
billing, иanalytics.
Гарантии упорядочивания#
- Внутри партиции — строгий порядок (по offset).
- Между партициями — порядка НЕТ. “Глобального порядка” в Kafka не существует без single-partition (что убивает параллелизм).
- Практика: выбирайте ключ так, чтобы события, требующие порядка, попадали в одну партицию (например
order_id). Тогда порядок per-order сохранён, а параллелизм есть между разными order_id.
Гарантия порядка от producer’а ломается при ретраях, если max.in.flight.requests.per.connection > 1 без идемпотентности: ретрай батча может переупорядочить запись. С idempotent producer (enable.idempotence=true) порядок при ретраях сохраняется до in-flight=5.
Delivery semantics#
- At-most-once: коммит оффсета ДО обработки. Падение после коммита → потеря сообщения. Редко нужно.
- At-least-once (по умолчанию): обработать → потом коммитить оффсет. Падение после обработки до коммита → повторная доставка. Требует идемпотентных консьюмеров (дедуп по бизнес-ключу, upsert).
- Exactly-once (EOS): достигается комбинацией:
- Idempotent producer (
enable.idempotence=true): PID + sequence number на партицию, брокер отбрасывает дубликаты ретраев. Защищает producer→broker. - Transactions (
transactional.id): атомарно пишем в несколько партиций/топиков И коммитим consumer-оффсеты в одной транзакции (паттерн consume-process-produce). Коммит оффсета черезsendOffsetsToTransaction. - Консьюмеры читают с
isolation.level=read_committed— видят только закоммиченные транзакции (фильтруют aborted). - EOS работает строго в рамках Kafka (Kafka→Kafka, например Kafka Streams). При записи во внешнюю БД нужна транзакция/идемпотентность на стороне приёмника (outbox-паттерн).
- Idempotent producer (
acks, ISR, репликация#
Каждая партиция: один leader + N-1 followers. Producer и consumer работают только с лидером. Followers пуллят данные с лидера.
ISR (In-Sync Replicas) — набор реплик, не отстающих от лидера больше replica.lag.time.max.ms. HW продвигается только когда запись реплицирована во все ISR.
acks:
acks=0— fire-and-forget, возможна потеря.acks=1— лидер записал (в свой лог/page cache). Потеря, если лидер упал до репликации.acks=all(=-1) — все ISR подтвердили. Надёжно ТОЛЬКО в паре сmin.insync.replicas.
Ключевая ловушка: acks=all + replication.factor=3 + min.insync.replicas=1 — если ISR схлопнулся до лидера, “all” = 1 реплика, потеря при падении лидера остаётся. Правильно: replication.factor=3, min.insync.replicas=2, acks=all — переживает падение одной реплики и не теряет данные. Если ISR < min.insync — producer получает NotEnoughReplicas (отказ записи, а не тихая потеря).
Ребалансировка#
Когда состав группы или партиций меняется (consumer присоединился/упал/max.poll.interval.ms истёк), координатор запускает ребаланс.
- Eager (старый, RangeAssignor/RoundRobin): stop-the-world — ВСЕ консьюмеры отдают ВСЕ партиции, потом получают новое назначение. Простоя на время ребаланса = плохо при больших группах.
- Cooperative / incremental (KIP-429, CooperativeStickyAssignor): за два рывка отдаются только те партиции, что реально переезжают; остальные продолжают обрабатываться. Резко снижает downtime. Сейчас рекомендуемый.
Дополнительно static membership (KIP-345, group.instance.id) — при кратковременном рестарте пода (k8s rolling) консьюмер сохраняет назначение и не триггерит ребаланс (в пределах session.timeout.ms).
Liveness: консьюмер должен слать heartbeat (фон, heartbeat.interval.ms/session.timeout.ms) И регулярно вызывать poll (max.poll.interval.ms). Долгая обработка батча между poll’ами → консьюмер считается мёртвым → ребаланс → дубликаты.
Ретеншн и compaction#
- Delete retention (
cleanup.policy=delete): удаляет сегменты старшеretention.msили когда суммарный размер >retention.bytes. Это про “лог как буфер на N дней”. - Log compaction (
cleanup.policy=compact): хранит как минимум последнее значение для каждого ключа. Tombstone (значениеnull) помечает ключ на удаление. Это превращает топик в “снимок последнего состояния по ключу” — основа changelog’ов (Kafka Streams state stores,__consumer_offsets, CDC снапшоты). - Можно
compact,deleteодновременно.
Ключевые настройки (шпаргалка)#
| Параметр | Сторона | Назначение |
|---|---|---|
acks=all | producer | подтверждение от всех ISR |
enable.idempotence=true | producer | дедуп ретраев, порядок |
min.insync.replicas=2 | topic/broker | минимум ISR для записи при acks=all |
replication.factor=3 | topic | число реплик партиции |
retention.ms / retention.bytes | topic | политика хранения |
cleanup.policy | topic | delete / compact |
max.poll.records | consumer | размер батча на один poll |
max.poll.interval.ms | consumer | дедлайн между poll до считания мёртвым |
enable.auto.commit | consumer | авто-коммит оффсетов (часто off для контроля) |
isolation.level | consumer | read_committed для EOS |
partition.assignment.strategy | consumer | cooperative-sticky |
enable.auto.commit=true коммитит периодически в фоне (auto.commit.interval.ms) — удобно, но даёт at-least-once с риском потери при auto-commit перед обработкой (точнее — коммитится оффсет уже выданных, но, возможно, не обработанных записей). Для контроля семантики на проде обычно false + ручной commit после обработки.
Go-клиенты: franz-go vs sarama#
- sarama (IBM/Shopify): исторически дефолт. Зрелый, много кода вокруг. Минусы: тяжёлый API, исторические баги в consumer group/ребалансе, медленнее догоняет новые KIP, ручная работа с EOS неудобна.
- franz-go (twmb): современный, один из самых полных по покрытию протокола и KIP (cooperative ребаланс, EOS/transactions, KIP-345 static membership, сжатие). Чистый pull-API, хорошая производительность, активная поддержка. Предпочтителен для нового кода на senior-уровне.
- (confluent-kafka-go — cgo-обёртка над librdkafka; максимально совместима, но cgo усложняет сборку/кросс-компиляцию.)
Пример консьюмера на franz-go (at-least-once, ручной коммит, cooperative):
package main
import (
"context"
"log"
"github.com/twmb/franz-go/pkg/kgo"
)
func main() {
cl, err := kgo.NewClient(
kgo.SeedBrokers("localhost:9092"),
kgo.ConsumerGroup("billing"),
kgo.ConsumeTopics("orders"),
// cooperative incremental ребаланс вместо stop-the-world
kgo.Balancers(kgo.CooperativeStickyBalancer()),
// ручной коммит: не коммитим до успешной обработки
kgo.DisableAutoCommit(),
)
if err != nil {
log.Fatal(err)
}
defer cl.Close()
ctx := context.Background()
for {
fetches := cl.PollFetches(ctx) // pull-модель: тянем батч
if errs := fetches.Errors(); len(errs) > 0 {
log.Printf("fetch errors: %v", errs)
continue
}
fetches.EachRecord(func(r *kgo.Record) {
// порядок гарантирован внутри партиции (r.Partition)
// обработка должна быть идемпотентной (at-least-once!)
if err := handle(r.Key, r.Value); err != nil {
log.Printf("handle %s/%d@%d: %v", r.Topic, r.Partition, r.Offset, err)
return
}
})
// коммитим только после успешной обработки батча
if err := cl.CommitUncommittedOffsets(ctx); err != nil {
log.Printf("commit: %v", err)
}
}
}
func handle(key, val []byte) error { return nil }Producer с idempotence + acks=all (franz-go включает идемпотентность по умолчанию):
cl, _ := kgo.NewClient(
kgo.SeedBrokers("localhost:9092"),
kgo.RequiredAcks(kgo.AllISRAcks()), // acks=all
// идемпотентность включена по умолчанию; для строгого EOS:
kgo.TransactionalID("billing-tx-1"),
)Подводные камни / gotchas#
acks=allбезmin.insync.replicas>=2не спасает от потери — классическая ошибка. Нужно настраивать на топике/брокере.- Изменение числа партиций ломает key→partition для существующих ключей: порядок и locality нарушаются. Планируйте партиции заранее.
- Долгая обработка между poll’ами превышает
max.poll.interval.ms→ консьюмер выкидывается из группы → ребаланс → повторная обработка батча (дубликаты). Лечится уменьшениемmax.poll.records, выносом обработки или паузой партиций. - “Exactly-once” в Kafka — это Kafka→Kafka. При записи во внешние системы нужна идемпотентность/транзакция на приёмнике (outbox, dedup-таблица). EOS не магия для side-effects.
- Auto-commit ≠ at-least-once автоматически. Может коммитить оффсет до завершения обработки → at-most-once-подобная потеря при падении. Для строгого контроля — ручной коммит.
- Hot partition / skew: плохой ключ (например
country=USдля 80% трафика) делает одну партицию узким местом; параллелизм группы простаивает. - Rebalance storms: частые join/leave (нестабильные поды, маленький
session.timeout.ms) → постоянные ребалансы → деградация. Static membership + cooperative помогают. - Consumer lag — главная метрика здоровья:
log-end-offset - committed-offset. Растёт → консьюмеры не успевают. - Zero-copy ломается, если включить TLS или сделать преобразование формата на брокере — throughput падает.
- Retention vs compaction путают: для топиков-состояний (changelog) нужен
compact, для топиков-событий —deleteпо времени.
Вопросы на собеседовании#
В: Почему Kafka называют логом, а не очередью, и что это меняет? О: Сообщения не удаляются при чтении; брокер хранит append-only лог, а позицию (offset) держит консьюмер (pull). Это даёт: повторное чтение/replay, fan-out нескольким независимым группам, отвязку throughput’а консьюмера от producer’а, дешёвый ретеншн целыми сегментами. Классическая очередь удаляет сообщение после ack и не даёт replay.
В: Как Kafka гарантирует порядок? Можно ли получить глобальный порядок? О: Порядок строгий только внутри партиции (по offset). Глобального порядка между партициями нет; единственный способ — одна партиция, что убивает параллелизм. Практически: выбираем ключ (например order_id), чтобы связанные события шли в одну партицию — порядок per-key при сохранении общего параллелизма.
В: Чем определяется максимальный параллелизм consumer group? О: Числом партиций топика: партиция в момент времени обрабатывается ровно одним консьюмером группы. Консьюмеров больше, чем партиций — лишние простаивают. Масштабирование = увеличение партиций (с оговоркой про key→partition).
В: Как настроить надёжную доставку без потерь?
О: На producer — acks=all, enable.idempotence=true. На топике/брокере — replication.factor=3, min.insync.replicas=2. Тогда запись подтверждается минимум двумя ISR и переживает падение одной реплики; при недоступности кворума ISR producer получает ошибку вместо тихой потери. Консьюмер — at-least-once с идемпотентной обработкой.
В: Что такое exactly-once и где его границы?
О: EOS = idempotent producer (PID+seq, дедуп ретраев) + transactions (атомарная запись в партиции + коммит оффсетов через sendOffsetsToTransaction) + read_committed на консьюмере. Работает в пределах Kafka (consume-process-produce, Kafka Streams). Для side-effects во внешние системы exactly-once требует идемпотентности/транзакции на приёмнике (outbox).
В: Eager vs cooperative ребаланс? О: Eager — stop-the-world: все отдают все партиции и переназначаются, есть полный простой. Cooperative/incremental (KIP-429) — переезжают только реально перераспределяемые партиции в два прохода, остальные продолжают обрабатываться, downtime минимален. Cooperative — текущая рекомендация.
В: Что такое ISR и high-watermark?
О: ISR — реплики, синхронные с лидером (отставание < replica.lag.time.max.ms). High-watermark — максимальный offset, реплицированный во все ISR; консьюмеры не видят дальше HW. acks=all ждёт подтверждения всех ISR; min.insync.replicas задаёт минимальный размер ISR для приёма записи.
В: Delete retention vs log compaction — когда что? О: Delete — хранить лог события N времени/байт, потом удалять сегменты (топики-события). Compaction — хранить последнее значение на ключ (tombstone удаляет ключ); это снимок состояния, нужен для changelog/CDC/state stores. Можно совмещать compact,delete.
В: Какой Go-клиент выбрать и почему? О: franz-go — современный, нативный (без cgo), полное покрытие протокола и свежих KIP (cooperative ребаланс, EOS, static membership), хорошая производительность; рекомендую для нового кода. sarama — зрелый, но тяжёлый и с историей багов в consumer-group логике. confluent-kafka-go — cgo-обёртка над librdkafka, максимально совместима, но усложняет сборку.
На что копают на senior+#
- Понимание разницы между гарантиями producer’а и консьюмера, и того, что end-to-end EOS требует кооперации обеих сторон + приёмника (outbox, дедуп).
- Точная настройка durability: связка
acks=all+min.insync.replicas+replication.factor, поведение при схлопывании ISR (отказ записи vs потеря). - Влияние in-flight requests и идемпотентности на порядок при ретраях.
- Управление ребалансами на проде: cooperative, static membership, тюнинг
session.timeout.ms/max.poll.interval.ms, борьба с rebalance storms и дубликатами при долгой обработке. - Партиционирование как архитектурное решение: выбор ключа, борьба со skew/hot partition, последствия изменения числа партиций.
- Operational metrics: consumer lag, под-репликация (under-replicated partitions), HW vs LEO.
- Понимание physical layer: sequential I/O, page cache, zero-copy и что их ломает (TLS, конверсия формата).
- Outbox/transactional outbox как мост между Kafka EOS и внешними БД.