Модуль: Concurrency · Уровень: Senior

TL;DR#

Базовый словарь конкурентного Go: pipeline (этапы через каналы), fan-out/fan-in (распараллелить этап и собрать), worker pool (фиксированный пул на канале задач), семафор (ограничение конкуренции буферизованным каналом), errgroup (группа горутин с отменой и первой ошибкой), graceful shutdown (через context + WaitGroup), or-done (обёртка для отмены). Общий принцип всех паттернов: у каждой горутины есть явный путь завершения (закрытие входа или ctx.Done()), иначе — утечка.

Теория#

Pipeline#

Этапы, соединённые каналами; каждый этап читает вход, пишет выход, закрывает свой выход при завершении.

func gen(ctx context.Context, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case out <- n:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

func sq(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

Правило: закрывает канал тот, кто в него пишет (producer), и только он. Закрытие распространяется по цепочке через range.

Fan-out / Fan-in#

Fan-out — несколько горутин читают из одного канала (распараллелили этап). Fan-in — слить несколько каналов в один.

func fanIn(ctx context.Context, chans ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    for _, c := range chans {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                select {
                case out <- v:
                case <-ctx.Done():
                    return
                }
            }
        }(c)
    }
    go func() { wg.Wait(); close(out) }() // закрыть out, когда все источники иссякли
    return out
}

Worker pool#

Фиксированное число воркеров на общем канале задач — ограничивает параллелизм и переиспользует горутины.

func pool(ctx context.Context, jobs <-chan Job, n int) <-chan Result {
    results := make(chan Result)
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := range jobs {
                select {
                case results <- process(j):
                case <-ctx.Done():
                    return
                }
            }
        }()
    }
    go func() { wg.Wait(); close(results) }()
    return results
}

Размер пула: для CPU-bound ≈ GOMAXPROCS; для I/O-bound — больше (зависит от внешних лимитов).

Semaphore (ограничение конкуренции)#

Буферизованный канал ёмкости N как счётный семафор:

sem := make(chan struct{}, maxConcurrent)
for _, task := range tasks {
    sem <- struct{}{}           // занять слот (блокирует при N занятых)
    go func(t Task) {
        defer func() { <-sem }() // освободить
        handle(t)
    }(task)
}

Альтернатива с весами и context-отменой — golang.org/x/sync/semaphore (Acquire(ctx, n)).

errgroup#

golang.org/x/sync/errgroup — WaitGroup + первая ошибка + отмена контекста.

g, ctx := errgroup.WithContext(ctx)
g.SetLimit(8) // ограничение параллелизма (опционально)
for _, url := range urls {
    url := url
    g.Go(func() error {
        return fetch(ctx, url) // вернёт ошибку → ctx отменится для остальных
    })
}
if err := g.Wait(); err != nil { // первая ненулевая ошибка
    return err
}

WithContext отменяет ctx, как только любая горутина вернула ошибку → остальные, проверяющие ctx.Done(), быстро завершаются. Wait возвращает первую ошибку. SetLimit встроенно ограничивает конкуренцию.

Graceful shutdown горутин#

Контекст для отмены + WaitGroup для ожидания завершения:

func run(ctx context.Context) error {
    ctx, stop := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
    defer stop()

    var wg sync.WaitGroup
    wg.Add(1)
    go worker(ctx, &wg)

    <-ctx.Done()        // дождались сигнала/отмены
    // здесь можно дать дедлайн на graceful: shutdownCtx с таймаутом
    wg.Wait()           // дождались, что воркеры дочистили
    return nil
}

Паттерн: сигнал → отмена ctx → воркеры видят ctx.Done() и завершаются → wg.Wait() гарантирует, что они дочистили ресурсы → при необходимости таймаут на shutdown, чтобы не висеть вечно.

Or-done channel#

Обёртка, которая прекращает чтение из канала при отмене — убирает дублирование select в каждом потребителе.

