Модуль: Распределённые системы · Уровень: Senior+

TL;DR#

  • Kafka — это распределённый, реплицируемый, persistent commit log, а не классическая очередь. Сообщения не удаляются после чтения; консьюмеры читают по своему оффсету (pull-модель).
  • Топик → партиции. Партиция — единица параллелизма и единственная единица упорядочивания. Глобального порядка между партициями нет.
  • Consumer group: одна партиция в конкретный момент обрабатывается ровно одним консьюмером группы. Параллелизм группы ограничен числом партиций.
  • Гарантии порядка — только внутри партиции; ключ сообщения определяет партицию (hash(key) % partitions), поэтому одинаковый ключ → одна партиция → порядок.
  • Delivery: по умолчанию at-least-once. Exactly-once (EOS) достигается idempotent producer + transactions + read_committed. At-most-once — если коммитить оффсет до обработки.
  • Надёжность: acks=all + min.insync.replicas>=2 + replication.factor>=3. ISR — синхронные реплики.
  • Ребалансировка: eager (stop-the-world) vs cooperative/incremental (KIP-429). Ретеншн: по времени/размеру или log compaction (хранит последнее значение на ключ).
  • Go-клиенты: franz-go (предпочтительно, нативный, поддержка свежих KIP, EOS) vs sarama (зрелый, но тяжелее и с историей багов).

Теория#

Лог как структура данных#

Партиция — это append-only лог сегментов на диске. Каждое сообщение получает монотонный offset (int64) — порядковый номер в партиции. Это центральная идея Kafka: брокер не отслеживает, кто что прочитал, он лишь хранит лог и текущие границы.

Партиция P0 (топик "orders"):
 offset:   0    1    2    3    4    5    6   ...  (log-end-offset)
         [m0] [m1] [m2] [m3] [m4] [m5] [m6]
                          ^             ^
                    consumer A      producer пишет
                    committed=3     в конец (append)

Сегменты: лог нарезается на segment-файлы (*.log + индексы *.index по offset, *.timeindex по времени). Ретеншн и compaction работают на уровне сегментов — удалять/чистить можно целыми файлами, что дёшево. Активный сегмент не трогается до ротации.

Запись последовательная (sequential I/O) + zero-copy (sendfile) при отдаче — отсюда высокий throughput: данные идут page cache → socket, минуя userspace.

Топики, партиции, оффсеты#

  • Топик — логическое имя потока. Делится на N партиций (число задаётся при создании, увеличивать можно, уменьшать — нет).
  • Партиция — упорядоченная неизменяемая последовательность. Единица: репликации, параллелизма, упорядочивания.
  • Offset — позиция консьюмера. Хранится не у брокера-per-message, а коммитится консьюмером в служебный топик __consumer_offsets (или вручную во внешнее хранилище).

Ключевые оффсеты партиции: log-start-offset (после ретеншна), high-watermark (HW — максимальный реплицированный во все ISR оффсет; консьюмер в read_committed/обычном режиме не видит дальше HW), log-end-offset (LEO — конец лога лидера).

Распределение по партициям и роль ключа#

Producer выбирает партицию:

  • Есть ключ → partition = murmur2(key) % numPartitions (детерминированно). Гарантия: все сообщения с одним ключом попадают в одну партицию → сохраняют относительный порядок.
  • Нет ключа → sticky partitioner (батчит в одну партицию пока батч не закроется), затем меняет — для равномерности и эффективности батчинга.

Senior-нюанс: число партиций менять опасно для семантики ключа — % numPartitions поменяется, и существующие ключи “переедут” в другие партиции, ломая порядок и locality. Поэтому партиционирование планируют заранее с запасом.

Consumer groups#

Группа определяется group.id. Брокер-координатор раздаёт партиции топика членам группы так, что каждая партиция назначена ровно одному консьюмеру в группе.

Топик orders: 4 партиции.  Группа "billing" = 3 консьюмера.

P0 -> C1
P1 -> C1
P2 -> C2
P3 -> C3
                C1 держит 2 партиции (партиций больше, чем консьюмеров)

Если консьюмеров 5 при 4 партициях -> один консьюмер простаивает.

Следствия:

  • Максимальный параллелизм группы = число партиций. Хотите больше воркеров — добавляйте партиции.
  • Разные группы читают независимо, каждая со своим набором оффсетов (fan-out): один топик читают и billing, и analytics.

