Implementation:Apache Beam StreamingDataflowWorker Start
| Field | Value |
|---|---|
| Implementation Name | StreamingDataflowWorker Start |
| Overview | Concrete implementation for starting all background subsystems of a streaming Dataflow worker, including config refresh, memory monitoring, Windmill streams, metric reporting, and active work heartbeats. |
| Module | runners/google-cloud-dataflow-java/worker |
| Repository | apache/beam |
| Related Principle | Principle:Apache_Beam_Heartbeat_and_Refresh |
| last_updated | 2026-02-09 04:00 GMT |
Overview
The StreamingDataflowWorker.start() method is the concrete entry point for activating all background subsystems of a streaming Dataflow worker. After the worker is fully constructed by fromOptions(), the start() method transitions it from a constructed state to a running state by starting the config fetcher, memory monitor, Windmill harness, execution state sampler, status reporter, active work refresher, and connectivity type observer. This method is the activation switch for the entire worker lifecycle.
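The constructed-to-running transition can be pictured as a one-way flag flip followed by subsystem starts. A minimal sketch, using a hypothetical WorkerLifecycle class (the real start() simply calls running.set(true) before starting each subsystem; the compareAndSet guard here is illustrative):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Minimal sketch of the constructed -> running transition (hypothetical class;
// the real start() sets the flag with running.set(true) and then starts subsystems).
public class WorkerLifecycle {
  private final AtomicBoolean running = new AtomicBoolean(false);

  public void start() {
    // Illustrative guard: reject a second start() instead of silently re-running it.
    if (!running.compareAndSet(false, true)) {
      throw new IllegalStateException("worker already started");
    }
    // Subsystem start() calls would follow here, in dependency order.
  }

  public boolean isRunning() {
    return running.get();
  }
}
```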
Code Reference
Source Location
| File | Lines | Description |
|---|---|---|
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java | L1065-1084 | start() method |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java | L1086-1089 | startStatusPages() -- debug status page server |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresher.java | -- | ActiveWorkRefresher -- heartbeat sender for in-progress work |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/StreamingWorkerStatusReporter.java | -- | StreamingWorkerStatusReporter -- periodic metric reporting |
Signature
start() Method:
```java
public void start() {
  running.set(true);

  // Step 1: Start periodic configuration refresh.
  // For StreamingEngine: blocks until initial global config is received,
  // then schedules periodic refresh at globalConfigRefreshPeriodMillis.
  configFetcher.start();

  // Step 2: Start background memory pressure monitoring.
  // Monitors JVM heap usage and GC activity.
  // Throttles work acquisition via waitForResources() when memory is low.
  memoryMonitor.start();

  // Step 3: Activate Windmill streams.
  // For FanOutStreamingEngineWorkerHarness: starts GetWorkerMetadataStream
  // which triggers connections to Windmill backends.
  // For SingleSourceWorkerHarness: starts GetWork/GetData/CommitWork streams.
  streamingWorkerHarness.get().start();

  // Step 4: Start execution state sampling.
  // Periodically samples which step each thread is executing
  // for time-based profiling.
  sampler.start();

  // Step 5: Start periodic counter/metric reporting to Dataflow service.
  // Sends counter updates, failure reports, and per-worker metrics.
  workerStatusReporter.start();

  // Step 6: Start heartbeating for in-progress work items.
  // Prevents Windmill from reclaiming work items during processing.
  // Runs at activeWorkRefreshPeriodMillis interval.
  activeWorkRefresher.start();

  // Step 7: Register observer for connectivity type changes.
  // Enables dynamic switching between DirectPath and CloudPath
  // without worker restart.
  configFetcher
      .getGlobalConfigHandle()
      .registerConfigObserver(
          streamingGlobalConfig -> {
            ConnectivityType connectivityType =
                streamingGlobalConfig.userWorkerJobSettings().getConnectivityType();
            if (connectivityType != ConnectivityType.CONNECTIVITY_TYPE_DEFAULT) {
              harnessSwitchExecutor.execute(
                  () -> switchStreamingWorkerHarness(connectivityType));
            }
          });
}
```
Import
```java
import org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker;
import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.ActiveWorkRefresher;
import org.apache.beam.runners.dataflow.worker.streaming.harness.StreamingWorkerStatusReporter;
import org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
```
I/O Contract
Inputs
| Input | Type | Description |
|---|---|---|
| Constructed Worker | StreamingDataflowWorker (this) | A fully initialized worker instance from fromOptions() with all subsystems wired but not yet active. |
Outputs
| Output | Type | Description |
|---|---|---|
| Periodic Heartbeats | via ActiveWorkRefresher | Heartbeats sent to Windmill for all in-progress work items at activeWorkRefreshPeriodMillis intervals. |
| Status Reports | via StreamingWorkerStatusReporter | Counter updates, failure reports, and per-worker metrics sent to the Dataflow service at windmillHarnessUpdateReportingPeriod intervals. |
| Config Refreshes | via ComputationConfig.Fetcher | Periodic global configuration fetches from the Dataflow service at globalConfigRefreshPeriodMillis intervals. |
| Memory Monitoring | via BackgroundMemoryMonitor | Continuous JVM memory pressure monitoring with throttling when memory is constrained. |
| Active Windmill Streams | via StreamingWorkerHarness | GetWork, GetData, and CommitWork gRPC streams to Windmill backends. |
Subsystem Startup Details
1. configFetcher.start()
For StreamingEngineComputationConfigFetcher:
```java
@Override
public void start() {
  // Blocks until initial global config is received (retries every 5 seconds)
  fetchInitialPipelineGlobalConfig();
  // Schedules periodic refresh on a ScheduledExecutorService
  schedulePeriodicGlobalConfigRequests();
}
```
This is a blocking call at startup. The worker will not proceed until the initial global configuration is received from the Dataflow service. This ensures Windmill endpoints and operational limits are known before streams are opened.
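This block-then-schedule shape can be sketched as follows. BlockingConfigFetcher and its simulated fetchConfig() are hypothetical stand-ins for the real fetcher and its Dataflow RPC; only the blocking retry loop is modeled:

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: block the caller until an initial config arrives, retrying
// at a fixed interval, then hand off to periodic refresh (omitted here).
public class BlockingConfigFetcher {
  private final AtomicInteger attempts = new AtomicInteger();
  private volatile String config;

  // Simulated RPC: empty until the third attempt (stands in for the Dataflow call).
  private Optional<String> fetchConfig() {
    return attempts.incrementAndGet() < 3 ? Optional.empty() : Optional.of("global-config-v1");
  }

  /** Blocks like fetchInitialPipelineGlobalConfig(): no return until a config is received. */
  public void start(long retryMillis) throws InterruptedException {
    Optional<String> result;
    while (!(result = fetchConfig()).isPresent()) {
      Thread.sleep(retryMillis); // the real fetcher retries every 5 seconds
    }
    config = result.get();
    // A real fetcher would now schedule periodic refresh on a ScheduledExecutorService.
  }

  public String config() {
    return config;
  }
}
```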
2. memoryMonitor.start()
Starts a background thread that:
- Monitors Runtime.getRuntime().freeMemory() and GC activity.
- When memory is constrained, blocks callers of waitForResources().
- Can trigger heap dumps on severe memory pressure.
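The blocking behavior of waitForResources() can be sketched as a threshold check on heap usage. SimpleMemoryMonitor is a hypothetical class, not the real MemoryMonitor, which also tracks GC time and heap dumps:

```java
// Hypothetical sketch of waitForResources()-style throttling: callers block while
// heap usage exceeds a threshold. The real MemoryMonitor also tracks GC activity
// and can trigger heap dumps; only the blocking behavior is shown here.
public class SimpleMemoryMonitor {
  private final double maxUsedFraction;

  public SimpleMemoryMonitor(double maxUsedFraction) {
    this.maxUsedFraction = maxUsedFraction;
  }

  /** Fraction of the maximum heap currently in use. */
  double usedFraction() {
    Runtime rt = Runtime.getRuntime();
    long used = rt.totalMemory() - rt.freeMemory();
    return (double) used / rt.maxMemory();
  }

  /** Blocks the caller (e.g. work acquisition) until heap usage drops below the threshold. */
  public void waitForResources(String context) throws InterruptedException {
    while (usedFraction() > maxUsedFraction) {
      Thread.sleep(100); // the real monitor is signalled by a background sampling thread
    }
  }
}
```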
3. streamingWorkerHarness.get().start()
For FanOutStreamingEngineWorkerHarness:
```java
@Override
public synchronized void start() {
  Preconditions.checkState(!started);
  getWorkerMetadataStream =
      streamFactory.createGetWorkerMetadataStream(
          dispatcherClient::getWindmillMetadataServiceStubBlocking,
          this::consumeWorkerMetadata);
  getWorkerMetadataStream.start();
  started = true;
}
```
This opens the metadata stream to the Windmill dispatcher, which triggers endpoint assignments. As endpoints are received, the harness opens GetWork/GetData/CommitWork streams to each backend.
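The reconciliation of open streams against assigned endpoints can be sketched as follows. FanOutSketch and its method names are illustrative, not the real harness API:

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the fan-out reaction to worker metadata: keep one stream
// set per assigned backend, opening new ones and dropping removed ones.
public class FanOutSketch {
  private final Set<String> connectedBackends = ConcurrentHashMap.newKeySet();

  /** Stands in for consumeWorkerMetadata(): reconcile streams with the endpoint list. */
  void consumeWorkerMetadata(List<String> endpoints) {
    for (String endpoint : endpoints) {
      if (connectedBackends.add(endpoint)) {
        // A real harness would open GetWork/GetData/CommitWork streams here.
      }
    }
    // Close streams to backends no longer assigned.
    connectedBackends.retainAll(endpoints);
  }

  int activeBackends() {
    return connectedBackends.size();
  }
}
```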
4. workerStatusReporter.start()
Starts periodic reporting that:
- Collects counters from all active stages.
- Uses CounterShortIdCache to optimize bandwidth.
- Reports failure stacktraces (up to 1000 per report).
- Sends per-worker metrics at a separate interval.
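The scheduling shape of this reporting can be sketched with a fixed-rate task on a single-thread scheduled executor. PeriodicReporter is a hypothetical class; the real reporter's collection logic is far richer, and only the periodic cadence is modeled:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the reporter's cadence: a fixed-rate task on a
// single-thread scheduled executor.
public class PeriodicReporter {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  private final AtomicInteger reportsSent = new AtomicInteger();

  public void start(long periodMillis) {
    scheduler.scheduleAtFixedRate(this::reportOnce, 0, periodMillis, TimeUnit.MILLISECONDS);
  }

  private void reportOnce() {
    // A real reporter collects counters, failure reports, and per-worker metrics here.
    reportsSent.incrementAndGet();
  }

  public int reportsSent() {
    return reportsSent.get();
  }

  public void stop() {
    scheduler.shutdownNow();
  }
}
```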
5. activeWorkRefresher.start()
Starts periodic heartbeating that:
- Iterates over all computations and their active work items.
- Sends heartbeats via ThrottlingGetDataMetricTracker.trackHeartbeats().
- Detects stuck commits (commits waiting longer than stuckCommitDurationMillis).
6. Connectivity Type Observer
Registers a callback on the global config handle that:
- Monitors the ConnectivityType field in updated configurations.
- Triggers harness switching on a dedicated single-thread executor (harnessSwitchExecutor).
- Can switch between FanOutStreamingEngineWorkerHarness (DirectPath) and SingleSourceWorkerHarness (CloudPath).
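The observer-plus-executor pattern above can be sketched as follows. HarnessSwitcher and its enum values are illustrative stand-ins; the real observer calls switchStreamingWorkerHarness(connectivityType):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: config updates request a harness switch, and all switches
// are serialized on a dedicated single-thread executor so they never run on the
// config-refresh thread.
public class HarnessSwitcher {
  enum ConnectivityType { DEFAULT, DIRECTPATH, CLOUDPATH }

  private final ExecutorService switchExecutor = Executors.newSingleThreadExecutor();
  private final AtomicReference<ConnectivityType> active =
      new AtomicReference<>(ConnectivityType.CLOUDPATH);

  /** Called on each config update, like the registered config observer. */
  void onConfigUpdate(ConnectivityType requested) {
    if (requested != ConnectivityType.DEFAULT && requested != active.get()) {
      // The real observer executes switchStreamingWorkerHarness(connectivityType) here.
      switchExecutor.execute(() -> active.set(requested));
    }
  }

  ConnectivityType activeType() {
    return active.get();
  }

  /** Helper: drain the switch executor so the result can be observed. */
  void awaitSwitches() throws InterruptedException {
    switchExecutor.shutdown();
    switchExecutor.awaitTermination(5, TimeUnit.SECONDS);
  }
}
```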
Usage Examples
Complete Worker Bootstrap Sequence:
```java
// Phase 1: Initialize global state (logging, SSL, etc.)
DataflowWorkerHarnessHelper.initializeLogging(
    DataflowStreamingWorkerHarness.class);
DataflowWorkerHarnessOptions options =
    DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions(
        DataflowStreamingWorkerHarness.class,
        DataflowWorkerHarnessOptions.class);

// Phase 2: Construct the worker with all subsystems
StreamingDataflowWorker worker = StreamingDataflowWorker.fromOptions(options);

// Phase 3: Start all subsystems
worker.start();

// Phase 4 (optional): Start debug status pages
// worker.startStatusPages(); -- called separately for lighter-weight testing

// Worker is now running and processing work items.
// It will continue until shutdown() is called.
```
Startup Order Dependencies:
```
configFetcher.start()
|-- MUST complete before harness.start() (needs Windmill endpoints)
|
memoryMonitor.start()
|-- Independent, but should start before harness (needed for waitForResources)
|
streamingWorkerHarness.start()
|-- Depends on configFetcher having endpoints
|-- Opens Windmill streams (work starts flowing)
|
sampler.start()
|-- Independent, can start anytime
|
workerStatusReporter.start()
|-- Should start after harness (reports on active work)
|
activeWorkRefresher.start()
|-- MUST start after harness (heartbeats active work items)
|
registerConfigObserver
|-- MUST register after harness is set (observer may switch harness)
```
Related Pages
- Principle:Apache_Beam_Heartbeat_and_Refresh -- The principle describing heartbeat and refresh mechanisms.
- Implementation:Apache_Beam_StreamingDataflowWorker_FromOptions -- The factory that constructs the worker before start() is called.
- Implementation:Apache_Beam_FanOutStreamingEngineWorkerHarness -- The Windmill harness activated by start().
- Implementation:Apache_Beam_StreamingEngineComputationConfigFetcher -- The config fetcher activated by start().
- Environment:Apache_Beam_Dataflow_Streaming_Runtime -- GCP Dataflow streaming worker runtime with Windmill and memory monitoring.