
Implementation:Apache Beam StreamingEngineComputationConfigFetcher

From Leeroopedia


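| Field | Value |
|---|---|
| Implementation Name | StreamingEngineComputationConfigFetcher |
| Overview | Streaming Engine implementation of ComputationConfig.Fetcher. It fetches the global pipeline configuration from the Dataflow service and refreshes it periodically, and it fetches individual computation configurations on demand. Documented together with DefaultParDoFnFactory, which dispatches DoFns to specialized factories. |
| Module | runners/google-cloud-dataflow-java/worker |
| Repository | apache/beam |
| Related Principle | Principle:Apache_Beam_Computation_Configuration |
| last_updated | 2026-02-09 04:00 GMT |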

Overview

StreamingEngineComputationConfigFetcher is the Streaming Engine implementation of ComputationConfig.Fetcher. It asynchronously fetches the global pipeline configuration from the Dataflow service, keeps the latest copy in a StreamingGlobalConfigHandle that delivers it to registered observers, and fetches the configuration for an individual computation ID on demand. It works alongside DefaultParDoFnFactory, which dispatches each ParDo to a specialized factory based on the class name of its serialized user function (for example "DoFn" or "CombineValuesFn").

Code Reference

Source Location

All paths are relative to runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/.

| File | Lines | Description |
|---|---|---|
| streaming/config/StreamingEngineComputationConfigFetcher.java | L64 | Class declaration: public final class StreamingEngineComputationConfigFetcher implements ComputationConfig.Fetcher |
| streaming/config/StreamingEngineComputationConfigFetcher.java | L82-93 | Private constructor |
| streaming/config/StreamingEngineComputationConfigFetcher.java | L95-104 | create() factory method |
| streaming/config/StreamingEngineComputationConfigFetcher.java | L129-157 | fetchConfigWithRetry(): retry with exponential backoff |
| streaming/config/StreamingEngineComputationConfigFetcher.java | L239-242 | start() method |
| streaming/config/StreamingEngineComputationConfigFetcher.java | L245-252 | fetchConfig() method |
| streaming/config/StreamingEngineComputationConfigFetcher.java | L295-309 | fetchInitialPipelineGlobalConfig(): blocking initial fetch |
| DefaultParDoFnFactory.java | L33-84 | DefaultParDoFnFactory: DoFn dispatch table |

Signature

Factory Method:

public static StreamingEngineComputationConfigFetcher create(
    long globalConfigRefreshPeriodMillis,
    WorkUnitClient dataflowServiceClient) {
    return new StreamingEngineComputationConfigFetcher(
        /* hasReceivedGlobalConfig= */ false,
        globalConfigRefreshPeriodMillis,
        dataflowServiceClient,
        new StreamingGlobalConfigHandleImpl(),
        Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder()
                .setNameFormat("GlobalPipelineConfigRefresher").build()));
}
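The factory method uses Guava's ThreadFactoryBuilder to give the refresher thread a fixed name, presumably so it is easy to identify in thread dumps. As a hedged illustration of what that call provides, the sketch below builds an equivalent named single-thread scheduler using only the JDK. NamedThreads is a hypothetical helper, not part of Beam; like setNameFormat, it formats the name with a per-factory counter, so a format without %d yields the same fixed name for every thread.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical JDK-only stand-in for new ThreadFactoryBuilder().setNameFormat(fmt).build(). */
final class NamedThreads {
  private NamedThreads() {}

  /** Names each new thread by formatting nameFormat with a per-factory counter (0, 1, 2, ...). */
  static ThreadFactory named(String nameFormat) {
    AtomicLong counter = new AtomicLong();
    // String.format ignores the extra argument when nameFormat has no %d.
    return runnable -> new Thread(runnable, String.format(nameFormat, counter.getAndIncrement()));
  }

  /** Single-threaded scheduler whose worker thread carries a recognizable name. */
  static ScheduledExecutorService singleThreadScheduler(String nameFormat) {
    return Executors.newSingleThreadScheduledExecutor(named(nameFormat));
  }
}
```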

Lifecycle Methods:

@Override
public void start() {
    fetchInitialPipelineGlobalConfig();   // Blocks until initial config received
    schedulePeriodicGlobalConfigRequests(); // Starts background refresh thread
}

@Override
public Optional<ComputationConfig> fetchConfig(String computationId) {
    Preconditions.checkArgument(!computationId.isEmpty(),
        "computationId is empty. Cannot fetch without a computationId.");
    return fetchConfigWithRetry(
            () -> dataflowServiceClient.getStreamingConfigWorkItem(computationId))
        .flatMap(StreamingEngineComputationConfigFetcher::createComputationConfig);
}

@Override
public StreamingGlobalConfigHandle getGlobalConfigHandle() {
    return globalConfigHandle;
}

@Override
public void stop() {
    if (globalConfigRefresher.isShutdown() || !hasReceivedGlobalConfig.get()) {
        return;
    }
    globalConfigRefresher.shutdown();
    // ... await termination with 10-second timeout ...
}
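The start()/stop() contract above can be illustrated with a small, self-contained sketch. Everything in it is hypothetical (PeriodicConfigRefresher, the Supplier-based fetch, the choice of scheduleWithFixedDelay); it only mirrors the documented behaviour: block until a first config arrives (this page lists a 5-second retry interval), then refresh in the background. On stop(), it skips shutdown if the executor is already shut down or no config was ever received; otherwise it shuts down and waits up to 10 seconds.

```java
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical sketch of the documented start()/stop() lifecycle; not Beam's implementation. */
final class PeriodicConfigRefresher<T> {
  private final Supplier<Optional<T>> fetch; // empty = no config available yet
  private final Consumer<T> onConfig;        // e.g. publish to a config handle
  private final long initialRetryMillis;     // this page documents 5 seconds
  private final long refreshPeriodMillis;    // plays the role of globalConfigRefreshPeriodMillis
  private final AtomicBoolean hasReceivedConfig = new AtomicBoolean(false);
  private final ScheduledExecutorService refresher = Executors.newSingleThreadScheduledExecutor();

  PeriodicConfigRefresher(Supplier<Optional<T>> fetch, Consumer<T> onConfig,
      long initialRetryMillis, long refreshPeriodMillis) {
    this.fetch = fetch;
    this.onConfig = onConfig;
    this.initialRetryMillis = initialRetryMillis;
    this.refreshPeriodMillis = refreshPeriodMillis;
  }

  /** Blocks until the first config arrives, then schedules background refreshes. */
  void start() {
    while (!hasReceivedConfig.get()) {
      Optional<T> config = fetch.get();
      if (config.isPresent()) {
        onConfig.accept(config.get());
        hasReceivedConfig.set(true);
      } else {
        try {
          Thread.sleep(initialRetryMillis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve interrupt status and give up
          return;
        }
      }
    }
    refresher.scheduleWithFixedDelay(
        this::refreshOnce, refreshPeriodMillis, refreshPeriodMillis, TimeUnit.MILLISECONDS);
  }

  private void refreshOnce() {
    try {
      fetch.get().ifPresent(onConfig);
    } catch (RuntimeException e) {
      // An exception escaping a periodic task cancels all future runs; real code would log it.
    }
  }

  /** No-op if already stopped or never started; otherwise waits up to 10 seconds. */
  void stop() {
    if (refresher.isShutdown() || !hasReceivedConfig.get()) {
      return;
    }
    refresher.shutdown();
    try {
      if (!refresher.awaitTermination(10, TimeUnit.SECONDS)) {
        refresher.shutdownNow();
      }
    } catch (InterruptedException e) {
      refresher.shutdownNow();
      Thread.currentThread().interrupt();
    }
  }
}
```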

DefaultParDoFnFactory Dispatch:

public class DefaultParDoFnFactory implements ParDoFnFactory {
    private final ImmutableMap<String, ParDoFnFactory> defaultFactories;

    public DefaultParDoFnFactory() {
        defaultFactories = ImmutableMap.<String, ParDoFnFactory>builder()
            .put("DoFn", UserParDoFnFactory.createDefault())
            .put("CombineValuesFn", new CombineValuesFnFactory())
            .put("MergeBucketsDoFn", new GroupAlsoByWindowParDoFnFactory())
            .put("AssignBucketsDoFn", new AssignWindowsParDoFnFactory())
            .put("MergeWindowsDoFn", new GroupAlsoByWindowParDoFnFactory())
            .put("AssignWindowsDoFn", new AssignWindowsParDoFnFactory())
            .put("SplittableProcessFn", SplittableProcessFnFactory.createDefault())
            // ... additional factories ...
            .build();
    }

    @Override
    public ParDoFn create(PipelineOptions options, CloudObject cloudUserFn,
        List<SideInputInfo> sideInputInfos, TupleTag<?> mainOutputTag,
        Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices,
        DataflowExecutionContext<?> executionContext,
        DataflowOperationContext operationContext) throws Exception {
        String className = cloudUserFn.getClassName();
        ParDoFnFactory factory = defaultFactories.get(className);
        if (factory == null) {
            throw new Exception("No known ParDoFnFactory for " + className);
        }
        return factory.create(options, cloudUserFn, sideInputInfos,
            mainOutputTag, outputTupleTagsToReceiverIndices,
            executionContext, operationContext);
    }
}
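The dispatch in DefaultParDoFnFactory is a lookup table keyed by the serialized function's class name: several names can share one factory (MergeBucketsDoFn and MergeWindowsDoFn both map to GroupAlsoByWindowParDoFnFactory), and unknown names are rejected. Below is a minimal, hypothetical model of that pattern. ClassNameDispatcher is not a Beam class; it uses Function<String, R> in place of ParDoFnFactory and an unchecked IllegalArgumentException in place of the checked Exception that the real create() throws.

```java
import java.util.Map;
import java.util.function.Function;

/** Hypothetical, simplified model of class-name dispatch; the real factories build ParDoFns. */
final class ClassNameDispatcher<R> {
  private final Map<String, Function<String, R>> factories;

  ClassNameDispatcher(Map<String, Function<String, R>> factories) {
    this.factories = Map.copyOf(factories); // immutable, like the ImmutableMap above
  }

  /** Looks up the factory registered for className and applies it to the payload. */
  R create(String className, String payload) {
    Function<String, R> factory = factories.get(className);
    if (factory == null) {
      throw new IllegalArgumentException("No known factory for " + className);
    }
    return factory.apply(payload);
  }
}
```

An immutable map built once in the constructor keeps lookups lock-free and makes the set of supported function types explicit in a single place.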

Import

import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineComputationConfigFetcher;
import org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory;

I/O Contract

Inputs

| Input | Type | Description |
|---|---|---|
| globalConfigRefreshPeriodMillis | long | Interval, in milliseconds, between periodic global config fetches. |
| dataflowServiceClient | WorkUnitClient | Client for communicating with the Dataflow service API. |

Outputs

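| Output | Type | Description |
|---|---|---|
| Computation config | Optional<ComputationConfig> | Returned by fetchConfig(computationId): the configuration for a single computation, containing its MapTask and the mapping from user transform names to state families. Empty when no usable configuration was returned. |
| Global config | StreamingGlobalConfig, via StreamingGlobalConfigHandle | Global pipeline configuration, including Windmill endpoints, operational limits, the state tag encoding version, and user worker settings. Delivered to registered observers. |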

Configuration Structure

The global configuration (StreamingGlobalConfig) includes:

| Field | Description |
|---|---|
| windmillServiceEndpoints | Set of HostAndPort addresses for the Windmill backends |
| operationalLimits | Maximum commit bytes, maximum key bytes, and maximum output bytes |
| enableStateTagEncodingV2 | Whether state uses V2 tag encoding |
| userWorkerJobSettings | Protobuf settings, including the connectivity type (DirectPath or CloudPath) |
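To make the table concrete, here is a hypothetical, simplified value model of those fields. The type names, the use of "host:port" strings instead of HostAndPort, the ConnectivitySketch enum standing in for the userWorkerJobSettings protobuf, and the allows(...) check are all illustrative assumptions, not Beam's StreamingGlobalConfig API.

```java
import java.util.Set;

/** Hypothetical stand-in for the operational limits listed above (units: bytes). */
record OperationalLimitsSketch(long maxCommitBytes, long maxKeyBytes, long maxOutputBytes) {
  /** Illustrative check: true if every size is within its limit. */
  boolean allows(long commitBytes, long keyBytes, long outputBytes) {
    return commitBytes <= maxCommitBytes
        && keyBytes <= maxKeyBytes
        && outputBytes <= maxOutputBytes;
  }
}

/** Hypothetical stand-in for the connectivity type carried in userWorkerJobSettings. */
enum ConnectivitySketch { DEFAULT, DIRECTPATH, CLOUDPATH }

/** Hypothetical, simplified value model of the global configuration fields. */
record GlobalConfigSketch(
    Set<String> windmillServiceEndpoints,    // "host:port" strings instead of HostAndPort
    OperationalLimitsSketch operationalLimits,
    boolean enableStateTagEncodingV2,
    ConnectivitySketch connectivityType) {   // replaces the userWorkerJobSettings protobuf

  GlobalConfigSketch {
    // Defensive, immutable copy so a published config cannot be mutated afterwards.
    windmillServiceEndpoints = Set.copyOf(windmillServiceEndpoints);
  }
}
```

Because records compare by value, two refreshes that return identical settings produce equal objects. This gives a consumer a cheap way to tell whether anything actually changed.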

Retry Configuration

| Parameter | Value |
|---|---|
| Initial backoff | 100 ms |
| Maximum backoff | 1 minute |
| Maximum cumulative backoff | 5 minutes |
| Initial config retry | Retries every 5 seconds until a config is received |
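The backoff parameters above describe a bounded exponential backoff. The following is a hedged sketch of what those numbers imply, with two assumptions called out: the growth factor (2.0 in the test) and the absence of random jitter are illustrative choices, and the real fetchConfigWithRetry() may grow or randomize delays differently. BoundedBackoff is hypothetical.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

/**
 * Hypothetical bounded exponential backoff: per-retry delays grow by `multiplier`,
 * are capped at `max`, and stop once their sum reaches `maxCumulative`. No jitter.
 */
final class BoundedBackoff {
  private final long initialMillis;
  private final long maxMillis;
  private final long maxCumulativeMillis;
  private final double multiplier;

  BoundedBackoff(Duration initial, Duration max, Duration maxCumulative, double multiplier) {
    if (initial.toMillis() <= 0 || max.toMillis() <= 0 || multiplier < 1.0) {
      throw new IllegalArgumentException("delays must be positive and multiplier >= 1");
    }
    this.initialMillis = initial.toMillis();
    this.maxMillis = max.toMillis();
    this.maxCumulativeMillis = maxCumulative.toMillis();
    this.multiplier = multiplier;
  }

  /** The sleep before each retry, in order; their sum never exceeds maxCumulative. */
  List<Duration> schedule() {
    List<Duration> delays = new ArrayList<>();
    long spent = 0;
    double next = initialMillis;
    while (spent < maxCumulativeMillis) {
      long delay = (long) Math.min(next, maxMillis);
      delay = Math.min(delay, maxCumulativeMillis - spent); // trim the last delay to the budget
      delays.add(Duration.ofMillis(delay));
      spent += delay;
      next *= multiplier;
    }
    return delays;
  }

  /** Tries once, then once more after each scheduled delay; empty if every attempt fails. */
  <T> Optional<T> retry(Supplier<Optional<T>> attempt) {
    Optional<T> result = attempt.get();
    for (Duration delay : schedule()) {
      if (result.isPresent()) {
        return result;
      }
      try {
        Thread.sleep(delay.toMillis());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return Optional.empty();
      }
      result = attempt.get();
    }
    return result;
  }
}
```

With the documented values and a factor of 2, the delays are 100 ms, 200 ms, 400 ms and so on up to 51.2 s, then three 1-minute waits, then a trimmed 17.7 s wait. That uses exactly the 5-minute budget across 14 retries after the first attempt.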

Usage Examples

Fetching a Specific Computation Config:

// When a work item arrives for an unknown computation ID
Optional<ComputationConfig> config = configFetcher.fetchConfig("computation-123");
config.ifPresent(cfg -> {
    MapTask mapTask = cfg.getMapTask();
    Map<String, String> transformToStateFamily = cfg.getTransformUserNameToStateFamily();
    // Register the computation in the ComputationStateCache
});
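The example above implies a fetch-on-miss pattern: the worker sees an unknown computation ID, calls fetchConfig(), and registers the result. Below is a minimal, hypothetical sketch of that pattern (OnDemandConfigCache is not a Beam class). It caches successful fetches, and because ConcurrentHashMap.computeIfAbsent records no mapping when the function returns null, it retries IDs whose fetch came back empty.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical cache in front of an on-demand fetcher such as fetchConfig(computationId). */
final class OnDemandConfigCache<V> {
  private final ConcurrentHashMap<String, V> cache = new ConcurrentHashMap<>();
  private final Function<String, Optional<V>> fetcher;

  OnDemandConfigCache(Function<String, Optional<V>> fetcher) {
    this.fetcher = fetcher;
  }

  /** Returns the cached config, fetching it on the first request for this ID. */
  Optional<V> get(String computationId) {
    if (computationId.isEmpty()) {
      throw new IllegalArgumentException("computationId is empty");
    }
    // A null result from the mapping function stores nothing, so empty fetches are retried later.
    return Optional.ofNullable(
        cache.computeIfAbsent(computationId, id -> fetcher.apply(id).orElse(null)));
  }
}
```

computeIfAbsent keeps at most one fetch per ID in flight, but it also blocks other writers to the same hash bin while the remote call runs, and the ConcurrentHashMap documentation asks for short mapping functions. A production cache might therefore prefer a dedicated loading cache.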

Registering a Config Observer:

// Register an observer for connectivity type changes
configFetcher.getGlobalConfigHandle().registerConfigObserver(
    streamingGlobalConfig -> {
        ConnectivityType connectivityType =
            streamingGlobalConfig.userWorkerJobSettings().getConnectivityType();
        if (connectivityType != ConnectivityType.CONNECTIVITY_TYPE_DEFAULT) {
            switchStreamingWorkerHarness(connectivityType);
        }
    });
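The observer example relies on StreamingGlobalConfigHandle pushing each refreshed StreamingGlobalConfig to registered callbacks. The sketch below is a hypothetical, generic handle (ConfigHandleSketch, not Beam's StreamingGlobalConfigHandleImpl) showing one way to build that. Two behaviours are assumptions rather than documented facts: a newly registered observer immediately receives the current config, and observers are notified only when a refresh actually changes the value.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical observer-style config handle; not Beam's StreamingGlobalConfigHandleImpl. */
final class ConfigHandleSketch<T> {
  private final AtomicReference<T> latest = new AtomicReference<>();
  private final List<Consumer<T>> observers = new CopyOnWriteArrayList<>();

  /** Registers an observer; assumption: it immediately receives the current config, if any. */
  void registerConfigObserver(Consumer<T> observer) {
    observers.add(observer);
    T current = latest.get();
    if (current != null) {
      observer.accept(current);
    }
  }

  /** Publishes a refreshed config; observers are notified only when the value changes. */
  void setConfig(T config) {
    Objects.requireNonNull(config, "config");
    T previous = latest.getAndSet(config);
    if (!config.equals(previous)) {
      observers.forEach(o -> o.accept(config));
    }
  }

  T getConfig() {
    return latest.get();
  }
}
```

With a copy-on-write observer list, a registration that races with a refresh can at worst deliver the same config twice; it cannot miss the latest one.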