Гарантии упорядочивания#

  • Внутри партиции — строгий порядок (по offset).
  • Между партициями — порядка НЕТ. “Глобального порядка” в Kafka не существует без single-partition (что убивает параллелизм).
  • Практика: выбирайте ключ так, чтобы события, требующие порядка, попадали в одну партицию (например order_id). Тогда порядок per-order сохранён, а параллелизм есть между разными order_id.

Гарантия порядка от producer’а ломается при ретраях, если max.in.flight.requests.per.connection > 1 без идемпотентности: ретрай батча может переупорядочить запись. С idempotent producer (enable.idempotence=true) порядок при ретраях сохраняется до in-flight=5.

Delivery semantics#

  • At-most-once: коммит оффсета ДО обработки. Падение после коммита → потеря сообщения. Редко нужно.
  • At-least-once (по умолчанию): обработать → потом коммитить оффсет. Падение после обработки до коммита → повторная доставка. Требует идемпотентных консьюмеров (дедуп по бизнес-ключу, upsert).
  • Exactly-once (EOS): достигается комбинацией:
    • Idempotent producer (enable.idempotence=true): PID + sequence number на партицию, брокер отбрасывает дубликаты ретраев. Защищает producer→broker.
    • Transactions (transactional.id): атомарно пишем в несколько партиций/топиков И коммитим consumer-оффсеты в одной транзакции (паттерн consume-process-produce). Коммит оффсета через sendOffsetsToTransaction.
    • Консьюмеры читают с isolation.level=read_committed — видят только закоммиченные транзакции (фильтруют aborted).
    • EOS работает строго в рамках Kafka (Kafka→Kafka, например Kafka Streams). При записи во внешнюю БД нужна транзакция/идемпотентность на стороне приёмника (outbox-паттерн).

acks, ISR, репликация#

Каждая партиция: один leader + N-1 followers. Producer и consumer работают только с лидером. Followers пуллят данные с лидера.

ISR (In-Sync Replicas) — набор реплик, не отстающих от лидера больше replica.lag.time.max.ms. HW продвигается только когда запись реплицирована во все ISR.

acks:

  • acks=0 — fire-and-forget, возможна потеря.
  • acks=1 — лидер записал (в свой лог/page cache). Потеря, если лидер упал до репликации.
  • acks=all(=-1) — все ISR подтвердили. Надёжно ТОЛЬКО в паре с min.insync.replicas.

Ключевая ловушка: acks=all + replication.factor=3 + min.insync.replicas=1 — если ISR схлопнулся до лидера, “all” = 1 реплика, потеря при падении лидера остаётся. Правильно: replication.factor=3, min.insync.replicas=2, acks=all — переживает падение одной реплики и не теряет данные. Если ISR < min.insync — producer получает NotEnoughReplicas (отказ записи, а не тихая потеря).

Ребалансировка#

Когда состав группы или партиций меняется (consumer присоединился/упал/max.poll.interval.ms истёк), координатор запускает ребаланс.

  • Eager (старый, RangeAssignor/RoundRobin): stop-the-world — ВСЕ консьюмеры отдают ВСЕ партиции, потом получают новое назначение. Простоя на время ребаланса = плохо при больших группах.
  • Cooperative / incremental (KIP-429, CooperativeStickyAssignor): за два рывка отдаются только те партиции, что реально переезжают; остальные продолжают обрабатываться. Резко снижает downtime. Сейчас рекомендуемый.

Дополнительно static membership (KIP-345, group.instance.id) — при кратковременном рестарте пода (k8s rolling) консьюмер сохраняет назначение и не триггерит ребаланс (в пределах session.timeout.ms).

Liveness: консьюмер должен слать heartbeat (фон, heartbeat.interval.ms/session.timeout.ms) И регулярно вызывать poll (max.poll.interval.ms). Долгая обработка батча между poll’ами → консьюмер считается мёртвым → ребаланс → дубликаты.

Ретеншн и compaction#

  • Delete retention (cleanup.policy=delete): удаляет сегменты старше retention.ms или когда суммарный размер > retention.bytes. Это про “лог как буфер на N дней”.
  • Log compaction (cleanup.policy=compact): хранит как минимум последнее значение для каждого ключа. Tombstone (значение null) помечает ключ на удаление. Это превращает топик в “снимок последнего состояния по ключу” — основа changelog’ов (Kafka Streams state stores, __consumer_offsets, CDC снапшоты).
  • Можно compact,delete одновременно.

Ключевые настройки (шпаргалка)#

