Implementation:Apache Beam StreamingEngineComputationConfigFetcher
| Field | Value |
|---|---|
| Implementation Name | StreamingEngineComputationConfigFetcher |
| Overview | Concrete implementation for fetching and caching streaming computation configurations from the Dataflow service, with periodic refresh and DoFn dispatch. |
| Module | runners/google-cloud-dataflow-java/worker |
| Repository | apache/beam |
| Related Principle | Principle:Apache_Beam_Computation_Configuration |
| last_updated | 2026-02-09 04:00 GMT |
Overview
StreamingEngineComputationConfigFetcher is the Streaming Engine implementation of ComputationConfig.Fetcher. It asynchronously fetches global pipeline configuration from the Dataflow service, caches computation configurations locally, and provides on-demand fetching for individual computation IDs. It works in conjunction with DefaultParDoFnFactory, which dispatches to specialized factories based on the computation type specified in the fetched configuration.
Code Reference
Source Location
| File | Lines | Description |
|---|---|---|
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java | L64 | Class declaration: `public final class StreamingEngineComputationConfigFetcher implements ComputationConfig.Fetcher` |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java | L82-93 | Private constructor |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java | L95-104 | `create()` factory method |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java | L239-242 | `start()` method |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java | L245-252 | `fetchConfig()` method |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java | L295-309 | `fetchInitialPipelineGlobalConfig()` -- blocking initial fetch |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java | L129-157 | `fetchConfigWithRetry()` -- retry with exponential backoff |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DefaultParDoFnFactory.java | L33-84 | DefaultParDoFnFactory -- DoFn dispatch table |
Signature
Factory Method:
```java
public static StreamingEngineComputationConfigFetcher create(
    long globalConfigRefreshPeriodMillis, WorkUnitClient dataflowServiceClient) {
  return new StreamingEngineComputationConfigFetcher(
      /* hasReceivedGlobalConfig= */ false,
      globalConfigRefreshPeriodMillis,
      dataflowServiceClient,
      new StreamingGlobalConfigHandleImpl(),
      Executors.newSingleThreadScheduledExecutor(
          new ThreadFactoryBuilder().setNameFormat("GlobalPipelineConfigRefresher").build()));
}
```
Lifecycle Methods:
```java
@Override
public void start() {
  fetchInitialPipelineGlobalConfig(); // Blocks until initial config received
  schedulePeriodicGlobalConfigRequests(); // Starts background refresh thread
}

@Override
public Optional<ComputationConfig> fetchConfig(String computationId) {
  Preconditions.checkArgument(
      !computationId.isEmpty(), "computationId is empty. Cannot fetch without a computationId.");
  return fetchConfigWithRetry(
          () -> dataflowServiceClient.getStreamingConfigWorkItem(computationId))
      .flatMap(StreamingEngineComputationConfigFetcher::createComputationConfig);
}

@Override
public StreamingGlobalConfigHandle getGlobalConfigHandle() {
  return globalConfigHandle;
}

@Override
public void stop() {
  if (globalConfigRefresher.isShutdown() || !hasReceivedGlobalConfig.get()) {
    return;
  }
  globalConfigRefresher.shutdown();
  // ... await termination with 10-second timeout ...
}
```
DefaultParDoFnFactory Dispatch:
```java
public class DefaultParDoFnFactory implements ParDoFnFactory {
  private final ImmutableMap<String, ParDoFnFactory> defaultFactories;

  public DefaultParDoFnFactory() {
    defaultFactories =
        ImmutableMap.<String, ParDoFnFactory>builder()
            .put("DoFn", UserParDoFnFactory.createDefault())
            .put("CombineValuesFn", new CombineValuesFnFactory())
            .put("MergeBucketsDoFn", new GroupAlsoByWindowParDoFnFactory())
            .put("AssignBucketsDoFn", new AssignWindowsParDoFnFactory())
            .put("MergeWindowsDoFn", new GroupAlsoByWindowParDoFnFactory())
            .put("AssignWindowsDoFn", new AssignWindowsParDoFnFactory())
            .put("SplittableProcessFn", SplittableProcessFnFactory.createDefault())
            // ... additional factories ...
            .build();
  }

  @Override
  public ParDoFn create(
      PipelineOptions options,
      CloudObject cloudUserFn,
      List<SideInputInfo> sideInputInfos,
      TupleTag<?> mainOutputTag,
      Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices,
      DataflowExecutionContext<?> executionContext,
      DataflowOperationContext operationContext)
      throws Exception {
    String className = cloudUserFn.getClassName();
    ParDoFnFactory factory = defaultFactories.get(className);
    if (factory == null) {
      throw new Exception("No known ParDoFnFactory for " + className);
    }
    return factory.create(
        options,
        cloudUserFn,
        sideInputInfos,
        mainOutputTag,
        outputTupleTagsToReceiverIndices,
        executionContext,
        operationContext);
  }
}
```
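The essence of this class is a dispatch table keyed by the class name recorded in the fetched MapTask, failing fast on unknown names. A minimal, self-contained sketch of that pattern is below; the `ParDoFn` interface, the stand-in implementations, and `DispatchSketch` itself are illustrative, not the real Beam types.

```java
import java.util.Map;
import java.util.function.Supplier;

// Simplified sketch of the dispatch-table pattern used by
// DefaultParDoFnFactory: a class-name string selects a specialized
// factory; an unknown name throws rather than silently degrading.
public class DispatchSketch {
  interface ParDoFn {}

  static class UserParDoFn implements ParDoFn {}

  static class GroupAlsoByWindowFn implements ParDoFn {}

  // Immutable name-to-factory mapping, built once at construction time.
  private static final Map<String, Supplier<ParDoFn>> FACTORIES =
      Map.of(
          "DoFn", UserParDoFn::new,
          "MergeWindowsDoFn", GroupAlsoByWindowFn::new);

  static ParDoFn create(String className) {
    Supplier<ParDoFn> factory = FACTORIES.get(className);
    if (factory == null) {
      throw new IllegalArgumentException("No known ParDoFnFactory for " + className);
    }
    return factory.get();
  }
}
```

Keeping the table immutable and failing fast on an unrecognized name means a misconfigured or newer MapTask surfaces as an explicit error at creation time rather than as undefined behavior during processing.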
Import
```java
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingEngineComputationConfigFetcher;
import org.apache.beam.runners.dataflow.worker.streaming.config.ComputationConfig;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory;
```
I/O Contract
Inputs
| Input | Type | Description |
|---|---|---|
| globalConfigRefreshPeriodMillis | long | Interval in milliseconds between periodic global config fetches. |
| dataflowServiceClient | WorkUnitClient | Client for communicating with the Dataflow service API. |
Outputs
| Output | Type | Description |
|---|---|---|
| ComputationConfig | Optional&lt;ComputationConfig&gt; | On-demand computation configuration containing the MapTask and transform-to-state-family mappings. |
| StreamingGlobalConfig | via StreamingGlobalConfigHandle | Global pipeline configuration including Windmill endpoints, operational limits, tag encoding version, and user worker settings; delivered to registered observers. |
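Global config is delivered through the handle's observer mechanism rather than returned from a call. A minimal, hypothetical sketch of that delivery model is below; `GlobalConfigHandleSketch` and its `GlobalConfig` record are stand-ins for the real `StreamingGlobalConfigHandle` and `StreamingGlobalConfig` types, and the register-then-replay behavior shown is an assumption about the pattern, not the verified Beam implementation.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical sketch of the handle/observer delivery model: observers
// registered on the handle receive the current config immediately (if one
// has already arrived) and every subsequent update.
public class GlobalConfigHandleSketch {
  record GlobalConfig(int maxCommitBytes) {}

  private final AtomicReference<GlobalConfig> current = new AtomicReference<>();
  private final List<Consumer<GlobalConfig>> observers = new CopyOnWriteArrayList<>();

  public void registerConfigObserver(Consumer<GlobalConfig> observer) {
    observers.add(observer);
    Optional.ofNullable(current.get()).ifPresent(observer); // replay latest, if any
  }

  public void setConfig(GlobalConfig config) { // called by the fetcher's refresh thread
    current.set(config);
    observers.forEach(o -> o.accept(config));
  }
}
```

Replaying the latest config on registration means late-registering components (created after the initial fetch) still see the current settings without polling.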
Configuration Structure
The global configuration (StreamingGlobalConfig) includes:
| Field | Description |
|---|---|
| windmillServiceEndpoints | Set of HostAndPort for Windmill backends |
| operationalLimits | Max commit bytes, max key bytes, max output bytes |
| enableStateTagEncodingV2 | Whether to use V2 tag encoding for state |
| userWorkerJobSettings | Protobuf settings including connectivity type (DirectPath/CloudPath) |
Retry Configuration
| Parameter | Value |
|---|---|
| Initial Backoff | 100 ms |
| Max Backoff | 1 minute |
| Max Cumulative Backoff | 5 minutes |
| Initial Config Retry | Retries every 5 seconds until received |
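The table above implies a concrete retry schedule. The sketch below derives that schedule deterministically: delays double from 100 ms, each capped at 1 minute, until 5 minutes of cumulative backoff would be exceeded. Jitter is omitted for clarity; the real fetcher's backoff utility may randomize each delay, so treat this as an illustration of the parameters, not the exact sequence.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the retry schedule implied by the table above: exponential
// backoff starting at 100 ms, capped at 1 minute per attempt, stopping
// once 5 minutes of cumulative backoff would be exceeded.
public class BackoffSketch {
  static List<Long> backoffScheduleMillis() {
    List<Long> delays = new ArrayList<>();
    long delay = 100; // initial backoff
    long cumulative = 0;
    while (cumulative + delay <= 5 * 60_000) { // max cumulative: 5 minutes
      delays.add(delay);
      cumulative += delay;
      delay = Math.min(delay * 2, 60_000); // double, capped at 1 minute
    }
    return delays;
  }
}
```

With these parameters the schedule reaches the 1-minute cap after about ten attempts, so a prolonged service outage costs a bounded number of requests before the fetch gives up.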
Usage Examples
Fetching a Specific Computation Config:
```java
// When a work item arrives for an unknown computation ID
Optional<ComputationConfig> config = configFetcher.fetchConfig("computation-123");
config.ifPresent(
    cfg -> {
      MapTask mapTask = cfg.getMapTask();
      Map<String, String> transformToStateFamily = cfg.getTransformUserNameToStateFamily();
      // Register the computation in the ComputationStateCache
    });
```
Registering a Config Observer:
```java
// Register an observer for connectivity type changes
configFetcher
    .getGlobalConfigHandle()
    .registerConfigObserver(
        streamingGlobalConfig -> {
          ConnectivityType connectivityType =
              streamingGlobalConfig.userWorkerJobSettings().getConnectivityType();
          if (connectivityType != ConnectivityType.CONNECTIVITY_TYPE_DEFAULT) {
            switchStreamingWorkerHarness(connectivityType);
          }
        });
```
Related Pages
- Principle:Apache_Beam_Computation_Configuration -- The principle describing computation configuration.
- Implementation:Apache_Beam_StreamingDataflowWorker_FromOptions -- Creates the config fetcher during worker initialization.
- Implementation:Apache_Beam_WindmillTimerInternals -- Uses computation configuration to determine timer state families.
- Environment:Apache_Beam_Dataflow_Streaming_Runtime -- GCP Dataflow streaming worker runtime with Windmill and memory monitoring.