🧩 Coordination: как синхронизировать хаос — Java Blocking vs Reactor vs Go

Конкурентность — это не про «запустить много потоков». Это про договорённости между ними.

Представь кухню ресторана: — повара (потоки / горутины) — заказы (задачи) — и главный вопрос: как они координируются?

Сегодня ты увидишь 3 мира:

  • 🟨 Blocking Java — «каждый ждёт»
  • 🟣 Reactor (Reactive Streams) — «всё течёт, никто не блокирует»
  • 🔵 Go — «простые горутины + каналы»

Wait All Tasks (Barrier Join)

Wait All Tasks — 🎬 «финальные титры» 🍿, ждём пока все закончат

💬 Объяснение

Ты запускаешь несколько задач — и хочешь продолжить только когда все завершились.

Это как команда строителей: нельзя сдавать дом, пока каждый не закончил свою часть.

  • Java: CountDownLatch — счётчик, уменьшается до нуля
  • Reactor: Mono.when — собирает несколько async потоков
  • Go: WaitGroup — добавил задачи → ждёшь

🗺️ Схема


Задачи:
 T1 ──┐
 T2 ──┼──▶ [ WAIT ALL ] ──▶ Continue
 T3 ──┘

🟨 Java:
 T1 → countDown()
 T2 → countDown()
 T3 → countDown()
 main → await()

🟣 Reactor:
 Mono1 ─┐
 Mono2 ─┼──▶ Mono.when() → done
 Mono3 ─┘

🔵 Go:
 wg.Add(3)
 go T1 → wg.Done()
 go T2 → wg.Done()
 go T3 → wg.Done()
 wg.Wait()

📊 Сводная таблица

🎯 Подход 🟨 Blocking Java 🟣 Reactor 🔵 Go
🧩 Пример код

// создаём latch на 3 задачи
CountDownLatch latch = new CountDownLatch(3);

Runnable task = () -> {
    // выполняем работу
    System.out.println("Task done");
    
    // уменьшаем счётчик
    latch.countDown();
};

new Thread(task).start();
new Thread(task).start();
new Thread(task).start();

// ждём пока все завершатся
latch.await();

System.out.println("All done!");

// три асинхронных потока
Mono<String> m1 = Mono.just("A");
Mono<String> m2 = Mono.just("B");
Mono<String> m3 = Mono.just("C");

// ждём завершения всех
Mono.when(m1, m2, m3)
    .doOnTerminate(() -> System.out.println("All done"))
    .subscribe();

var wg sync.WaitGroup

wg.Add(3)

go func() {
    defer wg.Done()
    fmt.Println("Task 1 done")
}()

go func() {
    defer wg.Done()
    fmt.Println("Task 2 done")
}()

go func() {
    defer wg.Done()
    fmt.Println("Task 3 done")
}()

// блокируемся пока все не закончат
wg.Wait()

fmt.Println("All done")
🧠 Ментальная модель Счётчик задач. Поток стоит и ждёт. Композиция async-потоков. Никто не блокируется. Горутины + счётчик. Простая синхронизация.
✔ Плюсы Просто понять. Прямолинейно. Не блокирует поток. Масштабируемо. Минимум кода. Очень читаемо.
❌ Минусы Блокировка → плохо масштабируется. Сложнее мышление (reactive mindset). Можно забыть Done → deadlock.
💥 Модель Thread-per-task Event-loop + non-blocking Goroutine scheduler
🛠️ Под капотом Lock + park потоков Publisher/Subscriber + сигналы Лёгкие потоки + runtime scheduler
⚠️ Анти-паттерны Latch без countDown → зависание Блокировка внутри reactive Wait без Add или Done
🚀 Когда использовать Небольшие задачи High-load async системы Серверы, конкурентные задачи
🔥 Комментарий Самый «старый» подход Самый масштабируемый Самый интуитивный

Для 🟨 Java Blocking: используй CountDownLatch только для короткоживущих задач — под капотом поток реально «спит» и жрёт ресурсы.

Для 🟣 Reactor: не вставляй .block() — это ломает весь non-blocking pipeline и превращает его в blocking ад.

Для 🔵 Go: всегда ставь defer wg.Done() — иначе забудешь и получишь deadlock.

API gateway (🟣 Reactor): собираешь ответы от нескольких сервисов → Mono.when → быстрее и не блокирует.

Batch processing (🟨 Java): запустил 5 задач → дождался → записал результат — просто и понятно.

Параллельные воркеры (🔵 Go): обработка очереди — WaitGroup ждёт завершения всех воркеров.

Mutex (Critical Section)

Mutex — 🔒 «один ключ на дверь» 🚪, только один внутри

💬 Объяснение

Когда несколько потоков хотят изменить одни и те же данные — нужен контроль.

Иначе будет хаос: race condition.

Mutex = «внутрь заходит только один».

  • Java: synchronized
  • Reactor: избегает shared state
  • Go: sync.Mutex

🗺️ Схема


Потоки:
 T1 ─┐
 T2 ─┼──▶ [ LOCK ] → critical section → unlock
 T3 ─┘

🟨 Java:
 synchronized(obj) { ... }

🟣 Reactor:
 нет lock → immutable / sequence

🔵 Go:
 mu.Lock()
 ...
 mu.Unlock()

📊 Сводная таблица

🎯 Подход 🟨 Java 🟣 Reactor 🔵 Go
🧩 Пример код

private int counter = 0;

public synchronized void inc() {
    // только один поток здесь
    counter++;
}

// избегаем shared state
Flux.range(1, 10)
    .map(i -> i * 2) // immutable transform
    .subscribe();

