Implementation:Apache Beam StreamingDataflowWorker FromOptions
| Field | Value |
|---|---|
| Implementation Name | StreamingDataflowWorker FromOptions |
| Overview | Concrete factory method for bootstrapping a streaming Dataflow worker from pipeline options provided by the Dataflow service. |
| Module | runners/google-cloud-dataflow-java/worker |
| Repository | apache/beam |
| Related Principle | Principle:Apache_Beam_Worker_Initialization |
| last_updated | 2026-02-09 04:00 GMT |
Overview
The StreamingDataflowWorker.fromOptions() factory method is the concrete entry point for bootstrapping a fully initialized streaming Dataflow worker. Given pipeline options (extracted upstream from JVM system properties set by the Dataflow service), it constructs all required subsystems (state cache, computation cache, work executor, memory monitor, Windmill harness, metrics reporters, and failure trackers) and returns a fully wired StreamingDataflowWorker instance ready to be started.
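The overall shape of this pattern (a static factory that constructs every dependency, then hands them all to a private constructor that only stores them) can be sketched in isolation. All names below are simplified stand-ins, not the actual Beam classes:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified stand-in for the fromOptions() wiring pattern: the static
// factory builds each subsystem, then delegates to a private constructor
// that performs no construction logic of its own.
final class WorkerSketch {
    private final int cacheMb;
    private final ConcurrentMap<String, String> stageInfo;
    private boolean started = false;

    // Private constructor: just field assignment, no wiring decisions.
    private WorkerSketch(int cacheMb, ConcurrentMap<String, String> stageInfo) {
        this.cacheMb = cacheMb;
        this.stageInfo = stageInfo;
    }

    // Public factory: all subsystem construction happens here.
    static WorkerSketch fromOptions(int workerCacheMb) {
        ConcurrentMap<String, String> stageInfo = new ConcurrentHashMap<>();
        return new WorkerSketch(workerCacheMb, stageInfo);
    }

    void start() { started = true; }

    boolean isStarted() { return started; }

    int cacheMb() { return cacheMb; }
}
```

As in the real class, the sketch is returned in a constructed-but-not-started state; the caller decides when to invoke start().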
Code Reference
Source Location
| File | Lines | Description |
|---|---|---|
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java | L612-697 | fromOptions() factory method |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java | L136 | Class declaration: public final class StreamingDataflowWorker |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java | L200-330 | Private constructor with subsystem wiring |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java | L49-94 | createFromSystemProperties() -- pipeline options extraction |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java | L52-81 | initializeGlobalStateAndPipelineOptions() -- logging and global state |
Signature
Primary Factory Method:
```java
public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) {
  long clientId = CLIENT_ID_GENERATOR.nextLong();
  MemoryMonitor memoryMonitor = MemoryMonitor.fromOptions(options);
  ConcurrentMap<String, StageInfo> stageInfo = new ConcurrentHashMap<>();
  StreamingCounters streamingCounters = StreamingCounters.create();
  WorkUnitClient dataflowServiceClient = new DataflowWorkUnitClient(options, LOG);
  BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
  WindmillStateCache windmillStateCache =
      WindmillStateCache.builder()
          .setSizeMb(options.getWorkerCacheMb())
          .setSupportMapViaMultimap(options.isEnableStreamingEngine())
          .build();
  // ... constructs config fetcher, computation state cache, windmill client,
  // failure tracker, work failure processor, status reporter ...
  return new StreamingDataflowWorker(
      windmillServer, clientId, configFetcher, computationStateCache,
      windmillStateCache, workExecutor, IntrinsicMapTaskExecutorFactory.defaultFactory(),
      options, new HotKeyLogger(), clock, workerStatusReporter,
      failureTracker, workFailureProcessor, streamingCounters, memoryMonitor,
      windmillStreamFactory, activeWorkRefreshExecutor, stageInfo, dispatcherClient);
}
```
Supporting: WorkerPipelineOptionsFactory.createFromSystemProperties():
```java
public static <T extends DataflowWorkerHarnessOptions> T createFromSystemProperties(
    Class<T> harnessOptionsClass) throws IOException {
  ObjectMapper objectMapper = new ObjectMapper();
  T options;
  if (System.getProperties().containsKey("sdk_pipeline_options")) {
    String serializedOptions = System.getProperty("sdk_pipeline_options");
    options = objectMapper.readValue(serializedOptions, PipelineOptions.class)
        .as(harnessOptionsClass);
  } else if (System.getProperties().containsKey("sdk_pipeline_options_file")) {
    String filePath = System.getProperty("sdk_pipeline_options_file");
    String serializedOptions =
        new String(Files.readAllBytes(Paths.get(filePath)), StandardCharsets.UTF_8);
    options = objectMapper.readValue(serializedOptions, PipelineOptions.class)
        .as(harnessOptionsClass);
  } else {
    options = PipelineOptionsFactory.as(harnessOptionsClass);
  }
  // Inject runtime properties: worker_id, job_id, worker_pool
  if (System.getProperties().containsKey("worker_id")) {
    options.setWorkerId(System.getProperty("worker_id"));
  }
  // ...
  return options;
}
```
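The three-way fallback above (inline `sdk_pipeline_options` property, then `sdk_pipeline_options_file`, then defaults) can be illustrated with a stdlib-only sketch. `OptionsResolver` is a hypothetical stand-in: it returns the raw serialized payload, where the real method deserializes it with Jackson and converts it to the harness options interface:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Simplified stand-in for createFromSystemProperties(): prefer the inline
// "sdk_pipeline_options" system property, fall back to the
// "sdk_pipeline_options_file" path, and finally to a built-in default.
final class OptionsResolver {
    static String resolveSerializedOptions() throws IOException {
        if (System.getProperties().containsKey("sdk_pipeline_options")) {
            // Inline serialized options win.
            return System.getProperty("sdk_pipeline_options");
        } else if (System.getProperties().containsKey("sdk_pipeline_options_file")) {
            // Otherwise read the serialized options from the named file.
            Path file = Path.of(System.getProperty("sdk_pipeline_options_file"));
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        }
        // Neither property set: empty default options.
        return "{}";
    }
}
```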
Supporting: DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions():
```java
public static <T extends DataflowWorkerHarnessOptions> T initializeGlobalStateAndPipelineOptions(
    Class<?> workerHarnessClass, Class<T> harnessOptionsClass) throws Exception {
  T pipelineOptions =
      WorkerPipelineOptionsFactory.createFromSystemProperties(harnessOptionsClass);
  pipelineOptions.setAppName(workerHarnessClass.getSimpleName());
  DataflowWorkerLoggingMDC.setJobId(pipelineOptions.getJobId());
  DataflowWorkerLoggingMDC.setWorkerId(pipelineOptions.getWorkerId());
  // Enable Conscrypt SSL if the experiment is enabled
  ExperimentContext ec = ExperimentContext.parseFrom(pipelineOptions);
  if (ec.isEnabled(Experiment.EnableConscryptSecurityProvider)) {
    Security.insertProviderAt(new OpenSSLProvider(), 1);
  }
  return pipelineOptions;
}
```
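The Conscrypt gate above hinges on checking whether a named experiment appears in the pipeline's experiments option. A simplified stand-in for that check (ExperimentContext itself is not reproduced here, and the experiment string is illustrative) might look like:

```java
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for ExperimentContext: tests whether a named
// experiment appears in the comma-separated experiments option.
final class ExperimentsSketch {
    private final List<String> experiments;

    ExperimentsSketch(String commaSeparated) {
        this.experiments = (commaSeparated == null || commaSeparated.isEmpty())
            ? List.of()
            : Arrays.asList(commaSeparated.split(","));
    }

    boolean isEnabled(String name) {
        return experiments.contains(name);
    }
}
```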
Import
```java
import org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.WorkerPipelineOptionsFactory;
import org.apache.beam.runners.dataflow.worker.DataflowWorkerHarnessHelper;
```
I/O Contract
Inputs
| Input | Type | Description |
|---|---|---|
| options | DataflowWorkerHarnessOptions | Pipeline options extracted from JVM system properties. Contains all configuration for the worker: job ID, worker ID, cache sizes, commit thread counts, Windmill endpoints, experiments, and more. |
| System Properties | JVM system properties | sdk_pipeline_options or sdk_pipeline_options_file for pipeline options; worker_id, job_id, worker_pool for runtime identity. |
Outputs
| Output | Type | Description |
|---|---|---|
| StreamingDataflowWorker | StreamingDataflowWorker | A fully initialized worker instance with all subsystems wired together: state cache, computation cache, work executor, memory monitor, Windmill harness, metrics reporter, failure tracker, active work refresher, and status pages. The worker is in a constructed but not started state. |
Subsystems Constructed
| Subsystem | Class | Purpose |
|---|---|---|
| State Cache | WindmillStateCache | In-memory cache for Windmill-backed per-key state |
| Computation Cache | ComputationStateCache | Maps computation IDs to their execution contexts |
| Work Executor | BoundedQueueExecutor | Thread pool (up to 300 threads) for concurrent work item processing |
| Memory Monitor | MemoryMonitor | Monitors JVM memory pressure and throttles work acquisition |
| Config Fetcher | StreamingEngineComputationConfigFetcher | Fetches computation configurations from the Dataflow service |
| Failure Tracker | FailureTracker | Tracks and reports processing failures |
| Status Reporter | StreamingWorkerStatusReporter | Periodically reports metrics to the Dataflow service |
| Work Scheduler | StreamingWorkScheduler | Orchestrates scheduling and execution of work items |
| Windmill Stream Factory | GrpcWindmillStreamFactory | Creates gRPC streams for Windmill communication |
| Worker Harness | StreamingWorkerHarness | Manages Windmill connections (FanOut or SingleSource) |
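The central idea behind the work executor (block new submissions once an in-flight budget is saturated, so backpressure propagates toward the work source) can be approximated with a semaphore around a plain ExecutorService. This is a sketch, not Beam's BoundedQueueExecutor, which additionally bounds outstanding bytes:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Sketch of a bounded work executor: execute() blocks once
// maxOutstanding work items are in flight, throttling the caller.
final class BoundedExecutorSketch {
    private final ExecutorService delegate;
    private final Semaphore permits;

    BoundedExecutorSketch(int threads, int maxOutstanding) {
        this.delegate = Executors.newFixedThreadPool(threads);
        this.permits = new Semaphore(maxOutstanding);
    }

    void execute(Runnable work) throws InterruptedException {
        permits.acquire(); // blocks when the budget is exhausted
        delegate.execute(() -> {
            try {
                work.run();
            } finally {
                permits.release(); // free a slot for the next work item
            }
        });
    }

    void shutdownAndWait() throws InterruptedException {
        delegate.shutdown();
        delegate.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```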
Usage Examples
Typical Worker Bootstrap (as performed by DataflowStreamingWorkerHarness):
```java
// Step 1: Initialize global state and extract pipeline options
DataflowWorkerHarnessOptions options =
    DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions(
        DataflowStreamingWorkerHarness.class, DataflowWorkerHarnessOptions.class);

// Step 2: Construct the fully wired worker
StreamingDataflowWorker worker = StreamingDataflowWorker.fromOptions(options);

// Step 3: Start all subsystems
worker.start();
```
Key Pipeline Options Influencing Initialization:
```java
// State cache size in MB (default varies by machine type)
options.getWorkerCacheMb();

// Commit thread parallelism for Streaming Engine
options.getWindmillServiceCommitThreads();

// Whether to use DirectPath for Windmill communication
options.getIsWindmillServiceDirectPathEnabled();

// Whether Streaming Engine is enabled (vs. Appliance)
options.isEnableStreamingEngine();

// Active work refresh period in milliseconds
options.getActiveWorkRefreshPeriodMillis();
```
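How such options steer construction can be sketched with a plain value object. The field names mirror the getters above, but this is not Beam's options machinery, and the decision function is a simplification of how the worker picks its Windmill harness (Appliance when Streaming Engine is off; a FanOut harness for DirectPath mode, per the related FanOutStreamingEngineWorkerHarness page; otherwise a single-source harness):

```java
// Sketch of options-driven wiring: a plain holder for two of the flags
// above, and a decision function approximating the harness choice.
final class HarnessOptionsSketch {
    final boolean enableStreamingEngine;
    final boolean directPathEnabled;

    HarnessOptionsSketch(boolean enableStreamingEngine, boolean directPathEnabled) {
        this.enableStreamingEngine = enableStreamingEngine;
        this.directPathEnabled = directPathEnabled;
    }

    enum HarnessMode { APPLIANCE, SINGLE_SOURCE, FAN_OUT }

    // DirectPath only applies when Streaming Engine is on; without
    // Streaming Engine the worker runs against the Appliance path.
    HarnessMode harnessMode() {
        if (!enableStreamingEngine) {
            return HarnessMode.APPLIANCE;
        }
        return directPathEnabled ? HarnessMode.FAN_OUT : HarnessMode.SINGLE_SOURCE;
    }
}
```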
Related Pages
- Principle:Apache_Beam_Worker_Initialization -- The principle describing the worker initialization process.
- Implementation:Apache_Beam_FanOutStreamingEngineWorkerHarness -- The Windmill harness constructed during initialization for DirectPath mode.
- Implementation:Apache_Beam_StreamingDataflowWorker_Start -- The start method that activates subsystems after initialization.
- Environment:Apache_Beam_Dataflow_Streaming_Runtime -- GCP Dataflow streaming worker runtime with Windmill and memory monitoring.
- Heuristic:Apache_Beam_Lock_Contention_Batching -- Batched lock-free queue drain to reduce monitor contention in BoundedQueueExecutor.
- Heuristic:Apache_Beam_GC_Thrashing_Detection -- Memory monitoring with GC thrashing detection and JVM auto-kill.