func orDone[T any](ctx context.Context, in <-chan T) <-chan T {
    out := make(chan T)
    go func() {
        defer close(out)
        for {
            select {
            case v, ok := <-in:
                if !ok { return }
                select {
                case out <- v:
                case <-ctx.Done():
                    return
                }
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

// потребитель чистый:
for v := range orDone(ctx, source) { use(v) }

tee, bridge#

  • tee — раздваивает один канал на два (каждое значение уходит в оба).
  • bridge — «склеивает» канал каналов (<-chan <-chan T) в один поток.

Подводные камни / gotchas#

  • Закрытие канала не тем, кто пишет. Закрывать должен единственный producer. Закрытие из получателя или из нескольких писателей → паника «send on closed channel»/«close of closed channel».
  • Нет ветки отмены в долгоживущей горутине → утечка при раннем выходе потребителя. Каждая блокирующая операция в фоне должна иметь ctx.Done().
  • wg.Wait(); close(out) пропущен в fan-in → out никогда не закроется, range зависнет.
  • Неограниченный fan-out (go на каждый элемент без семафора/пула) → взрыв числа горутин, OOM, перегрузка downstream. Ограничивайте пулом/семафором/SetLimit.
  • errgroup без проверки ctx внутри функций — отмена не сработает: горутины должны сами реагировать на ctx.Done(). errgroup лишь отменяет контекст.
  • Buffered-канал не «починит» отсутствие потребителя — он лишь отсрочит блокировку/утечку.
  • Graceful без таймаута — если воркер завис, wg.Wait() будет ждать вечно. Дайте дедлайн на shutdown.
  • Захват переменной цикла до Go 1.22 требовал копии (url := url); в 1.22+ loopvar per-iteration, но в errgroup.Go/горутинах всё равно держите это в голове на старых версиях.

Вопросы на собеседовании#

В: Кто должен закрывать канал и почему? О: Только отправитель (producer), и только один. Получатель закрывать не должен — это приведёт к панике при следующей отправке. При нескольких писателях используют отдельную координацию (WaitGroup + закрытие после Wait единственной горутиной).

В: Чем fan-out отличается от worker pool? О: Fan-out — общая идея «много горутин читают один канал». Worker pool — конкретная реализация с фиксированным числом переиспользуемых воркеров, что ограничивает параллелизм и не плодит горутины на каждый элемент.

В: Как ограничить число одновременных операций? О: Буферизованный канал-семафор ёмкости N (занять перед, освободить после), worker pool из N воркеров, errgroup.SetLimit(N) или x/sync/semaphore с весами и context-отменой.

В: Что даёт errgroup поверх WaitGroup? О: Сбор первой ошибки, отмену общего контекста при первой ошибке (WithContext), опциональный лимит параллелизма (SetLimit) и удобный Go(func() error). WaitGroup только считает завершения.

В: Как реализовать graceful shutdown горутин? О: Передать всем context, по сигналу (signal.NotifyContext) отменить его; горутины реагируют на ctx.Done() и завершаются, освобождая ресурсы; wg.Wait() дожидается завершения. Обязательно с таймаутом на shutdown, иначе зависшая горутина повесит процесс.

В: Зачем нужен or-done паттерн? О: Чтобы не дублировать select { case v := <-in: ...; case <-ctx.Done(): } в каждом потребителе. or-done оборачивает входной канал и сам прекращает поток при отмене, давая потребителю чистый for range.

В: Как правильно закрыть выходной канал fan-in? О: Каждая копирующая горутина в WaitGroup; отдельная горутина делает wg.Wait(); close(out). Так out закроется ровно когда все источники иссякли, и range out корректно завершится.

В: Какой размер пула выбрать? О: Для CPU-bound ≈ runtime.GOMAXPROCS (число логических ядер). Для I/O-bound — больше, ограничено внешними лимитами (пул соединений БД, rate limit API). Замеряйте: слишком много воркеров создаёт contention и перегружает downstream.

В: Почему неограниченный go на каждый элемент опасен? О: Число горутин неконтролируемо растёт при большом/бесконечном вводе → память, давление на планировщик, перегрузка downstream-сервисов. Нужен пул/семафор/лимит.

На что копают на senior+#

  • Дисциплина владения каналами: единственный writer закрывает; распространение close по pipeline; почему close — это broadcast.
  • Корректная отмена сквозь все этапы pipeline (ctx в каждом select) и доказательство отсутствия утечек.
  • errgroup внутри: как WithContext отменяет, как SetLimit реализован (семафор), порядок возврата ошибки.
  • Backpressure в pipeline: небуферизованные каналы как естественный контроль потока vs буферизация.
  • Graceful shutdown с дедлайном, дренаж очередей, идемпотентность, порядок остановки этапов.
  • Generic-обёртки (orDone/tee/bridge на дженериках 1.18+) и их влияние на читаемость/производительность.