var mu sync.Mutex
counter := 0

mu.Lock()
counter++
mu.Unlock()
🧠 Ментальная модель Lock объект Нет shared state Явный lock
✔ Плюсы Просто Нет race вообще Гибкость
❌ Минусы Deadlock Сложно мыслить Забыть unlock
💥 Модель Shared memory Message passing Shared + channels
🛠️ Под капотом Монитор Event stream Mutex + scheduler
⚠️ Анти-паттерны Nested locks Mutable state Double lock
🚀 Когда использовать Legacy code Streaming System-level code
🔥 Комментарий Нужно, но опасно Лучше избегать Норм, если аккуратно

Для 🟨 Java: избегай вложенных synchronized — под капотом это может привести к deadlock через блокировку мониторов.

Для 🟣 Reactor: не тащи mutable state — вся сила reactive в том, что данные не делятся между потоками.

Для 🔵 Go: если используешь mutex — подумай, можно ли заменить на channel (message passing).

Кэш (🟨 Java): синхронизация доступа к Map — иначе race condition.

Streaming pipeline (🟣 Reactor): лучше делать immutable transformations — никаких lock не нужно.

Low-level сервисы (🔵 Go): mutex для счётчиков, метрик, shared state.

First Result Wins

First Result Wins — 🏁 «кто первый — того и тапки» ⚡, берём самый быстрый

💬 Объяснение

Ты запускаешь несколько задач — и берёшь первый результат.

Остальные — игнорируешь или отменяешь.

Это как запрос в несколько CDN — берём самый быстрый ответ.

  • Java: CompletableFuture.anyOf
  • Reactor: Flux.first
  • Go: select

🗺️ Схема


 T1 ──┐
 T2 ──┼──▶ [ FIRST ] ──▶ result
 T3 ──┘

остальные отменяются / игнорируются

📊 Сводная таблица

🎯 Подход 🟨 Java 🟣 Reactor 🔵 Go
🧩 Пример код

CompletableFuture<String> f1 = slow();
CompletableFuture<String> f2 = fast();

// берём первый завершившийся
CompletableFuture.anyOf(f1, f2)
    .thenAccept(result -> {
        System.out.println(result);
    });

Flux.first(
    slowFlux(),
    fastFlux()
).subscribe(System.out::println);

select {
case r := <-fast:
    fmt.Println(r)
case r := <-slow:
    fmt.Println(r)
}
🧠 Ментальная модель Race futures Race streams Race channels
✔ Плюсы Быстро Очень эффективно Просто
❌ Минусы Остальные не отменяются Сложность debugging Нужно контролировать утечки
💥 Модель Future race Reactive race Select
🛠️ Под капотом CompletionStage Signals Scheduler + select
⚠️ Анти-паттерны Игнор cancel Hot streams Goroutine leak
🚀 Когда использовать Fallback API CDN, кеши Network race
🔥 Комментарий Хорошо, но осторожно Очень мощно Самый чистый вариант

Для 🟨 Java: обязательно продумывай отмену остальных задач — иначе они продолжают работать и жрут ресурсы.

Для 🟣 Reactor: используй timeout + fallback — это усиливает паттерн.

Для 🔵 Go: закрывай каналы или добавляй context — иначе утечки горутин.

Multi-CDN (все): отправляешь запросы → берёшь самый быстрый → ускорение latency.

Failover (🟨 Java): основной сервис + резервный — anyOf.

High-load сервисы (🟣 Reactor): race между cache и DB.

Network clients (🔵 Go): select между несколькими источниками данных.

Join Futures

Join Futures — 🧺 собрать корзину результатов, множество async задач → один итог

💬 Объяснение

Представь кухню ресторана.

Ты заказал:

  • бургер
  • картошку
  • напиток

Каждое блюдо готовится отдельно.

Но официант приносит один поднос.

Это и есть Join Futures.

Мы запускаем несколько async задач и потом собираем их результаты в один.

В разных мирах это делается по-разному:

  • 🟨 Java Blocking — список Future + get()
  • 🟣 Reactor — flatMap + collect
  • 🔵 Go — goroutines + channel

🗺️ Схема


JOIN FUTURES

Задача A ----\
              \
Задача B ----- JOIN ----> итоговый результат
              /
Задача C ----/


Java Blocking

Thread1 -> Task A
Thread2 -> Task B
Thread3 -> Task C

main thread:
futureA.get()
futureB.get()
futureC.get()


Java Reactive (Reactor)

Flux(tasks)
   -> flatMap(asyncCall)
   -> collectList()

не блокирует поток


Go

go taskA()
go taskB()
go taskC()

results <- channel

for i := 0; i < 3; i++ {
   result := <-results
}

📊 Сводная таблица

Что сравниваем 🟨 Java Блокирующий подход 🟣 Java Реактивный подход 🔵 Go
🎯 Подход Future + ожидание результата Reactive Streams goroutines + channel aggregation
🧩 Пример код

ExecutorService pool = Executors.newFixedThreadPool(3);

List<Future<Integer>> futures = new ArrayList<>();

for (int i = 0; i < 3; i++) {

    // отправляем задачу в thread pool
    futures.add(pool.submit(() -> {

        // имитация работы
        Thread.sleep(1000);

        return 10;
    }));
}

int sum = 0;

for (Future<Integer> future : futures) {

    // БЛОКИРУЕМСЯ пока результат не готов
    sum += future.get();
}

System.out.println(sum);

Flux.range(1,3)

    // запускаем async задачи
    .flatMap(i -> Mono.fromCallable(() -> {

        Thread.sleep(1000);

        return 10;

    }))

    // собираем все результаты
    .collectList()

    .map(list -> list.stream()
                     .mapToInt(Integer::intValue)
                     .sum())

    .subscribe(System.out::println);

