
Implementation:Apache Beam FanOutStreamingEngineWorkerHarness

From Leeroopedia


Field Value
Implementation Name FanOutStreamingEngineWorkerHarness
Overview Concrete implementation for establishing fan-out Windmill gRPC connections for streaming execution, managing multiple backend connections with budget distribution.
Module runners/google-cloud-dataflow-java/worker
Repository apache/beam
Related Principle Principle:Apache_Beam_Windmill_Connection_Setup
last_updated 2026-02-09 04:00 GMT

Overview

FanOutStreamingEngineWorkerHarness is the StreamingWorkerHarness implementation that manages fan-out connections to multiple Windmill backend destinations for Dataflow Streaming Engine with DirectPath. Given a total GetWorkBudget, it divides the budget across active backends, starts GetWorkStreams to each, and dynamically adjusts connections as backend endpoints change. It establishes and manages the full set of gRPC streaming connections (GetWork, GetData, CommitWork) required for streaming work distribution.
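The budget-splitting behavior described above can be sketched in isolation. The snippet below is a minimal illustration, not the Beam API: the real logic lives in `GetWorkBudgetDistributor` under `org.apache.beam.runners.dataflow.worker.windmill.work.budget`, and the class and method names here are illustrative.

```java
// Hypothetical sketch of even GetWorkBudget distribution across N backends.
public final class EvenBudgetSketch {

    /** Share of items (or bytes) each backend receives; at least 1 so no backend starves. */
    static long perBackendShare(long total, int backendCount) {
        return Math.max(1, total / backendCount);
    }

    public static void main(String[] args) {
        long totalItems = 10_000;    // e.g. derived from max outstanding bundles
        long totalBytes = 64L << 20; // 64 MB, as in the usage example below
        int backends = 4;
        System.out.println(perBackendShare(totalItems, backends)); // prints 2500
        System.out.println(perBackendShare(totalBytes, backends)); // prints 16777216
    }
}
```

When the set of active backends changes, the harness re-runs distribution over the new backend count, so each endpoint's share shrinks or grows with fan-out.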

Code Reference

Source Location

File Lines Description
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java L83 Class declaration: public final class FanOutStreamingEngineWorkerHarness implements StreamingWorkerHarness
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java L118-145 Private constructor
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java L152-176 create() factory method
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java L211-218 start() method
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java L242-268 shutdown() method
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java L270-279 consumeWorkerMetadata() -- endpoint update handler
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java L281-299 consumeWindmillWorkerEndpoints() -- endpoint reconfiguration

Signature

Factory Method:

public static FanOutStreamingEngineWorkerHarness create(
    JobHeader jobHeader,
    GetWorkBudget totalGetWorkBudget,
    GrpcWindmillStreamFactory streamingEngineStreamFactory,
    WorkItemScheduler processWorkItem,
    ChannelCachingStubFactory channelCachingStubFactory,
    GetWorkBudgetDistributor getWorkBudgetDistributor,
    GrpcDispatcherClient dispatcherClient,
    Function<WindmillStream.CommitWorkStream, WorkCommitter> workCommitterFactory,
    ThrottlingGetDataMetricTracker getDataMetricTracker)

Lifecycle Methods:

// Starts the harness by creating and starting a GetWorkerMetadataStream
// to receive endpoint assignments from the Windmill dispatcher.
@Override
public synchronized void start() {
    Preconditions.checkState(!started,
        "FanOutStreamingEngineWorkerHarness cannot start twice.");
    getWorkerMetadataStream =
        streamFactory.createGetWorkerMetadataStream(
            dispatcherClient::getWindmillMetadataServiceStubBlocking,
            this::consumeWorkerMetadata);
    getWorkerMetadataStream.start();
    started = true;
}

// Shuts down all streams and releases resources.
@Override
public synchronized void shutdown() {
    Preconditions.checkState(started,
        "FanOutStreamingEngineWorkerHarness never started.");
    getWorkerMetadataStream.shutdown();
    workerMetadataConsumer.shutdownNow();
    closeStreamsNotIn(WindmillEndpoints.none()).join();
    channelCachingStubFactory.shutdown();
    // ... await termination of stream managers ...
}

Endpoint Consumption:

// Called when new Windmill endpoints are received from the metadata stream.
// Version checking ensures monotonic progress (no duplicate processing).
private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
    synchronized (metadataLock) {
        if (windmillEndpoints.version() > pendingMetadataVersion) {
            pendingMetadataVersion = windmillEndpoints.version();
            workerMetadataConsumer.execute(
                () -> consumeWindmillWorkerEndpoints(windmillEndpoints));
        }
    }
}
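The monotonic-version gate above can be exercised standalone. This is a simplified stand-in, assuming only that strictly newer versions are accepted under a lock; the field and class names are illustrative, not the Beam internals.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the monotonic-version gate in consumeWorkerMetadata: only a
// strictly newer endpoint version triggers reconfiguration, so stale or
// duplicate metadata updates are dropped cheaply under the lock.
public final class MetadataVersionGate {
    private final Object metadataLock = new Object();
    private long pendingMetadataVersion = -1;
    final AtomicInteger reconfigurations = new AtomicInteger();

    void consume(long version) {
        synchronized (metadataLock) {
            if (version > pendingMetadataVersion) {
                pendingMetadataVersion = version;
                // In the harness, the work is handed to workerMetadataConsumer
                // (an executor), keeping reconfigurations off the stream thread.
                reconfigurations.incrementAndGet();
            }
        }
    }

    public static void main(String[] args) {
        MetadataVersionGate gate = new MetadataVersionGate();
        gate.consume(1);
        gate.consume(1); // duplicate version: dropped
        gate.consume(3);
        gate.consume(2); // stale version: dropped
        System.out.println(gate.reconfigurations.get()); // prints 2
    }
}
```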

Import

import org.apache.beam.runners.dataflow.worker.streaming.harness.FanOutStreamingEngineWorkerHarness;
import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget;
import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributor;
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader;

I/O Contract

Inputs

Input Type Description
jobHeader Windmill.JobHeader Identifies the job and worker (job ID, project ID, worker ID, client ID).
totalGetWorkBudget GetWorkBudget Total budget (max items + max bytes) to distribute across all Windmill backends.
streamFactory GrpcWindmillStreamFactory Factory for creating gRPC streams (GetWork, GetData, CommitWork, GetWorkerMetadata).
workItemScheduler WorkItemScheduler Callback invoked when a work item is received from a GetWork stream.
channelCachingStubFactory ChannelCachingStubFactory Creates and caches gRPC stubs for Windmill backend connections.
getWorkBudgetDistributor GetWorkBudgetDistributor Strategy for distributing the total budget across active backends (default: even distribution).
dispatcherClient GrpcDispatcherClient Client for the Windmill dispatcher service that provides endpoint metadata.
workCommitterFactory Function<CommitWorkStream, WorkCommitter> Factory to create a WorkCommitter for each backend's commit stream.
getDataMetricTracker ThrottlingGetDataMetricTracker Tracks GetData metrics and applies throttling.

Outputs

Output Type Description
Active Harness FanOutStreamingEngineWorkerHarness A running harness with active gRPC stream connections to all assigned Windmill backends.
Work Items via WorkItemScheduler callback Work items received from GetWork streams are delivered to the scheduler.
Endpoint Set ImmutableSet<HostAndPort> Queryable via currentWindmillEndpoints() for status reporting.

Internal Architecture

The harness maintains an AtomicReference<StreamingEngineBackends> that holds the current state of all backend connections:

StreamingEngineBackends
  +-- windmillStreams: ImmutableMap<Endpoint, WindmillStreamSender>
  |     Each WindmillStreamSender manages:
  |       - GetWorkStream (receives work items)
  |       - GetDataStream (reads state)
  |       - CommitWorkStream (commits results)
  |       - WorkCommitter (manages commit lifecycle)
  +-- globalDataStreams: ImmutableMap<String, GlobalDataStreamSender>
        For global data (side inputs) access
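The `AtomicReference` snapshot pattern can be shown with simplified types. Below, `Map.of` and `String` keys stand in for the harness's `ImmutableMap` of `Endpoint` to `WindmillStreamSender`; the point is that readers always observe one consistent, immutable snapshot while the metadata consumer swaps in a replacement.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the StreamingEngineBackends snapshot swap with stand-in types.
public final class BackendsSnapshotSketch {
    record Backends(Map<String, String> windmillStreams) {}

    private final AtomicReference<Backends> backends =
        new AtomicReference<>(new Backends(Map.of()));

    // Publish a new immutable snapshot; concurrent readers never see a
    // half-updated stream map.
    void swap(Map<String, String> newStreams) {
        backends.set(new Backends(Map.copyOf(newStreams)));
    }

    int streamCount() {
        return backends.get().windmillStreams().size();
    }

    public static void main(String[] args) {
        BackendsSnapshotSketch s = new BackendsSnapshotSketch();
        s.swap(Map.of("backend-a", "sender-a", "backend-b", "sender-b"));
        System.out.println(s.streamCount()); // prints 2
    }
}
```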

When endpoints change, the harness:

  1. Closes streams to backends no longer in the new endpoint set.
  2. Creates new WindmillStreamSender instances for newly added endpoints.
  3. Merges new and existing streams into a new StreamingEngineBackends snapshot.
  4. Redistributes the GetWork budget across all active backends.
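Steps 1 and 2 above reduce to set arithmetic over endpoint identities. The sketch below models an endpoint as a `String` for illustration; the harness works with Windmill `Endpoint` objects and `WindmillStreamSender` values.

```java
import java.util.Set;
import java.util.stream.Collectors;

// Sketch of endpoint reconciliation: given current vs. newly assigned
// endpoints, compute which streams to close and which to create.
public final class EndpointReconcileSketch {

    static Set<String> toClose(Set<String> current, Set<String> assigned) {
        return current.stream()
            .filter(e -> !assigned.contains(e))
            .collect(Collectors.toSet());
    }

    static Set<String> toCreate(Set<String> current, Set<String> assigned) {
        return assigned.stream()
            .filter(e -> !current.contains(e))
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Set<String> current = Set.of("a", "b", "c");
        Set<String> assigned = Set.of("b", "c", "d");
        System.out.println(toClose(current, assigned));  // stream to "a" is closed
        System.out.println(toCreate(current, assigned)); // sender for "d" is created
    }
}
```

Endpoints present in both sets keep their existing `WindmillStreamSender`, so only the churned subset pays connection setup or teardown cost before the budget is redistributed.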

Usage Examples

Creation within StreamingDataflowWorker:

FanOutStreamingEngineWorkerHarness fanOutHarness =
    FanOutStreamingEngineWorkerHarness.create(
        createJobHeader(options, clientId),
        GetWorkBudget.builder()
            .setItems(chooseMaxBundlesOutstanding(options))
            .setBytes(MAX_GET_WORK_FETCH_BYTES)  // 64 MB
            .build(),
        windmillStreamFactory,
        workItemScheduler,
        ChannelCachingRemoteStubFactory.create(
            options.getGcpCredential(), channelCache),
        GetWorkBudgetDistributors.distributeEvenly(),
        dispatcherClient,
        commitStream -> StreamingEngineWorkCommitter.builder()
            .setCommitByteSemaphore(maxCommitByteSemaphore)
            .setBackendWorkerToken(commitStream.backendWorkerToken())
            .setOnCommitComplete(this::onCompleteCommit)
            .setNumCommitSenders(numCommitThreads)
            .setCommitWorkStreamFactory(
                () -> CloseableStream.create(commitStream, () -> {}))
            .build(),
        getDataMetricTracker);

// Start activates the metadata stream
fanOutHarness.start();

Related Pages

Principle:Apache_Beam_Windmill_Connection_Setup