
Implementation:Apache Beam StreamingDataflowWorker Start

From Leeroopedia


Field Value
Implementation Name StreamingDataflowWorker Start
Overview Concrete implementation for starting all background subsystems of a streaming Dataflow worker, including config refresh, memory monitoring, Windmill streams, metric reporting, and active work heartbeats.
Module runners/google-cloud-dataflow-java/worker
Repository apache/beam
Related Principle Principle:Apache_Beam_Heartbeat_and_Refresh
Last Updated 2026-02-09 04:00 GMT

Overview

The StreamingDataflowWorker.start() method is the concrete entry point for activating all background subsystems of a streaming Dataflow worker. After the worker is fully constructed by fromOptions(), the start() method transitions it from a constructed state to a running state by starting the config fetcher, memory monitor, Windmill harness, execution state sampler, status reporter, active work refresher, and connectivity type observer. This method is the activation switch for the entire worker lifecycle.

Code Reference

Source Location

File -- Lines -- Description
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java -- L1065-1084 -- start() method
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java -- L1086-1089 -- startStatusPages(): debug status page server
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresher.java -- ActiveWorkRefresher -- heartbeat sender for in-progress work
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java -- StreamingWorkerStatusReporter -- periodic metric reporting

Signature

start() Method:

public void start() {
    running.set(true);

    // Step 1: Start periodic configuration refresh.
    // For StreamingEngine: blocks until initial global config is received,
    // then schedules periodic refresh at globalConfigRefreshPeriodMillis.
    configFetcher.start();

    // Step 2: Start background memory pressure monitoring.
    // Monitors JVM heap usage and GC activity.
    // Throttles work acquisition via waitForResources() when memory is low.
    memoryMonitor.start();

    // Step 3: Activate Windmill streams.
    // For FanOutStreamingEngineWorkerHarness: starts GetWorkerMetadataStream
    //   which triggers connections to Windmill backends.
    // For SingleSourceWorkerHarness: starts GetWork/GetData/CommitWork streams.
    streamingWorkerHarness.get().start();

    // Step 4: Start execution state sampling.
    // Periodically samples which step each thread is executing
    // for time-based profiling.
    sampler.start();

    // Step 5: Start periodic counter/metric reporting to Dataflow service.
    // Sends counter updates, failure reports, and per-worker metrics.
    workerStatusReporter.start();

    // Step 6: Start heartbeating for in-progress work items.
    // Prevents Windmill from reclaiming work items during processing.
    // Runs at activeWorkRefreshPeriodMillis interval.
    activeWorkRefresher.start();

    // Step 7: Register observer for connectivity type changes.
    // Enables dynamic switching between DirectPath and CloudPath
    // without worker restart.
    configFetcher
        .getGlobalConfigHandle()
        .registerConfigObserver(
            streamingGlobalConfig -> {
                ConnectivityType connectivityType =
                    streamingGlobalConfig.userWorkerJobSettings()
                        .getConnectivityType();
                if (connectivityType
                        != ConnectivityType.CONNECTIVITY_TYPE_DEFAULT) {
                    harnessSwitchExecutor.execute(
                        () -> switchStreamingWorkerHarness(connectivityType));
                }
            });
}

Import

import org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker;
import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.ActiveWorkRefresher;
import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerStatusReporter;
import org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;

I/O Contract

Inputs

Input -- Type -- Description
Constructed Worker -- StreamingDataflowWorker (this) -- A fully initialized worker instance from fromOptions() with all subsystems wired but not yet active.

Outputs

Output -- Type -- Description
Periodic Heartbeats -- ActiveWorkRefresher -- Heartbeats sent to Windmill for all in-progress work items at activeWorkRefreshPeriodMillis intervals.
Status Reports -- StreamingWorkerStatusReporter -- Counter updates, failure reports, and per-worker metrics sent to the Dataflow service at windmillHarnessUpdateReportingPeriod intervals.
Config Refreshes -- ComputationConfig.Fetcher -- Periodic global configuration fetches from the Dataflow service at globalConfigRefreshPeriodMillis intervals.
Memory Monitoring -- BackgroundMemoryMonitor -- Continuous JVM memory pressure monitoring with throttling when memory is constrained.
Active Windmill Streams -- StreamingWorkerHarness -- GetWork, GetData, and CommitWork gRPC streams to Windmill backends.

Subsystem Startup Details

1. configFetcher.start()

For StreamingEngineComputationConfigFetcher:

@Override
public void start() {
    // Blocks until initial global config is received (retries every 5 seconds)
    fetchInitialPipelineGlobalConfig();
    // Schedules periodic refresh on a ScheduledExecutorService
    schedulePeriodicGlobalConfigRequests();
}

This is a blocking call at startup. The worker will not proceed until the initial global configuration is received from the Dataflow service. This ensures Windmill endpoints and operational limits are known before streams are opened.
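The blocking-then-schedule pattern described above can be sketched as follows. BlockingConfigFetcher, its Supplier-based fetch, and the method names are illustrative stand-ins, not the Beam classes; only the block-until-first-config behavior and the 5-second retry mirror the description above.

```java
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Sketch: block until an initial config arrives, then refresh periodically.
public class BlockingConfigFetcher {
    private final Supplier<Optional<String>> fetchConfig; // stand-in for the RPC
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "config-refresh");
            t.setDaemon(true);
            return t;
        });
    private volatile String currentConfig;

    public BlockingConfigFetcher(Supplier<Optional<String>> fetchConfig) {
        this.fetchConfig = fetchConfig;
    }

    public void start(long refreshPeriodMillis) throws InterruptedException {
        // Callers block here until the first config is received.
        Optional<String> config;
        while (!(config = fetchConfig.get()).isPresent()) {
            Thread.sleep(5000); // retry every 5 seconds, as described above
        }
        currentConfig = config.get();
        // Then schedule periodic refreshes in the background.
        scheduler.scheduleWithFixedDelay(
            () -> fetchConfig.get().ifPresent(c -> currentConfig = c),
            refreshPeriodMillis, refreshPeriodMillis, TimeUnit.MILLISECONDS);
    }

    public String currentConfig() { return currentConfig; }
}
```

The key design point is that start() only returns once a usable configuration exists, so later steps can rely on it unconditionally.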

2. memoryMonitor.start()

Starts a background thread that:

  • Monitors Runtime.getRuntime().freeMemory() and GC activity.
  • When memory is constrained, blocks callers of waitForResources().
  • Can trigger heap dumps on severe memory pressure.
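A minimal sketch of the waitForResources() throttling idea, assuming a simple flag-and-wait design. SimpleMemoryMonitor and its setPressure() method are hypothetical, not Beam's MemoryMonitor; only the blocking waitForResources() contract comes from the description above.

```java
// Sketch: work acquisition blocks while a monitoring thread flags pressure.
public class SimpleMemoryMonitor {
    private final Object lock = new Object();
    private boolean underPressure = false;

    // Called by the monitoring thread when heap usage crosses a threshold.
    public void setPressure(boolean pressure) {
        synchronized (lock) {
            underPressure = pressure;
            if (!pressure) lock.notifyAll(); // wake throttled callers
        }
    }

    // Called on work-acquisition paths; blocks while memory is constrained.
    public void waitForResources() throws InterruptedException {
        synchronized (lock) {
            while (underPressure) lock.wait();
        }
    }

    public boolean isUnderPressure() {
        synchronized (lock) { return underPressure; }
    }
}
```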

3. streamingWorkerHarness.get().start()

For FanOutStreamingEngineWorkerHarness:

@Override
public synchronized void start() {
    Preconditions.checkState(!started);
    getWorkerMetadataStream =
        streamFactory.createGetWorkerMetadataStream(
            dispatcherClient::getWindmillMetadataServiceStubBlocking,
            this::consumeWorkerMetadata);
    getWorkerMetadataStream.start();
    started = true;
}

This opens the metadata stream to the Windmill dispatcher, which triggers endpoint assignments. As endpoints are received, the harness opens GetWork/GetData/CommitWork streams to each backend.
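The endpoint-driven fan-out can be sketched as follows. FanOutSketch, the Stream interface, and the string endpoints are illustrative stand-ins for the real metadata and stream types; only the idea of opening one set of streams per newly assigned backend mirrors the description above.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch: a metadata callback opens one stream per newly assigned endpoint.
public class FanOutSketch {
    interface Stream { void start(); }

    private final Map<String, Stream> streamsByEndpoint = new ConcurrentHashMap<>();
    private final Function<String, Stream> streamFactory;

    FanOutSketch(Function<String, Stream> streamFactory) {
        this.streamFactory = streamFactory;
    }

    // Invoked as worker metadata arrives with the current endpoint set.
    void consumeWorkerMetadata(List<String> endpoints) {
        for (String endpoint : endpoints) {
            streamsByEndpoint.computeIfAbsent(endpoint, e -> {
                Stream s = streamFactory.apply(e);
                s.start(); // open GetWork/GetData/CommitWork for this backend
                return s;
            });
        }
        // (The real harness also tears down streams to removed endpoints.)
    }

    int activeStreamCount() { return streamsByEndpoint.size(); }
}
```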

4. workerStatusReporter.start()

Starts periodic reporting that:

  • Collects counters from all active stages.
  • Uses CounterShortIdCache to optimize bandwidth.
  • Reports failure stacktraces (up to 1000 per report).
  • Sends per-worker metrics at a separate interval.
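The capped failure reporting can be sketched as a drain-with-limit buffer. FailureReportBuffer and its method names are hypothetical; only the 1000-failure cap per report comes from the description above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch: buffered failure stacktraces are drained into a report,
// truncated to a fixed per-report limit.
public class FailureReportBuffer {
    private static final int MAX_FAILURES_PER_REPORT = 1000;
    private final ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<>();

    void recordFailure(String stacktrace) { pending.add(stacktrace); }

    // Called on each reporting tick; drains at most the cap per report.
    List<String> drainForReport() {
        List<String> report = new ArrayList<>();
        String failure;
        while (report.size() < MAX_FAILURES_PER_REPORT
                && (failure = pending.poll()) != null) {
            report.add(failure);
        }
        return report;
    }
}
```

Overflow is not lost: anything beyond the cap stays queued for the next reporting tick.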

5. activeWorkRefresher.start()

Starts periodic heartbeating that:

  • Iterates over all computations and their active work items.
  • Sends heartbeats via ThrottlingGetDataMetricTracker.trackHeartbeats().
  • Detects stuck commits (commits waiting longer than stuckCommitDurationMillis).
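The periodic heartbeat pass can be sketched as follows. HeartbeatLoop and its string-keyed work map are illustrative, not Beam's ActiveWorkRefresher; only the fixed-period iteration over computations and their active work mirrors the description above.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Sketch: every refresh period, collect the keys of all in-progress work
// items per computation and hand them to a heartbeat sender.
public class HeartbeatLoop {
    private final Map<String, List<String>> activeWorkByComputation;
    private final Consumer<List<String>> heartbeatSender;
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "active-work-refresh");
            t.setDaemon(true);
            return t;
        });

    HeartbeatLoop(Map<String, List<String>> activeWorkByComputation,
                  Consumer<List<String>> heartbeatSender) {
        this.activeWorkByComputation = activeWorkByComputation;
        this.heartbeatSender = heartbeatSender;
    }

    void start(long refreshPeriodMillis) {
        scheduler.scheduleWithFixedDelay(
            this::refreshActiveWork, refreshPeriodMillis, refreshPeriodMillis,
            TimeUnit.MILLISECONDS);
    }

    // One refresh pass: iterate computations and heartbeat their active work.
    void refreshActiveWork() {
        activeWorkByComputation.values().forEach(heartbeatSender);
    }
}
```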

6. Connectivity Type Observer

Registers a callback on the global config handle that:

  • Monitors the ConnectivityType field in updated configurations.
  • Triggers harness switching on a dedicated single-thread executor (harnessSwitchExecutor).
  • Can switch between FanOutStreamingEngineWorkerHarness (DirectPath) and SingleSourceWorkerHarness (CloudPath).
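The observer-driven switch can be sketched as follows. ConnectivitySwitcher is hypothetical, with only the CONNECTIVITY_TYPE_DEFAULT guard and the dedicated single-thread executor mirroring the start() code above; the enum values and the initial CloudPath state are assumptions for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: a config callback compares the requested connectivity type with
// the current one and schedules a harness switch on a dedicated executor.
public class ConnectivitySwitcher {
    enum ConnectivityType { CONNECTIVITY_TYPE_DEFAULT, DIRECTPATH, CLOUDPATH }

    private final AtomicReference<ConnectivityType> current =
        new AtomicReference<>(ConnectivityType.CLOUDPATH); // assumed initial state
    private final ExecutorService harnessSwitchExecutor =
        Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "harness-switch");
            t.setDaemon(true);
            return t;
        });

    // Registered as the global config observer.
    void onConfig(ConnectivityType requested) {
        // DEFAULT means "no explicit preference": never switch on it.
        if (requested != ConnectivityType.CONNECTIVITY_TYPE_DEFAULT
                && requested != current.get()) {
            harnessSwitchExecutor.execute(() -> current.set(requested));
        }
    }

    ConnectivityType current() { return current.get(); }
}
```

Serializing switches through a single-thread executor keeps the config callback non-blocking while ensuring at most one harness transition runs at a time.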

Usage Examples

Complete Worker Bootstrap Sequence:

// Phase 1: Initialize global state (logging, SSL, etc.)
DataflowWorkerHarnessHelper.initializeLogging(
    DataflowStreamingWorkerHarness.class);
DataflowWorkerHarnessOptions options =
    DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions(
        DataflowStreamingWorkerHarness.class,
        DataflowWorkerHarnessOptions.class);

// Phase 2: Construct the worker with all subsystems
StreamingDataflowWorker worker = StreamingDataflowWorker.fromOptions(options);

// Phase 3: Start all subsystems
worker.start();

// Phase 4 (optional): Start debug status pages
// worker.startStatusPages();  -- called separately for lighter-weight testing

// Worker is now running and processing work items.
// It will continue until shutdown() is called.

Startup Order Dependencies:

configFetcher.start()
  |-- MUST complete before harness.start() (needs Windmill endpoints)
  |
memoryMonitor.start()
  |-- Independent, but should start before harness (needed for waitForResources)
  |
streamingWorkerHarness.start()
  |-- Depends on configFetcher having endpoints
  |-- Opens Windmill streams (work starts flowing)
  |
sampler.start()
  |-- Independent, can start anytime
  |
workerStatusReporter.start()
  |-- Should start after harness (reports on active work)
  |
activeWorkRefresher.start()
  |-- MUST start after harness (heartbeats active work items)
  |
registerConfigObserver
  |-- MUST register after harness is set (observer may switch harness)

Related Pages

Principle:Apache_Beam_Heartbeat_and_Refresh