
Implementation:Apache Beam StreamingDataflowWorker FromOptions

From Leeroopedia


Field Value
Implementation Name StreamingDataflowWorker FromOptions
Overview Concrete factory method for bootstrapping a streaming Dataflow worker from pipeline options provided by the Dataflow service.
Module runners/google-cloud-dataflow-java/worker
Repository apache/beam
Related Principle Principle:Apache_Beam_Worker_Initialization
Last Updated 2026-02-09 04:00 GMT

Overview

The StreamingDataflowWorker.fromOptions() factory method is the concrete entry point for bootstrapping a fully initialized streaming Dataflow worker. It reads pipeline options from JVM system properties set by the Dataflow service, constructs all required subsystems (state cache, computation cache, work executor, memory monitor, Windmill harness, metrics reporters, and failure trackers), and returns a fully wired StreamingDataflowWorker instance ready to be started.

Code Reference

Source Location

File Lines Description
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java L612-697 fromOptions() factory method
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java L136 Class declaration: public final class StreamingDataflowWorker
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java L200-330 Private constructor with subsystem wiring
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WorkerPipelineOptionsFactory.java L49-94 createFromSystemProperties() -- pipeline options extraction
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowWorkerHarnessHelper.java L52-81 initializeGlobalStateAndPipelineOptions() -- logging and global state

Signature

Primary Factory Method:

public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) {
    long clientId = CLIENT_ID_GENERATOR.nextLong();
    MemoryMonitor memoryMonitor = MemoryMonitor.fromOptions(options);
    ConcurrentMap<String, StageInfo> stageInfo = new ConcurrentHashMap<>();
    StreamingCounters streamingCounters = StreamingCounters.create();
    WorkUnitClient dataflowServiceClient = new DataflowWorkUnitClient(options, LOG);
    BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options);
    WindmillStateCache windmillStateCache =
        WindmillStateCache.builder()
            .setSizeMb(options.getWorkerCacheMb())
            .setSupportMapViaMultimap(options.isEnableStreamingEngine())
            .build();
    // ... constructs config fetcher, computation state cache, windmill client,
    //     failure tracker, work failure processor, status reporter ...
    return new StreamingDataflowWorker(
        windmillServer, clientId, configFetcher, computationStateCache,
        windmillStateCache, workExecutor, IntrinsicMapTaskExecutorFactory.defaultFactory(),
        options, new HotKeyLogger(), clock, workerStatusReporter,
        failureTracker, workFailureProcessor, streamingCounters, memoryMonitor,
        windmillStreamFactory, activeWorkRefreshExecutor, stageInfo, dispatcherClient);
}

Supporting: WorkerPipelineOptionsFactory.createFromSystemProperties():

public static <T extends DataflowWorkerHarnessOptions> T createFromSystemProperties(
    Class<T> harnessOptionsClass) throws IOException {
    ObjectMapper objectMapper = new ObjectMapper();
    T options;
    if (System.getProperties().containsKey("sdk_pipeline_options")) {
        String serializedOptions = System.getProperty("sdk_pipeline_options");
        options = objectMapper.readValue(serializedOptions, PipelineOptions.class)
            .as(harnessOptionsClass);
    } else if (System.getProperties().containsKey("sdk_pipeline_options_file")) {
        String filePath = System.getProperty("sdk_pipeline_options_file");
        String serializedOptions = new String(
            Files.readAllBytes(Paths.get(filePath)), StandardCharsets.UTF_8);
        options = objectMapper.readValue(serializedOptions, PipelineOptions.class)
            .as(harnessOptionsClass);
    } else {
        options = PipelineOptionsFactory.as(harnessOptionsClass);
    }
    // Inject runtime properties: worker_id, job_id, worker_pool
    if (System.getProperties().containsKey("worker_id")) {
        options.setWorkerId(System.getProperty("worker_id"));
    }
    // ...
    return options;
}
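The precedence chain above (inline property, then options file, then defaults) can be illustrated with a self-contained sketch. This is not Beam code: it strips out the Jackson deserialization and works only with raw JSON strings and JDK classes, but the property-lookup order mirrors createFromSystemProperties().

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Illustrative sketch (not Beam source): resolves serialized pipeline options
 * the same way createFromSystemProperties() does -- an inline system property
 * wins, then a file path in a property, then caller-supplied defaults.
 */
public class OptionsSourceSketch {

  static String resolveSerializedOptions(String defaultJson) throws Exception {
    if (System.getProperties().containsKey("sdk_pipeline_options")) {
      // Highest precedence: options JSON serialized directly into the property.
      return System.getProperty("sdk_pipeline_options");
    } else if (System.getProperties().containsKey("sdk_pipeline_options_file")) {
      // Fallback: the property names a file holding the serialized options.
      Path file = Path.of(System.getProperty("sdk_pipeline_options_file"));
      return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
    }
    // Neither property set: fall back to defaults.
    return defaultJson;
  }

  public static void main(String[] args) throws Exception {
    System.setProperty("sdk_pipeline_options", "{\"jobId\":\"job-1\"}");
    System.out.println(resolveSerializedOptions("{}"));
  }
}
```

In the real factory, the resolved JSON is then deserialized via Jackson into a PipelineOptions proxy and cast with .as(harnessOptionsClass).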

Supporting: DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions():

public static <T extends DataflowWorkerHarnessOptions> T initializeGlobalStateAndPipelineOptions(
    Class<?> workerHarnessClass, Class<T> harnessOptionsClass) throws Exception {
    T pipelineOptions =
        WorkerPipelineOptionsFactory.createFromSystemProperties(harnessOptionsClass);
    pipelineOptions.setAppName(workerHarnessClass.getSimpleName());
    DataflowWorkerLoggingMDC.setJobId(pipelineOptions.getJobId());
    DataflowWorkerLoggingMDC.setWorkerId(pipelineOptions.getWorkerId());
    // Enable Conscrypt SSL if experiment is enabled
    ExperimentContext ec = ExperimentContext.parseFrom(pipelineOptions);
    if (ec.isEnabled(Experiment.EnableConscryptSecurityProvider)) {
        Security.insertProviderAt(new OpenSSLProvider(), 1);
    }
    return pipelineOptions;
}
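The DataflowWorkerLoggingMDC calls above stash the job and worker identity so that every log line the worker emits can be tagged with them. A minimal stdlib-only sketch of that idea (hypothetical class and method names, not Beam's actual implementation):

```java
/**
 * Illustrative sketch (not Beam source): a minimal MDC-style context holder
 * showing the role DataflowWorkerLoggingMDC plays -- recording jobId and
 * workerId once at startup so a log formatter can prepend them everywhere.
 */
public class LoggingContextSketch {
  private static volatile String jobId;
  private static volatile String workerId;

  static void setJobId(String id) { jobId = id; }
  static void setWorkerId(String id) { workerId = id; }

  /** What a logging formatter might prepend to each message. */
  static String logPrefix() {
    return "[job=" + jobId + " worker=" + workerId + "] ";
  }

  public static void main(String[] args) {
    setJobId("2026-02-09_example_job");
    setWorkerId("worker-0");
    System.out.println(logPrefix() + "worker initialized");
  }
}
```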

Import

import org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.WorkerPipelineOptionsFactory;
import org.apache.beam.runners.dataflow.worker.DataflowWorkerHarnessHelper;

I/O Contract

Inputs

Input Type Description
options DataflowWorkerHarnessOptions Pipeline options extracted from JVM system properties. Contains all configuration for the worker: job ID, worker ID, cache sizes, commit thread counts, Windmill endpoints, experiments, and more.
System Properties JVM system properties sdk_pipeline_options or sdk_pipeline_options_file for pipeline options; worker_id, job_id, worker_pool for runtime identity.

Outputs

Output Type Description
StreamingDataflowWorker StreamingDataflowWorker A fully initialized worker instance with all subsystems wired together: state cache, computation cache, work executor, memory monitor, Windmill harness, metrics reporter, failure tracker, active work refresher, and status pages. The worker is in a constructed but not started state.

Subsystems Constructed

Subsystem Class Purpose
State Cache WindmillStateCache In-memory cache for Windmill-backed per-key state
Computation Cache ComputationStateCache Maps computation IDs to their execution contexts
Work Executor BoundedQueueExecutor Thread pool (up to 300 threads) for concurrent work item processing
Memory Monitor MemoryMonitor Monitors JVM memory pressure and throttles work acquisition
Config Fetcher StreamingEngineComputationConfigFetcher Fetches computation configurations from the Dataflow service
Failure Tracker FailureTracker Tracks and reports processing failures
Status Reporter StreamingWorkerStatusReporter Periodically reports metrics to the Dataflow service
Work Scheduler StreamingWorkScheduler Orchestrates scheduling and execution of work items
Windmill Stream Factory GrpcWindmillStreamFactory Creates gRPC streams for Windmill communication
Worker Harness StreamingWorkerHarness Manages Windmill connections (FanOut or SingleSource)
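The BoundedQueueExecutor row above is the key back-pressure mechanism: work submission is bounded so producers slow down rather than queueing unbounded work items and exhausting memory. A stdlib-only sketch of that concept, assuming nothing about Beam's actual class (which additionally bounds by serialized byte size):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Illustrative sketch (not Beam source): a fixed-size pool with a bounded
 * submission queue, so overflow work applies back-pressure to the producer
 * instead of growing an unbounded queue.
 */
public class BoundedExecutorSketch {
  static ExecutorService create(int threads, int queueCapacity) {
    return new ThreadPoolExecutor(
        threads, threads, 0L, TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(queueCapacity),     // bounded work queue
        new ThreadPoolExecutor.CallerRunsPolicy());  // overflow runs on the caller
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = create(2, 4);
    for (int i = 0; i < 10; i++) {
      final int n = i;
      pool.execute(() -> System.out.println("work item " + n));
    }
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
  }
}
```

CallerRunsPolicy is one of several ways to express the back-pressure; Beam's BoundedQueueExecutor instead blocks the submitting thread until capacity frees up.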

Usage Examples

Typical Worker Bootstrap (as performed by DataflowStreamingWorkerHarness):

// Step 1: Initialize global state and extract pipeline options
DataflowWorkerHarnessOptions options =
    DataflowWorkerHarnessHelper.initializeGlobalStateAndPipelineOptions(
        DataflowStreamingWorkerHarness.class,
        DataflowWorkerHarnessOptions.class);

// Step 2: Construct the fully wired worker
StreamingDataflowWorker worker = StreamingDataflowWorker.fromOptions(options);

// Step 3: Start all subsystems
worker.start();

Key Pipeline Options Influencing Initialization:

// State cache size (default varies by machine type)
options.getWorkerCacheMb();

// Commit thread parallelism for Streaming Engine
options.getWindmillServiceCommitThreads();

// Whether to use DirectPath for Windmill communication
options.getIsWindmillServiceDirectPathEnabled();

// Whether Streaming Engine is enabled (vs. Appliance)
options.isEnableStreamingEngine();

// Active work refresh period in milliseconds
options.getActiveWorkRefreshPeriodMillis();