package main

import "fmt"

func worker(ch chan int) {

    // имитация работы
    ch <- 10
}

func main() {

    results := make(chan int)

    for i := 0; i < 3; i++ {

        // запускаем горутину
        go worker(results)
    }

    sum := 0

    for i := 0; i < 3; i++ {

        // получаем результат
        sum += <-results
    }

    fmt.Println(sum)
}
🧠 Ментальная модель Каждая задача возвращает Future — обещание результата. Мы собираем список и ждём каждый результат. Мы описываем поток данных. flatMap создаёт async операции и объединяет их. Каждая goroutine отправляет результат в канал. Канал — это очередь сообщений между задачами.
✔ Плюсы Просто понять. Легко дебажить. Высокая масштабируемость. Нет блокировок потоков. Очень простой concurrency model. Код читается как синхронный.
❌ Минусы Потоки блокируются. Плохо масштабируется. Сложная ментальная модель. Стек вызовов исчезает. Нужно контролировать каналы. Возможны утечки goroutines.
💥 Какая используется модель Thread per task Event loop + async pipeline CSP модель (Communicating Sequential Processes)
🛠️ Что происходит под капотом ThreadPool выполняет задачи. get() блокирует поток. Reactor создаёт неблокирующий pipeline. Runtime Go планирует goroutines.
⚠️ Анти-паттерны Future.get() внутри цикла без пулов. Блокировка внутри reactive цепочки. Чтение канала без контроля количества сообщений.
🚀 Когда использовать Небольшие batch задачи. High-load API. Network сервисы и пайплайны.
🔥 Комментарий Blocking Java хороша для простоты. Reactive выигрывает при 10k+ запросов. Go даёт лучшую читаемость concurrency.

🟨 Java Blocking — никогда не вызывай future.get() внутри большого цикла без пула потоков. Это превращает async код обратно в синхронный.
🟣 Reactor — избегай .block() внутри reactive pipeline. Это разрушает всю модель неблокирующего исполнения.
🔵 Go — всегда контролируй количество чтений из канала. Если отправили 5 сообщений — нужно 5 чтений.

API aggregator (Backend For Frontend) — собираем данные из нескольких сервисов. Очень типичный Join Futures.
Search service — поиск по нескольким индексам.
Microservice gateway — объединение ответов нескольких downstream сервисов.

Semaphore limit

Semaphore limit — 🚦 светофор задач, ограничение количества параллельных операций

💬 Объяснение

Представь мост.

По нему может ехать только 10 машин одновременно.

Если приехала 11-я — она ждёт.

Это делает Semaphore.

🗺️ Схема


LIMIT CONCURRENCY

Tasks -> [ LIMIT 5 ] -> execution


Java Blocking

Semaphore(5)
acquire()
run task
release()


Reactor

flatMap(task, concurrency=5)


Go

buffered channel size=5

📊 Сводная таблица

Что сравниваем 🟨 Java Блокирующий подход 🟣 Java Реактивный подход 🔵 Go
🎯 Подход Semaphore flatMap concurrency buffered channel
🧩 Пример код

Semaphore semaphore = new Semaphore(5);

semaphore.acquire();

try {

    // выполняем задачу
    callService();

} finally {

    // освобождаем слот
    semaphore.release();
}

Flux.range(1,100)

.flatMap(i ->
    callService(i),
    5 // ограничение concurrency
)

.subscribe();

sem := make(chan struct{}, 5)

sem <- struct{}{}

go func() {

    callService()

    <-sem
}()
🧠 Ментальная модель разрешения на выполнение контроль параллелизма в pipeline канал как очередь разрешений
✔ Плюсы точный контроль ресурсов встроено в поток данных очень простая реализация
❌ Минусы можно забыть release() сложнее дебажить можно получить deadlock
💥 Какая используется модель thread synchronization reactive backpressure CSP concurrency
🛠️ Что происходит под капотом Atomic счетчик разрешений Reactor scheduler контролирует поток канал блокирует запись
⚠️ Анти-паттерны Semaphore без finally flatMap без ограничения concurrency buffered channel без контроля закрытия
🚀 Когда использовать rate limiting reactive API worker pool
🔥 Комментарий Semaphore — фундамент thread control flatMap concurrency — реактивный аналог Go делает это максимально элегантно

🟨 Java Blocking — всегда освобождай semaphore в finally, иначе система постепенно зависнет.
🟣 Reactor — всегда указывай concurrency в flatMap если вызываешь внешний сервис.
🔵 Go — buffered channel идеально подходит как semaphore.

Database connection limiting.
External API rate limiting.
Worker pool управление нагрузкой.

Join Service

Join Service — 🎬 дождаться финала всех актёров, запуск набора задач и ожидание завершения

💬 Объяснение

Представь фильм.

Сцена считается завершённой только когда все актёры закончили.

Это и есть Join Service.

🗺️ Схема


TASK GROUP EXECUTION

Task A
Task B
Task C

       ↓

WAIT ALL FINISH


Java

invokeAll()


Reactor

merge()


Go

WaitGroup

📊 Сводная таблица

Что сравниваем 🟨 Java Блокирующий подход 🟣 Java Реактивный подход 🔵 Go
🎯 Подход invokeAll merge WaitGroup
🧩 Пример код

ExecutorService pool =
    Executors.newFixedThreadPool(3);

List<Callable<String>> tasks = List.of(
    () -> "A",
    () -> "B",
    () -> "C"
);

