Implementation:Apache Beam FanOutStreamingEngineWorkerHarness
| Field | Value |
|---|---|
| Implementation Name | FanOutStreamingEngineWorkerHarness |
| Overview | Concrete implementation for establishing fan-out Windmill gRPC connections for streaming execution, managing multiple backend connections with budget distribution. |
| Module | runners/google-cloud-dataflow-java/worker |
| Repository | apache/beam |
| Related Principle | Principle:Apache_Beam_Windmill_Connection_Setup |
| last_updated | 2026-02-09 04:00 GMT |
Overview
FanOutStreamingEngineWorkerHarness is the StreamingWorkerHarness implementation that manages fan-out connections to multiple Windmill backend destinations for Dataflow Streaming Engine with DirectPath. Given a total GetWorkBudget, it divides the budget across active backends, starts GetWorkStreams to each, and dynamically adjusts connections as backend endpoints change. It establishes and manages the full set of gRPC streaming connections (GetWork, GetData, CommitWork) required for streaming work distribution.
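The budget-splitting behavior described above can be sketched with a minimal, hypothetical value class. This is not the Beam `GetWorkBudget`/`GetWorkBudgetDistributor` API, just an illustration of dividing a total (items, bytes) budget evenly across N active backends; the `Math.max(1, ...)` floor is an assumption to keep every backend's share non-zero.

```java
// Hypothetical sketch of even GetWorkBudget distribution across N backends.
// The real distributor lives in the Beam worker module
// (GetWorkBudgetDistributors.distributeEvenly()); names here are illustrative.
public class BudgetSketch {
  public record Budget(long items, long bytes) {}

  // Split a total budget evenly across `backendCount` active backends,
  // flooring each share at 1 so no backend is starved (an assumption).
  public static Budget perBackendShare(Budget total, int backendCount) {
    if (backendCount == 0) {
      return new Budget(0, 0);
    }
    return new Budget(
        Math.max(1, total.items() / backendCount),
        Math.max(1, total.bytes() / backendCount));
  }

  public static void main(String[] args) {
    Budget total = new Budget(100, 64L << 20); // 100 items, 64 MB
    Budget share = perBackendShare(total, 4);
    System.out.println(share.items() + " items, " + share.bytes() + " bytes per backend");
  }
}
```

When the set of active backends changes, recomputing the per-backend share from the unchanged total is what keeps overall work intake stable as connections come and go.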
Code Reference
Source Location
| File | Lines | Description |
|---|---|---|
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java | L83 | Class declaration: `public final class FanOutStreamingEngineWorkerHarness implements StreamingWorkerHarness` |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java | L118-145 | Private constructor |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java | L152-176 | `create()` factory method |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java | L211-218 | `start()` method |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java | L242-268 | `shutdown()` method |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java | L270-279 | `consumeWorkerMetadata()` -- endpoint update handler |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java | L281-299 | `consumeWindmillWorkerEndpoints()` -- endpoint reconfiguration |
Signature
Factory Method:
```java
public static FanOutStreamingEngineWorkerHarness create(
    JobHeader jobHeader,
    GetWorkBudget totalGetWorkBudget,
    GrpcWindmillStreamFactory streamingEngineStreamFactory,
    WorkItemScheduler processWorkItem,
    ChannelCachingStubFactory channelCachingStubFactory,
    GetWorkBudgetDistributor getWorkBudgetDistributor,
    GrpcDispatcherClient dispatcherClient,
    Function<WindmillStream.CommitWorkStream, WorkCommitter> workCommitterFactory,
    ThrottlingGetDataMetricTracker getDataMetricTracker)
```
Lifecycle Methods:
```java
// Starts the harness by creating and starting a GetWorkerMetadataStream
// to receive endpoint assignments from the Windmill dispatcher.
@Override
public synchronized void start() {
  Preconditions.checkState(!started,
      "FanOutStreamingEngineWorkerHarness cannot start twice.");
  getWorkerMetadataStream =
      streamFactory.createGetWorkerMetadataStream(
          dispatcherClient::getWindmillMetadataServiceStubBlocking,
          this::consumeWorkerMetadata);
  getWorkerMetadataStream.start();
  started = true;
}

// Shuts down all streams and releases resources.
@Override
public synchronized void shutdown() {
  Preconditions.checkState(started,
      "FanOutStreamingEngineWorkerHarness never started.");
  getWorkerMetadataStream.shutdown();
  workerMetadataConsumer.shutdownNow();
  closeStreamsNotIn(WindmillEndpoints.none()).join();
  channelCachingStubFactory.shutdown();
  // ... await termination of stream managers ...
}
```
Endpoint Consumption:
```java
// Called when new Windmill endpoints are received from the metadata stream.
// Version checking ensures monotonic progress (no duplicate processing).
private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
  synchronized (metadataLock) {
    if (windmillEndpoints.version() > pendingMetadataVersion) {
      pendingMetadataVersion = windmillEndpoints.version();
      workerMetadataConsumer.execute(
          () -> consumeWindmillWorkerEndpoints(windmillEndpoints));
    }
  }
}
```
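The version-gating pattern above can be isolated into a small standalone sketch. This is not the harness's code, just an illustration of the same idea under assumed names: stale or duplicate metadata versions are rejected under a lock, and accepted versions are handed to a single-threaded executor so reconfiguration runs in order, off the gRPC callback thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.LongConsumer;

// Illustrative version-gated consumer (names are hypothetical, not Beam's).
public class VersionGatedConsumer {
  private final Object lock = new Object();
  private long pendingVersion = -1; // highest version accepted so far
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private final LongConsumer onAccepted;

  public VersionGatedConsumer(LongConsumer onAccepted) {
    this.onAccepted = onAccepted;
  }

  // Returns true iff the update was newer than anything seen so far.
  // Duplicates and out-of-order (older) versions are dropped, which is
  // what guarantees monotonic progress through endpoint configurations.
  public boolean accept(long version) {
    synchronized (lock) {
      if (version <= pendingVersion) {
        return false; // stale or duplicate update
      }
      pendingVersion = version;
      executor.execute(() -> onAccepted.accept(version));
      return true;
    }
  }

  public void shutdown() {
    executor.shutdown();
  }

  public static void main(String[] args) {
    VersionGatedConsumer c =
        new VersionGatedConsumer(v -> System.out.println("reconfiguring for v" + v));
    System.out.println(c.accept(1)); // true: first version
    System.out.println(c.accept(1)); // false: duplicate
    System.out.println(c.accept(3)); // true: newer version
    c.shutdown();
  }
}
```

Running the expensive reconfiguration on a dedicated single-threaded executor keeps the check-and-record step cheap while still serializing the actual endpoint changes.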
Import
```java
import org.apache.beam.runners.dataflow.worker.streaming.harness.FanOutStreamingEngineWorkerHarness;
import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributor;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;
```
I/O Contract
Inputs
| Input | Type | Description |
|---|---|---|
| jobHeader | Windmill.JobHeader | Identifies the job and worker (job ID, project ID, worker ID, client ID). |
| totalGetWorkBudget | GetWorkBudget | Total budget (max items + max bytes) to distribute across all Windmill backends. |
| streamFactory | GrpcWindmillStreamFactory | Factory for creating gRPC streams (GetWork, GetData, CommitWork, GetWorkerMetadata). |
| workItemScheduler | WorkItemScheduler | Callback invoked when a work item is received from a GetWork stream. |
| channelCachingStubFactory | ChannelCachingStubFactory | Creates and caches gRPC stubs for Windmill backend connections. |
| getWorkBudgetDistributor | GetWorkBudgetDistributor | Strategy for distributing the total budget across active backends (default: even distribution). |
| dispatcherClient | GrpcDispatcherClient | Client for the Windmill dispatcher service that provides endpoint metadata. |
| workCommitterFactory | Function<CommitWorkStream, WorkCommitter> | Factory to create a WorkCommitter for each backend's commit stream. |
| getDataMetricTracker | ThrottlingGetDataMetricTracker | Tracks GetData metrics and applies throttling. |
Outputs
| Output | Type | Description |
|---|---|---|
| Active Harness | FanOutStreamingEngineWorkerHarness | A running harness with active gRPC stream connections to all assigned Windmill backends. |
| Work Items | via WorkItemScheduler callback | Work items received from GetWork streams are delivered to the scheduler. |
| Endpoint Set | ImmutableSet<HostAndPort> | Queryable via currentWindmillEndpoints() for status reporting. |
Internal Architecture
The harness maintains an AtomicReference<StreamingEngineBackends> that holds the current state of all backend connections:
```
StreamingEngineBackends
+-- windmillStreams: ImmutableMap<Endpoint, WindmillStreamSender>
|     Each WindmillStreamSender manages:
|     - GetWorkStream (receives work items)
|     - GetDataStream (reads state)
|     - CommitWorkStream (commits results)
|     - WorkCommitter (manages commit lifecycle)
+-- globalDataStreams: ImmutableMap<String, GlobalDataStreamSender>
      For global data (side inputs) access
```
When endpoints change, the harness:
- Closes streams to backends no longer in the new endpoint set.
- Creates new `WindmillStreamSender` instances for newly added endpoints.
- Merges new and existing streams into a new `StreamingEngineBackends` snapshot.
- Redistributes the GetWork budget across all active backends.
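The close/create/merge steps above amount to reconciling a map of endpoint-to-stream-sender against a new endpoint set. The generic sketch below illustrates that reconciliation with stand-in types; the real harness operates on `WindmillStreamSender` and Guava `ImmutableMap`, and additionally redistributes the budget afterward, which is omitted here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

// Illustrative endpoint reconciliation (types E and S are stand-ins for
// the harness's Endpoint and WindmillStreamSender).
public class EndpointReconciler {
  // Builds the next endpoint->sender map from the current one:
  // reuses senders for endpoints that remain, closes senders for removed
  // endpoints, and creates senders for newly assigned endpoints.
  public static <E, S> Map<E, S> reconcile(
      Map<E, S> current,
      Set<E> newEndpoints,
      Function<E, S> createSender,
      Consumer<S> closeSender) {
    Map<E, S> next = new HashMap<>();
    for (Map.Entry<E, S> entry : current.entrySet()) {
      if (newEndpoints.contains(entry.getKey())) {
        next.put(entry.getKey(), entry.getValue()); // still active: reuse
      } else {
        closeSender.accept(entry.getValue()); // removed: close its streams
      }
    }
    for (E endpoint : newEndpoints) {
      next.computeIfAbsent(endpoint, createSender); // new backend: connect
    }
    return next;
  }

  public static void main(String[] args) {
    Map<String, String> current = Map.of("backend-a", "senderA", "backend-b", "senderB");
    Map<String, String> next =
        reconcile(current, Set.of("backend-b", "backend-c"),
            e -> "sender:" + e,
            s -> System.out.println("closing " + s));
    System.out.println(next); // backend-b reused, backend-c newly created
  }
}
```

Building a fresh map and publishing it as one snapshot (as the harness does via its `AtomicReference<StreamingEngineBackends>`) lets readers observe either the old or the new configuration, never a half-updated one.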
Usage Examples
Creation within StreamingDataflowWorker:
```java
FanOutStreamingEngineWorkerHarness fanOutHarness =
    FanOutStreamingEngineWorkerHarness.create(
        createJobHeader(options, clientId),
        GetWorkBudget.builder()
            .setItems(chooseMaxBundlesOutstanding(options))
            .setBytes(MAX_GET_WORK_FETCH_BYTES) // 64 MB
            .build(),
        windmillStreamFactory,
        workItemScheduler,
        ChannelCachingRemoteStubFactory.create(
            options.getGcpCredential(), channelCache),
        GetWorkBudgetDistributors.distributeEvenly(),
        dispatcherClient,
        commitStream -> StreamingEngineWorkCommitter.builder()
            .setCommitByteSemaphore(maxCommitByteSemaphore)
            .setBackendWorkerToken(commitStream.backendWorkerToken())
            .setOnCommitComplete(this::onCompleteCommit)
            .setNumCommitSenders(numCommitThreads)
            .setCommitWorkStreamFactory(
                () -> CloseableStream.create(commitStream, () -> {}))
            .build(),
        getDataMetricTracker);

// Start activates the metadata stream
fanOutHarness.start();
```
Related Pages
- Principle:Apache_Beam_Windmill_Connection_Setup -- The principle describing Windmill connection setup.
- Implementation:Apache_Beam_StreamingDataflowWorker_FromOptions -- The factory that constructs this harness during worker initialization.
- Implementation:Apache_Beam_StreamingDataflowWorker_Start -- The start method that activates this harness.
- Environment:Apache_Beam_Dataflow_Streaming_Runtime -- GCP Dataflow streaming worker runtime with Windmill and memory monitoring.