ПараметрСторонаНазначение
acks=allproducerподтверждение от всех ISR
enable.idempotence=trueproducerдедуп ретраев, порядок
min.insync.replicas=2topic/brokerминимум ISR для записи при acks=all
replication.factor=3topicчисло реплик партиции
retention.ms / retention.bytestopicполитика хранения
cleanup.policytopicdelete / compact
max.poll.recordsconsumerразмер батча на один poll
max.poll.interval.msconsumerдедлайн между poll до считания мёртвым
enable.auto.commitconsumerавто-коммит оффсетов (часто off для контроля)
isolation.levelconsumerread_committed для EOS
partition.assignment.strategyconsumercooperative-sticky

enable.auto.commit=true коммитит периодически в фоне (auto.commit.interval.ms) — удобно, но даёт at-least-once с риском потери при auto-commit перед обработкой (точнее — коммитится оффсет уже выданных, но, возможно, не обработанных записей). Для контроля семантики на проде обычно false + ручной commit после обработки.

Go-клиенты: franz-go vs sarama#

  • sarama (IBM/Shopify): исторически дефолт. Зрелый, много кода вокруг. Минусы: тяжёлый API, исторические баги в consumer group/ребалансе, медленнее догоняет новые KIP, ручная работа с EOS неудобна.
  • franz-go (twmb): современный, один из самых полных по покрытию протокола и KIP (cooperative ребаланс, EOS/transactions, KIP-345 static membership, сжатие). Чистый pull-API, хорошая производительность, активная поддержка. Предпочтителен для нового кода на senior-уровне.
  • (confluent-kafka-go — cgo-обёртка над librdkafka; максимально совместима, но cgo усложняет сборку/кросс-компиляцию.)

Пример консьюмера на franz-go (at-least-once, ручной коммит, cooperative):

package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.ConsumerGroup("billing"),
		kgo.ConsumeTopics("orders"),
		// cooperative incremental ребаланс вместо stop-the-world
		kgo.Balancers(kgo.CooperativeStickyBalancer()),
		// ручной коммит: не коммитим до успешной обработки
		kgo.DisableAutoCommit(),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	ctx := context.Background()
	for {
		fetches := cl.PollFetches(ctx) // pull-модель: тянем батч
		if errs := fetches.Errors(); len(errs) > 0 {
			log.Printf("fetch errors: %v", errs)
			continue
		}

		fetches.EachRecord(func(r *kgo.Record) {
			// порядок гарантирован внутри партиции (r.Partition)
			// обработка должна быть идемпотентной (at-least-once!)
			if err := handle(r.Key, r.Value); err != nil {
				log.Printf("handle %s/%d@%d: %v", r.Topic, r.Partition, r.Offset, err)
				return
			}
		})

		// коммитим только после успешной обработки батча
		if err := cl.CommitUncommittedOffsets(ctx); err != nil {
			log.Printf("commit: %v", err)
		}
	}
}

func handle(key, val []byte) error { return nil }

Producer с idempotence + acks=all (franz-go включает идемпотентность по умолчанию):

cl, _ := kgo.NewClient(
	kgo.SeedBrokers("localhost:9092"),
	kgo.RequiredAcks(kgo.AllISRAcks()), // acks=all
	// идемпотентность включена по умолчанию; для строгого EOS:
	kgo.TransactionalID("billing-tx-1"),
)

Подводные камни / gotchas#

  • acks=all без min.insync.replicas>=2 не спасает от потери — классическая ошибка. Нужно настраивать на топике/брокере.
  • Изменение числа партиций ломает key→partition для существующих ключей: порядок и locality нарушаются. Планируйте партиции заранее.
  • Долгая обработка между poll’ами превышает max.poll.interval.ms → консьюмер выкидывается из группы → ребаланс → повторная обработка батча (дубликаты). Лечится уменьшением max.poll.records, выносом обработки или паузой партиций.
  • “Exactly-once” в Kafka — это Kafka→Kafka. При записи во внешние системы нужна идемпотентность/транзакция на приёмнике (outbox, dedup-таблица). EOS не магия для side-effects.
  • Auto-commit ≠ at-least-once автоматически. Может коммитить оффсет до завершения обработки → at-most-once-подобная потеря при падении. Для строгого контроля — ручной коммит.
  • Hot partition / skew: плохой ключ (например country=US для 80% трафика) делает одну партицию узким местом; параллелизм группы простаивает.
  • Rebalance storms: частые join/leave (нестабильные поды, маленький session.timeout.ms) → постоянные ребалансы → деградация. Static membership + cooperative помогают.
  • Consumer lag — главная метрика здоровья: log-end-offset - committed-offset. Растёт → консьюмеры не успевают.
  • Zero-copy ломается, если включить TLS или сделать преобразование формата на брокере — throughput падает.
  • Retention vs compaction путают: для топиков-состояний (changelog) нужен compact, для топиков-событий — delete по времени.

