Модуль: Concurrency · Уровень: Senior
TL;DR#
Базовый словарь конкурентного Go: pipeline (этапы через каналы), fan-out/fan-in (распараллелить этап и собрать), worker pool (фиксированный пул на канале задач), семафор (ограничение конкуренции буферизованным каналом), errgroup (группа горутин с отменой и первой ошибкой), graceful shutdown (через context + WaitGroup), or-done (обёртка для отмены). Общий принцип всех паттернов: у каждой горутины есть явный путь завершения (закрытие входа или ctx.Done()), иначе — утечка.
Теория#
Pipeline#
Этапы, соединённые каналами; каждый этап читает вход, пишет выход, закрывает свой выход при завершении.
func gen(ctx context.Context, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-ctx.Done():
return
}
}
}()
return out
}
func sq(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-ctx.Done():
return
}
}
}()
return out
}Правило: закрывает канал тот, кто в него пишет (producer), и только он. Закрытие распространяется по цепочке через range.
Fan-out / Fan-in#
Fan-out — несколько горутин читают из одного канала (распараллелили этап). Fan-in — слить несколько каналов в один.
func fanIn(ctx context.Context, chans ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
for _, c := range chans {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
select {
case out <- v:
case <-ctx.Done():
return
}
}
}(c)
}
go func() { wg.Wait(); close(out) }() // закрыть out, когда все источники иссякли
return out
}Worker pool#
Фиксированное число воркеров на общем канале задач — ограничивает параллелизм и переиспользует горутины.
func pool(ctx context.Context, jobs <-chan Job, n int) <-chan Result {
results := make(chan Result)
var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := range jobs {
select {
case results <- process(j):
case <-ctx.Done():
return
}
}
}()
}
go func() { wg.Wait(); close(results) }()
return results
}Размер пула: для CPU-bound ≈ GOMAXPROCS; для I/O-bound — больше (зависит от внешних лимитов).
Semaphore (ограничение конкуренции)#
Буферизованный канал ёмкости N как счётный семафор:
sem := make(chan struct{}, maxConcurrent)
for _, task := range tasks {
sem <- struct{}{} // занять слот (блокирует при N занятых)
go func(t Task) {
defer func() { <-sem }() // освободить
handle(t)
}(task)
}Альтернатива с весами и context-отменой — golang.org/x/sync/semaphore (Acquire(ctx, n)).
errgroup#
golang.org/x/sync/errgroup — WaitGroup + первая ошибка + отмена контекста.
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(8) // ограничение параллелизма (опционально)
for _, url := range urls {
url := url
g.Go(func() error {
return fetch(ctx, url) // вернёт ошибку → ctx отменится для остальных
})
}
if err := g.Wait(); err != nil { // первая ненулевая ошибка
return err
}WithContext отменяет ctx, как только любая горутина вернула ошибку → остальные, проверяющие ctx.Done(), быстро завершаются. Wait возвращает первую ошибку. SetLimit встроенно ограничивает конкуренцию.
Graceful shutdown горутин#
Контекст для отмены + WaitGroup для ожидания завершения:
func run(ctx context.Context) error {
ctx, stop := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
defer stop()
var wg sync.WaitGroup
wg.Add(1)
go worker(ctx, &wg)
<-ctx.Done() // дождались сигнала/отмены
// здесь можно дать дедлайн на graceful: shutdownCtx с таймаутом
wg.Wait() // дождались, что воркеры дочистили
return nil
}Паттерн: сигнал → отмена ctx → воркеры видят ctx.Done() и завершаются → wg.Wait() гарантирует, что они дочистили ресурсы → при необходимости таймаут на shutdown, чтобы не висеть вечно.
Or-done channel#
Обёртка, которая прекращает чтение из канала при отмене — убирает дублирование select в каждом потребителе.
func orDone[T any](ctx context.Context, in <-chan T) <-chan T {
out := make(chan T)
go func() {
defer close(out)
for {
select {
case v, ok := <-in:
if !ok { return }
select {
case out <- v:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return out
}
// потребитель чистый:
for v := range orDone(ctx, source) { use(v) }tee, bridge#
- tee — раздваивает один канал на два (каждое значение уходит в оба).
- bridge — «склеивает» канал каналов (
<-chan <-chan T) в один поток.
Подводные камни / gotchas#
- Закрытие канала не тем, кто пишет. Закрывать должен единственный producer. Закрытие из получателя или из нескольких писателей → паника «send on closed channel»/«close of closed channel».
- Нет ветки отмены в долгоживущей горутине → утечка при раннем выходе потребителя. Каждая блокирующая операция в фоне должна иметь
ctx.Done(). wg.Wait(); close(out)пропущен в fan-in → out никогда не закроется,rangeзависнет.- Неограниченный fan-out (
goна каждый элемент без семафора/пула) → взрыв числа горутин, OOM, перегрузка downstream. Ограничивайте пулом/семафором/SetLimit. - errgroup без проверки ctx внутри функций — отмена не сработает: горутины должны сами реагировать на
ctx.Done(). errgroup лишь отменяет контекст. - Buffered-канал не «починит» отсутствие потребителя — он лишь отсрочит блокировку/утечку.
- Graceful без таймаута — если воркер завис,
wg.Wait()будет ждать вечно. Дайте дедлайн на shutdown. - Захват переменной цикла до Go 1.22 требовал копии (
url := url); в 1.22+ loopvar per-iteration, но вerrgroup.Go/горутинах всё равно держите это в голове на старых версиях.
Вопросы на собеседовании#
В: Кто должен закрывать канал и почему? О: Только отправитель (producer), и только один. Получатель закрывать не должен — это приведёт к панике при следующей отправке. При нескольких писателях используют отдельную координацию (WaitGroup + закрытие после Wait единственной горутиной).
В: Чем fan-out отличается от worker pool? О: Fan-out — общая идея «много горутин читают один канал». Worker pool — конкретная реализация с фиксированным числом переиспользуемых воркеров, что ограничивает параллелизм и не плодит горутины на каждый элемент.
В: Как ограничить число одновременных операций?
О: Буферизованный канал-семафор ёмкости N (занять перед, освободить после), worker pool из N воркеров, errgroup.SetLimit(N) или x/sync/semaphore с весами и context-отменой.
В: Что даёт errgroup поверх WaitGroup?
О: Сбор первой ошибки, отмену общего контекста при первой ошибке (WithContext), опциональный лимит параллелизма (SetLimit) и удобный Go(func() error). WaitGroup только считает завершения.
В: Как реализовать graceful shutdown горутин?
О: Передать всем context, по сигналу (signal.NotifyContext) отменить его; горутины реагируют на ctx.Done() и завершаются, освобождая ресурсы; wg.Wait() дожидается завершения. Обязательно с таймаутом на shutdown, иначе зависшая горутина повесит процесс.
В: Зачем нужен or-done паттерн?
О: Чтобы не дублировать select { case v := <-in: ...; case <-ctx.Done(): } в каждом потребителе. or-done оборачивает входной канал и сам прекращает поток при отмене, давая потребителю чистый for range.
В: Как правильно закрыть выходной канал fan-in?
О: Каждая копирующая горутина в WaitGroup; отдельная горутина делает wg.Wait(); close(out). Так out закроется ровно когда все источники иссякли, и range out корректно завершится.
В: Какой размер пула выбрать?
О: Для CPU-bound ≈ runtime.GOMAXPROCS (число логических ядер). Для I/O-bound — больше, ограничено внешними лимитами (пул соединений БД, rate limit API). Замеряйте: слишком много воркеров создаёт contention и перегружает downstream.
В: Почему неограниченный go на каждый элемент опасен?
О: Число горутин неконтролируемо растёт при большом/бесконечном вводе → память, давление на планировщик, перегрузка downstream-сервисов. Нужен пул/семафор/лимит.
На что копают на senior+#
- Дисциплина владения каналами: единственный writer закрывает; распространение close по pipeline; почему close — это broadcast.
- Корректная отмена сквозь все этапы pipeline (ctx в каждом select) и доказательство отсутствия утечек.
- errgroup внутри: как
WithContextотменяет, какSetLimitреализован (семафор), порядок возврата ошибки. - Backpressure в pipeline: небуферизованные каналы как естественный контроль потока vs буферизация.
- Graceful shutdown с дедлайном, дренаж очередей, идемпотентность, порядок остановки этапов.
- Generic-обёртки (orDone/tee/bridge на дженериках 1.18+) и их влияние на читаемость/производительность.