List<Future<String>> results =
    pool.invokeAll(tasks);

Flux.merge(

    serviceA(),
    serviceB(),
    serviceC()

).subscribe();

var wg sync.WaitGroup

wg.Add(3)

go func() {
    defer wg.Done()
}()

go func() {
    defer wg.Done()
}()

go func() {
    defer wg.Done()
}()

wg.Wait()
🧠 Ментальная модель группа задач слияние потоков счётчик активных задач
✔ Плюсы очень простой API естественно вписывается в pipeline минимальный overhead
❌ Минусы блокировка сложная диагностика нужно аккуратно управлять wg.Add()
💥 Какая используется модель thread pool coordination event stream merge goroutine synchronization
🛠️ Что происходит под капотом pool ожидает завершения задач reactor orchestrates events WaitGroup — atomic counter
⚠️ Анти-паттерны invokeAll для long running задач merge без контроля backpressure wg.Add после запуска goroutine
🚀 Когда использовать batch processing event pipelines parallel workers
🔥 Комментарий invokeAll — старый но надёжный инструмент merge — реактивный orchestration WaitGroup — один из самых красивых примитивов Go

🟨 Java Blocking — invokeAll отлично подходит для batch задач.
🟣 Reactor — merge полезен для объединения потоков сервисов.
🔵 Go — WaitGroup это самый простой способ синхронизации горутин.

Fan-out / Fan-in architecture.
Parallel microservice calls.
Batch data processing.

Thread coordination

Thread coordination — 🔔 звонок между потоками, один поток ждёт сигнал другого

💬 Объяснение

Представь кухню ресторана.

Один повар готовит стейк. Другой — гарнир.

Но гарнир можно начинать только после сигнала.

Повар кричит:

“Готово!”

И второй начинает работу.

Это и есть Thread coordination.

Потоки ждут сигнал друг друга.

В разных мирах:

  • 🟨 Java — wait / notify
  • 🟣 Reactor — signal
  • 🔵 Go — channel

🗺️ Схема


THREAD COORDINATION

Thread A ---- work ---- notify ---->
                                   Thread B resumes


Java Blocking

Thread B
  wait()

Thread A
  notify()


Reactive

Publisher -> signal -> Subscriber


Go

goroutine A -> channel -> goroutine B

📊 Сводная таблица

Что сравниваем 🟨 Java Блокирующий подход 🟣 Java Реактивный подход 🔵 Go
🎯 Подход wait / notify signal / reactive stream event channel sync
🧩 Пример код

class Worker {

    private final Object lock = new Object();
    private boolean ready = false;

    public void waitForSignal() throws Exception {

        synchronized (lock) {

            while (!ready) {

                // поток засыпает
                lock.wait();
            }

            System.out.println("Signal received");
        }
    }

    public void sendSignal() {

        synchronized (lock) {

            ready = true;

            // будим ожидающий поток
            lock.notify();
        }
    }
}

Mono<String> signal =
    Mono.just("ready");

signal
    .doOnNext(v -> {

        // событие выступает сигналом
        System.out.println("Signal received");
    })
    .subscribe();

package main

import "fmt"

func main() {

    signal := make(chan bool)

    go func() {

        // поток ждёт сигнал
        <-signal

        fmt.Println("Signal received")
    }()

    // отправляем сигнал
    signal <- true
}
🧠 Ментальная модель Поток может уснуть и ждать сигнала. Другой поток будит его через notify(). В reactive мире сигнал — это событие в stream. Канал — это труба сообщений между горутинами.
✔ Плюсы Очень низкий уровень контроля. Отсутствие блокировок. Очень чистая модель сигналов.
❌ Минусы Легко получить deadlock. Трудно понять поток событий. Можно случайно заблокировать канал.
💥 Какая используется модель monitor synchronization event driven architecture CSP messaging
🛠️ Что происходит "под капотом" JVM переводит поток в WAITING состояние. Reactor распространяет событие через pipeline. Go runtime паркует goroutine.
⚠️ Анти-паттерны wait() без цикла while. блокирующие вызовы внутри reactive chain. канал без читающей стороны.
🚀 Когда использовать низкоуровневые concurrency библиотеки. event processing pipeline. между goroutines.
🔥 Комментарий wait/notify — очень мощный, но опасный инструмент. reactive events заменяют ручные сигналы. каналы Go делают coordination естественным.

🟨 Java Blocking — всегда используй wait() внутри while. Это защита от ложных пробуждений JVM.
🟣 Reactor — думай о событиях как о сигналах между частями pipeline.
🔵 Go — никогда не отправляй в канал, если никто не читает.

Producer-consumer pipeline — один поток производит данные, другой ждёт сигнал готовности.
Event-driven системы — реакция на события вместо polling.
Синхронизация этапов обработки данных.

Race control

Race control — 🏁 один финишный коридор, контроль порядка доступа к данным

💬 Объяснение

Представь банкомат.

К нему может подойти только один человек.

Если двое попытаются одновременно — начнётся хаос.

Это называется race condition.

Поэтому нужен контроль доступа.

В разных мирах:

  • 🟨 Java — synchronized
  • 🟣 Reactor — concatMap (гарантирует порядок)
  • 🔵 Go — mutex

🗺️ Схема


RACE CONTROL

Task A \
        -> critical section
Task B /

only one allowed


Java

synchronized block


Reactive

concatMap -> sequential processing


Go

mutex lock

📊 Сводная таблица

Что сравниваем 🟨 Java Блокирующий подход 🟣 Java Реактивный подход 🔵 Go
🎯 Подход synchronized concatMap mutex
🧩 Пример код

class Counter {

    private int value = 0;