Вопросы на собеседовании#

В: Почему Kafka называют логом, а не очередью, и что это меняет? О: Сообщения не удаляются при чтении; брокер хранит append-only лог, а позицию (offset) держит консьюмер (pull). Это даёт: повторное чтение/replay, fan-out нескольким независимым группам, отвязку throughput’а консьюмера от producer’а, дешёвый ретеншн целыми сегментами. Классическая очередь удаляет сообщение после ack и не даёт replay.

В: Как Kafka гарантирует порядок? Можно ли получить глобальный порядок? О: Порядок строгий только внутри партиции (по offset). Глобального порядка между партициями нет; единственный способ — одна партиция, что убивает параллелизм. Практически: выбираем ключ (например order_id), чтобы связанные события шли в одну партицию — порядок per-key при сохранении общего параллелизма.

В: Чем определяется максимальный параллелизм consumer group? О: Числом партиций топика: партиция в момент времени обрабатывается ровно одним консьюмером группы. Консьюмеров больше, чем партиций — лишние простаивают. Масштабирование = увеличение партиций (с оговоркой про key→partition).

В: Как настроить надёжную доставку без потерь? О: На producer — acks=all, enable.idempotence=true. На топике/брокере — replication.factor=3, min.insync.replicas=2. Тогда запись подтверждается минимум двумя ISR и переживает падение одной реплики; при недоступности кворума ISR producer получает ошибку вместо тихой потери. Консьюмер — at-least-once с идемпотентной обработкой.

В: Что такое exactly-once и где его границы? О: EOS = idempotent producer (PID+seq, дедуп ретраев) + transactions (атомарная запись в партиции + коммит оффсетов через sendOffsetsToTransaction) + read_committed на консьюмере. Работает в пределах Kafka (consume-process-produce, Kafka Streams). Для side-effects во внешние системы exactly-once требует идемпотентности/транзакции на приёмнике (outbox).

В: Eager vs cooperative ребаланс? О: Eager — stop-the-world: все отдают все партиции и переназначаются, есть полный простой. Cooperative/incremental (KIP-429) — переезжают только реально перераспределяемые партиции в два прохода, остальные продолжают обрабатываться, downtime минимален. Cooperative — текущая рекомендация.

В: Что такое ISR и high-watermark? О: ISR — реплики, синхронные с лидером (отставание < replica.lag.time.max.ms). High-watermark — максимальный offset, реплицированный во все ISR; консьюмеры не видят дальше HW. acks=all ждёт подтверждения всех ISR; min.insync.replicas задаёт минимальный размер ISR для приёма записи.

В: Delete retention vs log compaction — когда что? О: Delete — хранить лог события N времени/байт, потом удалять сегменты (топики-события). Compaction — хранить последнее значение на ключ (tombstone удаляет ключ); это снимок состояния, нужен для changelog/CDC/state stores. Можно совмещать compact,delete.

В: Какой Go-клиент выбрать и почему? О: franz-go — современный, нативный (без cgo), полное покрытие протокола и свежих KIP (cooperative ребаланс, EOS, static membership), хорошая производительность; рекомендую для нового кода. sarama — зрелый, но тяжёлый и с историей багов в consumer-group логике. confluent-kafka-go — cgo-обёртка над librdkafka, максимально совместима, но усложняет сборку.

На что копают на senior+#

  • Понимание разницы между гарантиями producer’а и консьюмера, и того, что end-to-end EOS требует кооперации обеих сторон + приёмника (outbox, дедуп).
  • Точная настройка durability: связка acks=all + min.insync.replicas + replication.factor, поведение при схлопывании ISR (отказ записи vs потеря).
  • Влияние in-flight requests и идемпотентности на порядок при ретраях.
  • Управление ребалансами на проде: cooperative, static membership, тюнинг session.timeout.ms/max.poll.interval.ms, борьба с rebalance storms и дубликатами при долгой обработке.
  • Партиционирование как архитектурное решение: выбор ключа, борьба со skew/hot partition, последствия изменения числа партиций.
  • Operational metrics: consumer lag, под-репликация (under-replicated partitions), HW vs LEO.
  • Понимание physical layer: sequential I/O, page cache, zero-copy и что их ломает (TLS, конверсия формата).
  • Outbox/transactional outbox как мост между Kafka EOS и внешними БД.