
Heuristic:Apache Beam Lock Contention Batching

From Leeroopedia



Knowledge Sources
Domains: Optimization, Concurrency
Last Updated: 2026-02-09 04:00 GMT

Overview

A concurrency optimization that replaces direct synchronized counter updates with a lock-free `ConcurrentLinkedQueue` and a batched drain, reducing monitor contention in high-throughput work executors.

Description

In the Dataflow streaming worker, the `BoundedQueueExecutor` manages outstanding work items with counters protected by a Monitor lock. The GetWork thread holds this monitor to check capacity, creating contention when many worker threads simultaneously decrement counters after completing work. The batched decrement pattern uses a `ConcurrentLinkedQueue` for lock-free enqueueing, then a single thread drains the entire queue under a separate drain lock, batching multiple decrements into a single monitor acquisition. An `AtomicBoolean` flag ensures at most two threads ever contend on the drain lock.

Usage

Apply this heuristic when you have a hot monitor where many threads need to update shared counters, but only one thread (the producer/scheduler) needs to read them. It is particularly effective when the contention bottleneck is on the reader side (e.g., a scheduling loop that must not be blocked).
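For contrast, the unbatched baseline this heuristic replaces looks roughly like the following minimal sketch. The class, method, and field names here are illustrative, not taken from the Beam codebase; the point is that every completing worker acquires the same monitor the scheduling thread needs for its capacity check:

```java
import java.util.concurrent.CountDownLatch;

// Illustrative baseline (names are ours, not Beam's): N work completions
// cost N contended acquisitions of the monitor the scheduler reads.
public class ContendedCounters {
    private final Object monitor = new Object();
    private long elementsOutstanding;
    private long bytesOutstanding;

    void workStarted(long workBytes) {
        synchronized (monitor) {
            ++elementsOutstanding;
            bytesOutstanding += workBytes;
        }
    }

    void workCompleted(long workBytes) {
        synchronized (monitor) { // contended with the scheduler's capacity check
            --elementsOutstanding;
            bytesOutstanding -= workBytes;
        }
    }

    boolean hasCapacity(long maxBytes) {
        synchronized (monitor) { // the scheduler queues behind every worker
            return bytesOutstanding < maxBytes;
        }
    }

    long outstandingBytes() {
        synchronized (monitor) {
            return bytesOutstanding;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ContendedCounters c = new ContendedCounters();
        int n = 32;
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            c.workStarted(100);
            new Thread(() -> {
                c.workCompleted(100);
                done.countDown();
            }).start();
        }
        done.await();
        System.out.println(c.outstandingBytes()); // prints 0
    }
}
```

With 32 workers, the monitor is entered 64 times here (once per start, once per completion); the batched pattern below collapses the completion side into a handful of batch acquisitions.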

The Insight (Rule of Thumb)

  • Action: Replace direct `synchronized` counter updates with a `ConcurrentLinkedQueue` + batched drain pattern.
  • Value: Use `AtomicBoolean` as a "batch pending" flag to limit drain-lock contention to at most 2 threads.
  • Trade-off: Counter values are slightly stale (not updated atomically with work completion), but the scheduling thread sees a consistent snapshot when it acquires the monitor.
  • Compatibility: Requires that slightly-delayed counter updates are acceptable (eventual consistency within microseconds).

Reasoning

When N worker threads all compete to enter a `synchronized` block to decrement a counter, the scheduling thread (which needs that same lock to check capacity) can be starved. By routing decrements through a lock-free queue and having a single drain thread batch them, the monitor is held for a brief burst instead of N separate acquisitions. The `AtomicBoolean` flag (`isDecrementBatchPending`) acts as a coalescing gate: the first thread to set it becomes the drainer, and subsequent threads simply enqueue and return. This reduces lock contention from O(N) monitor entries to O(1) batched entries per scheduling cycle.

This pattern was specifically introduced in PR #37528 to address lock notify contention observed in production Dataflow streaming workers.

Code Evidence

Batched decrement pattern from `BoundedQueueExecutor.java:245-284`:

private void decrementCounters(long workBytes) {
    // All threads queue decrements and one thread grabs the monitor and updates
    // counters. We do this to reduce contention on monitor which is locked by
    // GetWork thread
    decrementQueue.add(workBytes);
    boolean submittedToExistingBatch = isDecrementBatchPending.getAndSet(true);
    if (submittedToExistingBatch) {
      // There is already a thread about to drain the decrement queue
      // Current thread does not need to drain.
      return;
    }
    synchronized (decrementQueueDrainLock) {
      // By setting false here, we may allow another decrement to claim
      // submission of the next batch and start waiting on the
      // decrementQueueDrainLock.
      //
      // However this prevents races that would leave decrements in the
      // queue and unclaimed and we are ensured there is at most one
      // additional thread blocked.
      isDecrementBatchPending.set(false);
      long bytesToDecrement = 0;
      int elementsToDecrement = 0;
      while (true) {
        Long pollResult = decrementQueue.poll();
        if (pollResult == null) {
          break;
        }
        bytesToDecrement += pollResult;
        ++elementsToDecrement;
      }
      if (elementsToDecrement == 0) {
        return;
      }
      monitor.enter();
      elementsOutstanding -= elementsToDecrement;
      bytesOutstanding -= bytesToDecrement;
      monitor.leave();
    }
}
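The excerpt above can be exercised as a self-contained sketch under two stated assumptions: a plain Java `synchronized` block stands in for Guava's `Monitor` (`enter()`/`leave()`), and the class name and helper methods are ours. The queue, flag, and counter names follow the excerpt:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Standalone sketch of the batched-drain pattern; field names follow the
// BoundedQueueExecutor excerpt, the rest is illustrative scaffolding.
public class BatchedDecrement {
    private final Object monitor = new Object();
    private final Object decrementQueueDrainLock = new Object();
    private final ConcurrentLinkedQueue<Long> decrementQueue = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean isDecrementBatchPending = new AtomicBoolean(false);
    private long elementsOutstanding;
    private long bytesOutstanding;

    void increment(long workBytes) {
        synchronized (monitor) {
            ++elementsOutstanding;
            bytesOutstanding += workBytes;
        }
    }

    void decrementCounters(long workBytes) {
        decrementQueue.add(workBytes); // lock-free enqueue, never blocks
        if (isDecrementBatchPending.getAndSet(true)) {
            return; // a pending drainer will pick this item up
        }
        synchronized (decrementQueueDrainLock) {
            // Clearing the flag first lets at most one more thread claim the
            // next batch and block here, so no decrement is left unclaimed.
            isDecrementBatchPending.set(false);
            long bytesToDecrement = 0;
            int elementsToDecrement = 0;
            Long polled;
            while ((polled = decrementQueue.poll()) != null) {
                bytesToDecrement += polled;
                ++elementsToDecrement;
            }
            if (elementsToDecrement == 0) {
                return;
            }
            synchronized (monitor) { // one acquisition per batch, not per item
                elementsOutstanding -= elementsToDecrement;
                bytesOutstanding -= bytesToDecrement;
            }
        }
    }

    long outstandingBytes() {
        synchronized (monitor) {
            return bytesOutstanding;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BatchedDecrement b = new BatchedDecrement();
        int n = 64;
        Thread[] workers = new Thread[n];
        for (int i = 0; i < n; i++) {
            b.increment(10);
            workers[i] = new Thread(() -> b.decrementCounters(10));
            workers[i].start();
        }
        for (Thread t : workers) {
            t.join();
        }
        System.out.println(b.outstandingBytes()); // prints 0
    }
}
```

Joining every worker is what makes the final read deterministic: each enqueued item is drained within some thread's `decrementCounters` call, so once all calls have returned the counters are fully settled.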