    public synchronized void increment() {

        // только один поток
        // может войти сюда

        value++;
    }
}

Flux.range(1,5)

.concatMap(i -> {

    // задачи выполняются
    // строго по очереди

    return Mono.just(i);
})

.subscribe();

package main

import (
    "sync"
)

var mutex sync.Mutex
var counter int

func increment() {

    mutex.Lock()

    counter++

    mutex.Unlock()
}
🧠 Ментальная модель монитор объекта очередь задач замок доступа
✔ Плюсы простая синхронизация гарантирует порядок событий очень быстрый mutex
❌ Минусы блокирует поток ограничивает параллелизм можно забыть Unlock()
💥 Какая используется модель monitor lock sequential stream mutual exclusion
🛠️ Что происходит "под капотом" JVM использует monitor lock Reactor выполняет задачи последовательно Go runtime использует spinlock + park
⚠️ Анти-паттерны синхронизация большого блока кода concatMap для heavy CPU задач mutex вокруг медленных операций
🚀 Когда использовать shared state event ordering shared memory protection
🔥 Комментарий synchronized — основа Java concurrency concatMap — элегантный контроль порядка mutex в Go невероятно быстрый

🟨 Java Blocking — держи synchronized блоки максимально короткими.
🟣 Reactor — concatMap полезен когда важен порядок событий.
🔵 Go — используй defer mutex.Unlock() чтобы избежать забытых unlock.

Обновление shared кеша.
Последовательная обработка финансовых операций.
Очереди событий.

Fork-Join

Fork-Join — 🌳 раздели дерево задачи, параллельно реши ветки и собери результат

💬 Объяснение

Представь огромную задачу.

Например — подсчитать сумму миллионов чисел.

Один поток будет считать очень долго.

Поэтому мы:

  • делим задачу
  • решаем части параллельно
  • объединяем результат

Это и есть Fork-Join.

🗺️ Схема


FORK JOIN

        task
       /   \
    task   task
    / \     / \
   a  b    c   d

join results

📊 Сводная таблица

Что сравниваем 🟨 Java Блокирующий подход 🟣 Java Реактивный подход 🔵 Go
🎯 Подход ForkJoinPool parallel Flux worker pool
🧩 Пример код

ForkJoinPool pool = new ForkJoinPool();

int result =
    pool.submit(() ->
        IntStream.range(0,1000)
        .parallel()
        .sum()
    ).get();

Flux.range(1,1000)

.parallel()

.runOn(Schedulers.parallel())

.reduce(Integer::sum)

.subscribe(System.out::println);

jobs := make(chan int, 100)

for w := 0; w < 4; w++ {

    go worker(jobs)
}

for i := 0; i < 100; i++ {

    jobs <- i
}
🧠 Ментальная модель divide and conquer parallel stream pipeline worker pool
✔ Плюсы эффективное использование CPU встроенный параллелизм очень гибкий worker pool
❌ Минусы сложно контролировать сложно дебажить ручное управление
💥 Какая используется модель work stealing reactive parallel scheduler goroutine worker pool
🛠️ Что происходит "под капотом" ForkJoinPool крадёт задачи между потоками Reactor распределяет задачи по scheduler goroutines планируются runtime
⚠️ Анти-паттерны блокирующие операции внутри forkjoin parallel Flux для IO слишком много worker
🚀 Когда использовать CPU heavy задачи data pipelines distributed jobs
🔥 Комментарий ForkJoin — сердце parallel streams Reactor делает параллелизм декларативным worker pool — классический Go подход

🟨 Java Blocking — ForkJoinPool идеально подходит для CPU-heavy задач.
🟣 Reactor — parallel() используй только для CPU задач, не IO.
🔵 Go — worker pool лучше ограничивать количеством CPU.

Big data обработка.
Parallel computation.
Distributed batch processing.

Ordered join

Ordered join — 📦 собрать посылки по номеру, параллельные задачи → результаты в исходном порядке

💬 Объяснение

Представь конвейер доставки.

Пять курьеров развозят посылки одновременно. Но клиент должен получить их в правильном порядке.

Даже если посылка №3 приехала раньше №2 — её нужно подождать.

Это и есть Ordered Join.

Мы запускаем задачи параллельно, но собираем результат в исходном порядке.

В разных технологиях:

  • 🟨 Java — список Future
  • 🟣 Reactor — concat
  • 🔵 Go — упорядоченный канал

🗺️ Схема


ORDERED JOIN

Tasks executed in parallel:

Task1 ----\
Task2 -----\ 
Task3 -------> execution
Task4 -----/
Task5 ----/

BUT results must be returned in order

Result1
Result2
Result3
Result4
Result5


Java Blocking

Future1.get()
Future2.get()
Future3.get()


Reactive

Flux.concat()

ensures order


Go

channel with ordered aggregation

📊 Сводная таблица

Что сравниваем 🟨 Java Блокирующий подход 🟣 Java Реактивный подход 🔵 Go
🎯 Подход List<Future> concat ordered channel
🧩 Пример код

ExecutorService pool =
    Executors.newFixedThreadPool(3);

// список задач
List<Callable<Integer>> tasks = List.of(
    () -> 1,
    () -> 2,
    () -> 3
);

// запускаем все задачи
List<Future<Integer>> futures =
    pool.invokeAll(tasks);

// получаем результаты строго по порядку
for (Future<Integer> f : futures) {

    // get() блокирует поток
    // пока результат не готов
    System.out.println(f.get());
}

Flux<Integer> flux1 = Mono.just(1).flux();
Flux<Integer> flux2 = Mono.just(2).flux();
Flux<Integer> flux3 = Mono.just(3).flux();

// concat гарантирует
// сохранение порядка

Flux.concat(flux1, flux2, flux3)
    .subscribe(System.out::println);

package main

import "fmt"

func worker(id int, ch chan int) {

    // отправляем результат
    ch <- id
}

func main() {

    ch := make(chan int)

    for i := 1; i <= 3; i++ {

        go worker(i, ch)
    }

    // читаем результаты
    // и можем упорядочить их
    for i := 0; i < 3; i++ {

        fmt.Println(<-ch)
    }
}
🧠 Ментальная модель Список Future выступает как очередь результатов. Мы читаем их в исходном порядке задач. concat соединяет потоки данных последовательно, сохраняя порядок событий. Канал служит буфером сообщений между goroutines.
✔ Плюсы этой реализации Очень простой способ сохранить порядок результатов. Полностью неблокирующая обработка данных. Гибкость управления порядком сообщений.
❌ Минусы get() блокирует поток. concat снижает параллелизм. нужно самостоятельно контролировать порядок.
💥 Какая используется модель thread coordination reactive sequencing message passing
🛠️ Что происходит "под капотом" ThreadPool выполняет задачи параллельно. Future хранит результат. Reactor строит pipeline обработки событий. Go runtime планирует goroutines и каналы.
⚠️ Анти-паттерны Future.get() внутри тяжёлого цикла. concat для долгих IO операций. чтение канала без контроля количества сообщений.
🚀 Когда использовать batch задачи. reactive pipelines. stream processing.
🔥 Комментарий от тебя Ordered join — важен для deterministic систем. concat — самый простой способ гарантировать порядок. Go даёт гибкость, но требует аккуратности.

🟨 Java Blocking — список Future автоматически сохраняет порядок задач, поэтому безопасно использовать invokeAll.
🟣 Reactor — concat обеспечивает строгий порядок событий, но снижает параллелизм pipeline.
🔵 Go — для упорядочивания результатов лучше использовать индексированные структуры.

Aggregation API — нужно вернуть данные сервисов строго в том же порядке.
Streaming pipelines — обработка событий в фиксированной последовательности.
Batch ETL — обработка файлов, где порядок важен.

Barrier sync

Barrier sync — 🚧 общая линия старта, все потоки ждут друг друга

💬 Объяснение

Представь марафон.

Бегуны должны стартовать одновременно.

Даже если кто-то пришёл раньше — он ждёт остальных.

Когда все готовы — старт.

Это и есть Barrier Sync.

🗺️ Схема


BARRIER SYNC

Thread A ----\
Thread B ----- WAIT -----> continue together
Thread C ----/


Java

CyclicBarrier.await()


Reactive

zip()


Go

WaitGroup

📊 Сводная таблица

Что сравниваем 🟨 Java Блокирующий подход 🟣 Java Реактивный подход 🔵 Go
🎯 Подход CyclicBarrier zip WaitGroup
🧩 Пример код

CyclicBarrier barrier =
    new CyclicBarrier(3);

Runnable worker = () -> {

    try {

        System.out.println("Ready");

        // поток ждёт остальных
        barrier.await();

        System.out.println("Start together");

    } catch(Exception e){}
};

Mono.zip(

    serviceA(),
    serviceB(),
    serviceC()

).subscribe(tuple -> {

    System.out.println("All results ready");

});

var wg sync.WaitGroup

wg.Add(3)

go func() {
    defer wg.Done()
}()

go func() {
    defer wg.Done()
}()

go func() {
    defer wg.Done()
}()

// ждём всех
wg.Wait()
🧠 Ментальная модель точка синхронизации потоков ожидание всех потоков данных счётчик активных задач
✔ Плюсы этой реализации точная синхронизация естественно для stream processing очень простой API
❌ Минусы может вызвать deadlock ожидание медленного источника нужно контролировать Add()
💥 Какая используется модель thread barrier event synchronization task counter
🛠️ Что происходит "под капотом" барьер блокирует потоки zip ждёт элементы всех потоков WaitGroup использует atomic counter
⚠️ Анти-паттерны неверное количество участников zip для бесконечных потоков wg.Add после запуска goroutine
🚀 Когда использовать parallel algorithms aggregation pipelines task orchestration
🔥 Комментарий от тебя Barrier — фундамент синхронизации zip — элегантная реактивная синхронизация WaitGroup — одна из лучших идей Go

🟨 Java Blocking — CyclicBarrier полезен в parallel algorithms.
🟣 Reactor — zip отлично подходит для объединения результатов сервисов.
🔵 Go — всегда вызывай wg.Add() до запуска goroutines.

Microservice orchestration — ожидание ответов нескольких сервисов.
Parallel computation — синхронизация этапов алгоритма.
Distributed pipelines — координация workers.

Conditional wait

Conditional wait — 🎣 ждать событие по условию, поток просыпается только когда условие выполнено

💬 Объяснение

Многие новички делают так:

sleep(1 second) → проверить условие → sleep снова.

Это называется polling.

Гораздо лучше — ждать событие реактивно.

То есть поток просыпается только тогда, когда условие выполнено.

Это называется Conditional Wait.

🗺️ Схема


CONDITIONAL WAIT

condition false -> wait

condition true -> resume


Java

Condition.await()


Reactive

filter + retry


Go

select

📊 Сводная таблица

Что сравниваем 🟨 Java Блокирующий подход 🟣 Java Реактивный подход 🔵 Go
🎯 Подход Condition filter + retry select
🧩 Пример код

Lock lock = new ReentrantLock();

Condition ready =
    lock.newCondition();

lock.lock();

try {

    // поток ждёт условия
    ready.await();

} finally {

    lock.unlock();
}

Flux.interval(Duration.ofSeconds(1))

.filter(v -> v > 5)

.next()

.subscribe(v ->
    System.out.println("Condition met")
);

select {

case msg := <-channel:

    fmt.Println(msg)

case <-time.After(time.Second):

    fmt.Println("timeout")

}
🧠 Ментальная модель ожидание сигнала реакция на поток событий ожидание одного из событий
✔ Плюсы этой реализации точный контроль синхронизации идеально для событийных систем очень гибкий механизм
❌ Минусы сложно использовать правильно сложно дебажить pipeline select может усложнять код
💥 Какая используется модель condition variable event filtering event multiplexing
🛠️ Что происходит "под капотом" поток переходит в WAITING состояние Reactor фильтрует поток событий Go runtime ожидает события каналов
⚠️ Анти-паттерны sleep polling блокирующие операции в pipeline select без default
🚀 Когда использовать thread coordination event processing network servers
🔥 Комментарий от тебя Condition — продвинутая альтернатива wait/notify Reactive pipelines идеально подходят для условий select — один из самых мощных инструментов Go

🟨 Java Blocking — Condition безопаснее wait/notify и даёт более точный контроль.
🟣 Reactor — filter + retry позволяет ждать событие без блокировки.
🔵 Go — select идеально подходит для сетевых серверов.

Event-driven системы.
Network servers.
Reactive pipelines обработки данных.

RW lock

RW lock — 📚 читальный зал библиотеки, много читателей но только один писатель

💬 Объяснение

Представь библиотеку.

Десятки людей могут читать книгу одновременно.

Но редактировать книгу может только один человек.

Это оптимизация доступа:

  • много readers
  • один writer

Это называется Read-Write Lock.

В Java есть специальный класс:

ReentrantReadWriteLock

В Go есть аналог:

sync.RWMutex

Reactive мире обычно избегает shared state, поэтому отдельного аналога нет.

🗺️ Схема


READ WRITE LOCK

Readers:
R1
R2
R3
R4

all allowed simultaneously

Writer:

W1

exclusive access


Java

readLock() / writeLock()


Reactive

avoid shared state


Go

RWMutex.RLock()
RWMutex.Lock()

📊 Сводная таблица

Что сравниваем 🟨 Java Блокирующий подход 🟣 Java Реактивный подход 🔵 Go
🎯 Подход ReentrantReadWriteLock обычно избегают shared state RWMutex
🧩 Пример код

import java.util.concurrent.locks.*;

class Cache {

    private final ReentrantReadWriteLock lock =
        new ReentrantReadWriteLock();

    private int value = 0;

    public int read() {

        lock.readLock().lock();

        try {

            return value;

        } finally {

            lock.readLock().unlock();
        }
    }

    public void write(int v) {

        lock.writeLock().lock();

        try {

            value = v;

        } finally {

            lock.writeLock().unlock();
        }
    }
}

// reactive обычно избегает shared state

Flux.range(1,10)

.map(v -> v * 2)

.subscribe();

package main

import "sync"

var mutex sync.RWMutex
var value int

func read() int {

    mutex.RLock()
    defer mutex.RUnlock()

    return value
}

func write(v int) {

    mutex.Lock()
    defer mutex.Unlock()

    value = v
}
🧠 Ментальная модель Shared data с оптимизированным доступом. State избегается, поток данных immutable. Read lock позволяет нескольким читателям.
✔ Плюсы этой реализации Высокая производительность при read-heavy нагрузке. Нет блокировок. Очень лёгкий lock.
❌ Минусы Сложнее чем synchronized. Не подходит для shared mutable state. Можно случайно заблокировать writer.
💥 Какая используется модель read-write synchronization stateless stream mutex coordination
🛠️ Что происходит "под капотом" JVM управляет очередью readers/writers. Reactor использует immutable данные. RWMutex использует атомарные операции.
⚠️ Анти-паттерны использование RWLock при write-heavy нагрузке. shared mutable state в reactive pipeline. долгие операции внутри lock.
🚀 Когда использовать кеши и read-heavy структуры. data pipelines. high-performance сервисы.
🔥 Комментарий от тебя RWLock — одна из лучших оптимизаций concurrency. Reactive решает проблему архитектурно. Go RWMutex невероятно эффективен.

🟨 Java Blocking — RWLock идеально подходит для read-heavy workloads (например кеши).
🟣 Reactive — лучше полностью избегать shared mutable state.
🔵 Go — RWMutex быстрее обычного Mutex при большом количестве readers.

In-memory cache.
Configuration storage.
Read-heavy microservices.

Phaser sync

Phaser sync — 🏗️ стройка по этапам, участники синхронизируются на каждой фазе

💬 Объяснение

Представь стройку дома.

Есть этапы:

  • фундамент
  • стены
  • крыша

Нельзя строить крышу пока не закончены стены.

Phaser — это барьер для нескольких фаз выполнения.

🗺️ Схема


PHASER

Phase 1 -> barrier
Phase 2 -> barrier
Phase 3 -> barrier

📊 Сводная таблица

Что сравниваем 🟨 Java Блокирующий подход 🟣 Java Реактивный подход 🔵 Go
🎯 Подход Phaser обычно orchestrated pipeline custom sync
🧩 Пример код

Phaser phaser = new Phaser(3);

Runnable worker = () -> {

    System.out.println("Phase 1 done");

    phaser.arriveAndAwaitAdvance();

    System.out.println("Phase 2 done");
};

Flux.just("phase1","phase2")

.concatMap(stage ->
    process(stage)
)
.subscribe();

// пример кастомной синхронизации
// через каналы и waitgroup
🧠 Ментальная модель много этапов синхронизации pipeline стадий ручная координация
✔ Плюсы этой реализации гибкая синхронизация декларативный pipeline полный контроль
❌ Минусы сложный API нет прямого аналога нужно писать самому
💥 Какая используется модель phase barrier stream stage pipeline custom coordination
🛠️ Что происходит "под капотом" Phaser считает участников каждой фазы. Reactor выполняет pipeline этап за этапом. goroutines синхронизируются каналами.
⚠️ Анти-паттерны слишком сложные phaser структуры. блокирующие вызовы. overengineering.
🚀 Когда использовать multi-stage algorithms. data pipelines. complex orchestration.
🔥 Комментарий от тебя Phaser — мощный, но редко используемый инструмент. Reactive pipelines часто заменяют phaser. Go обычно делает это проще.

🟨 Java Blocking — Phaser лучше CyclicBarrier когда фаз больше одной.
🟣 Reactive — pipeline этапы естественно заменяют phaser.
🔵 Go — иногда проще использовать несколько WaitGroup.

Game engines.
Simulation systems.
multi-phase algorithms.

Latch reuse

Latch reuse — 🔄 много стартов гонки, синхронизация используется снова и снова

💬 Объяснение

CountDownLatch — это одноразовый барьер.

Когда счётчик достиг нуля — всё.

Его нельзя использовать снова.

Поэтому для повторного использования нужно:

  • создавать новый latch
  • пересоздавать pipeline
  • сбрасывать канал

🗺️ Схема


LATCH REUSE

Round 1 -> latch
Round 2 -> new latch
Round 3 -> new latch

📊 Сводная таблица

Что сравниваем 🟨 Java Блокирующий подход 🟣 Java Реактивный подход 🔵 Go
🎯 Подход CountDownLatch recreate pipeline reset channel
🧩 Пример код

CountDownLatch latch =
    new CountDownLatch(3);

Runnable worker = () -> {

    latch.countDown();
};

latch.await();

Flux.range(1,3)

.flatMap(this::process)

.collectList()

.subscribe();

done := make(chan bool)

go func() {

    done <- true

}()

<-done
🧠 Ментальная модель одноразовый барьер одноразовый pipeline одноразовый канал
✔ Плюсы этой реализации очень простой механизм нативный поток событий минимальный код
❌ Минусы нельзя reuse pipeline нужно пересоздавать канал нужно пересоздавать
💥 Какая используется модель countdown barrier reactive flow channel signal
🛠️ Что происходит "под капотом" Atomic счетчик уменьшается до нуля. Reactive pipeline завершает поток. канал передаёт сигнал завершения.
⚠️ Анти-паттерны ожидание latch внутри thread pool. блокировка reactive pipeline. утечки каналов.
🚀 Когда использовать test orchestration. async pipelines. signal completion.
🔥 Комментарий от тебя Latch — очень простой, но мощный инструмент. Reactive pipeline обычно заменяет latch. Go channels отлично подходят для сигналов.

🟨 Java Blocking — CountDownLatch хорош для одноразовой синхронизации.
🟣 Reactive — pipeline создаётся заново для каждого потока данных.
🔵 Go — канал часто выступает сигналом завершения.

Integration tests orchestration.
Batch processing.
Signal completion workflows.

ВЫВОД

Все три мира решают одну проблему:

координацию параллельных задач.

Но делают это по-разному.

  • 🟨 Java Blocking — богатый набор синхронизационных примитивов
  • 🟣 Reactive Java — избегает shared state и использует pipelines
  • 🔵 Go — минимализм: mutex + channels

Простое правило выбора:

  • Shared state и высокая конкуренция → RWLock
  • Много этапов алгоритма → Phaser
  • Ожидание группы задач → Latch

Senior уровень — это не знание API.

Это понимание того, как задачи координируются.

Всего лайков:0

Оставить комментарий

Мой канал в социальных сетях
Отправляя email, вы принимаете условия политики конфиденциальности

Полезные статьи:

Низкоуровневые механизмы | Go ↔ Java
В этой статье мы разберем ключевые низкоуровневые механизмы Go, сравнивая их с аналогичными инструментами в Java. Статья предназначена для Java-разработчиков, которые хотят глубже понять Go, а также д...
Асинхронность и реактивность в Java: CompletableFuture, Flow и Virtual Threads
В современном Java-разработке есть три основных подхода к асинхронности и параллельности: CompletableFuture — для одиночных асинхронных задач. Flow / Reactive Streams — для потоков данных с контролем...
Понимаем многопоточность в Java через коллекции и атомики
1️⃣ HashMap / TreeMap / TreeSet (не потокобезопасные) HashMap: Структура: массив бакетов + связные списки / деревья (для коллизий). Под капотом: при put/remove происходит модификация массива бакетов ...

Новые статьи:

Конкурентность — это не про «запустить много потоков». Это про договорённости между ними. Представь кухню ресторана: — повара (потоки / горутины) — заказы (задачи) — и главный вопрос: как они коорди...
История начинается не с академической теории, а с типичной production-проблемы. Представьте сервис: 48 CPU 300+ потоков нагрузка 200k операций в секунду много shared state Команда использует обы...
Когда HashMap начинает убивать продакшн: инженерная история ConcurrentHashMap
Представьте обычный продакшн-сервис. 32 CPU сотни потоков кэш конфигурации / сессий / rate limits десятки тысяч операций в секунду И где-то внутри — обычный Map. Сначала всё выглядит безобидно. Map&...
Fullscreen image