Implementation:Apache Beam ImmutabilityEnforcementFactory
| Field | Value |
|---|---|
| Implementation Name | ImmutabilityEnforcementFactory |
| Overview | Concrete tool for enforcing element immutability and encodability invariants during Direct Runner execution. |
| Source | runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java |
| Implements | Principle:Apache_Beam_Model_Enforcement |
| last_updated | 2026-02-09 04:00 GMT |
Code Reference
Source Location
| File | Lines | Description |
|---|---|---|
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java | L42-158 | Full class: ModelEnforcementFactory for input immutability checking |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java | L43-45 | create(): factory method returning singleton instance |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java | L48-54 | forBundle(): creates enforcement for a specific bundle and consumer |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java | L56-60 | isReadTransform(): determines if transform is inside a Read composite |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java | L107-157 | ImmutabilityCheckingEnforcement: inner class with beforeElement/afterElement/afterFinish |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java | L49-end | BundleFactory wrapper enforcing output immutability at commit-time |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java | L37-end | BundleFactory that clones elements through coder round-trips for encodability enforcement |
| runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java | L73-136 | Enforcement enum: configures which enforcements are enabled |
Signature
// ImmutabilityEnforcementFactory: enforces input element immutability
class ImmutabilityEnforcementFactory implements ModelEnforcementFactory {
  // Factory method
  public static ModelEnforcementFactory create()
  // Create enforcement for a specific bundle and consumer transform
  @Override
  public <T> ModelEnforcement<T> forBundle(
      CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer)
  // Check if a transform is inside a Read composite (exempt from enforcement)
  static boolean isReadTransform(AppliedPTransform<?, ?, ?> consumer)
}

// ImmutabilityCheckingEnforcement: inner class performing actual checks
private static class ImmutabilityCheckingEnforcement<T> extends AbstractModelEnforcement<T> {
  // Snapshot element before processing
  @Override
  public void beforeElement(WindowedValue<T> element)
  // Verify element unchanged after processing
  @Override
  public void afterElement(WindowedValue<T> element)
  // Final verification after entire bundle is processed
  @Override
  public void afterFinish(
      CommittedBundle<T> input,
      TransformResult<T> result,
      Iterable<? extends CommittedBundle<?>> outputs)
}

// ImmutabilityCheckingBundleFactory: enforces output element immutability
class ImmutabilityCheckingBundleFactory implements BundleFactory {
  public static ImmutabilityCheckingBundleFactory create(
      BundleFactory underlying, DirectGraph graph)
}

// CloningBundleFactory: enforces coder encodability via round-trip cloning
class CloningBundleFactory implements BundleFactory {
  public static CloningBundleFactory create()
}
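The round-trip cloning idea behind CloningBundleFactory can be illustrated without any Beam dependency. The following is a minimal sketch only: it assumes Java serialization as a stand-in for a Beam Coder, and the class name RoundTripCloneSketch and the method cloneViaEncoding are hypothetical, not part of the Direct Runner.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;

/** Hypothetical sketch: Java serialization stands in for a Beam Coder. */
final class RoundTripCloneSketch {

  /** Encodes the value to bytes and decodes a fresh copy; fails if encoding is impossible. */
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T cloneViaEncoding(T value) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      // An element that cannot survive the round trip violates encodability.
      throw new IllegalStateException("Value could not be round-tripped: " + value, e);
    }
  }

  public static void main(String[] args) {
    ArrayList<String> original = new ArrayList<>(Arrays.asList("a", "b"));
    ArrayList<String> copy = cloneViaEncoding(original);
    System.out.println(copy.equals(original)); // true: same contents
    System.out.println(copy != original);      // true: a distinct object
  }
}
```

Because downstream consumers only ever see the decoded copy, any element that cannot be encoded, or that decodes to something different, surfaces during local execution rather than on a distributed runner.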
Import
import org.apache.beam.runners.direct.ImmutabilityEnforcementFactory;
import org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory;
import org.apache.beam.runners.direct.CloningBundleFactory;
I/O Contract
Inputs
| Parameter | Type | Description |
|---|---|---|
| graph | DirectGraph | The execution graph, used by the Enforcement enum to determine which PCollections are subject to immutability enforcement based on their producer transform. |
| options | DirectOptions | Configuration flags: isEnforceEncodability() (controls CloningBundleFactory) and isEnforceImmutability() (controls ImmutabilityEnforcementFactory and ImmutabilityCheckingBundleFactory). |
| input | CommittedBundle<T> | The bundle of elements to be processed, from which the Coder is extracted for snapshot creation. |
| consumer | AppliedPTransform<?, ?, ?> | The transform that will process the bundle, checked against the Read composite exemption. |
Outputs
| Output | Type | Description |
|---|---|---|
| Configured enforcement factories | Map<String, Collection<ModelEnforcementFactory>> | A mapping from transform URNs (e.g., PAR_DO_TRANSFORM_URN) to enforcement factories. Returned by Enforcement.defaultModelEnforcements(). |
| Configured bundle factory | BundleFactory | A bundle factory chain: base ImmutableListBundleFactory, optionally wrapped by CloningBundleFactory (encodability), optionally wrapped by ImmutabilityCheckingBundleFactory (output immutability). Returned by Enforcement.bundleFactoryFor(). |
| On violation | IllegalMutationException | Thrown when a mutation is detected, with a message identifying the transform, the saved value, and the mutated value. |
Usage Examples
Enforcement Configuration in DirectRunner
// In DirectRunner constructor (L151-153)
private DirectRunner(DirectOptions options) {
  this.options = options;
  this.enabledEnforcements = Enforcement.enabled(options);
}

// In DirectRunner.run() (L196-213)
DirectGraph graph = graphVisitor.getGraph();
// Create bundle factory with enforcement wrappers
BundleFactory bundleFactory = Enforcement.bundleFactoryFor(enabledEnforcements, graph);
// If encodability enabled: CloningBundleFactory wraps ImmutableListBundleFactory
// If immutability enabled: ImmutabilityCheckingBundleFactory wraps the above
EvaluationContext context = EvaluationContext.create(
    clockSupplier.get(), bundleFactory, graph,
    keyedPValueVisitor.getKeyedPValues(), metricsPool);
// Create model enforcement factories for ParDo transforms
Map<String, Collection<ModelEnforcementFactory>> enforcements =
    Enforcement.defaultModelEnforcements(enabledEnforcements);
// If immutability enabled: maps PAR_DO_TRANSFORM_URN -> [ImmutabilityEnforcementFactory]
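The conditional wrapping described in the comments above can be sketched as a small decorator chain. This is an illustrative model only, not the Direct Runner code: the BundleFactory interface here is a hypothetical one-method stand-in, and describe() exists purely to make the resulting nesting visible.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical sketch of how enabled enforcements select the bundle factory chain. */
final class BundleFactoryChainSketch {

  enum Enforcement { ENCODABILITY, IMMUTABILITY }

  /** Minimal stand-in for the runner's BundleFactory; describe() is illustrative only. */
  interface BundleFactory {
    String describe();
  }

  static BundleFactory bundleFactoryFor(Set<Enforcement> enabled) {
    // Base factory: always present.
    BundleFactory factory = () -> "ImmutableListBundleFactory";
    if (enabled.contains(Enforcement.ENCODABILITY)) {
      BundleFactory underlying = factory;
      factory = () -> "CloningBundleFactory(" + underlying.describe() + ")";
    }
    if (enabled.contains(Enforcement.IMMUTABILITY)) {
      BundleFactory underlying = factory;
      factory = () -> "ImmutabilityCheckingBundleFactory(" + underlying.describe() + ")";
    }
    return factory;
  }

  public static void main(String[] args) {
    System.out.println(bundleFactoryFor(EnumSet.allOf(Enforcement.class)).describe());
    System.out.println(bundleFactoryFor(EnumSet.noneOf(Enforcement.class)).describe());
  }
}
```

The ordering matters in this model: the immutability check is the outermost wrapper, so it observes the elements after the cloning layer has already produced its copies.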
How ImmutabilityCheckingEnforcement Works
// Before each element is processed by the DoFn:
@Override
public void beforeElement(WindowedValue<T> element) {
  // Create a MutationDetector that snapshots the element via its Coder
  mutationElements.put(
      element, MutationDetectors.forValueWithCoder(element.getValue(), coder));
}

// After each element is processed by the DoFn:
@Override
public void afterElement(WindowedValue<T> element) {
  // Verify the element has not been mutated since the snapshot
  verifyUnmodified(mutationElements.get(element));
}

// After the entire bundle is processed:
@Override
public void afterFinish(CommittedBundle<T> input, TransformResult<T> result,
    Iterable<? extends CommittedBundle<?>> outputs) {
  // Final check: verify all elements remain unmodified
  for (MutationDetector detector : mutationElements.values()) {
    verifyUnmodified(detector);
  }
}
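The snapshot-then-verify pattern in these hooks can be tried in isolation. The sketch below is a simplified model under stated assumptions: a plain Function<T, byte[]> stands in for the Beam Coder, encoded bytes stand in for the MutationDetector, and IllegalStateException stands in for IllegalMutationException. The class MutationCheckSketch is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of snapshot-based mutation detection around element processing. */
final class MutationCheckSketch<T> {
  // Stand-in for a Beam Coder: turns an element into its encoded bytes.
  private final Function<T, byte[]> encoder;
  // Keyed by object identity, so equal-but-distinct elements are tracked separately.
  private final Map<T, byte[]> snapshots = new IdentityHashMap<>();

  MutationCheckSketch(Function<T, byte[]> encoder) {
    this.encoder = encoder;
  }

  /** Records the encoded form of the element before user code sees it. */
  void beforeElement(T element) {
    snapshots.put(element, encoder.apply(element));
  }

  /** Re-encodes the element and compares against the snapshot. */
  void afterElement(T element) {
    verifyUnmodified(element, snapshots.get(element));
  }

  /** Re-checks every element once the whole bundle has been processed. */
  void afterFinish() {
    for (Map.Entry<T, byte[]> entry : snapshots.entrySet()) {
      verifyUnmodified(entry.getKey(), entry.getValue());
    }
  }

  private void verifyUnmodified(T element, byte[] snapshot) {
    if (!Arrays.equals(snapshot, encoder.apply(element))) {
      throw new IllegalStateException("Element was mutated during processing: " + element);
    }
  }

  public static void main(String[] args) {
    MutationCheckSketch<StringBuilder> check =
        new MutationCheckSketch<StringBuilder>(
            sb -> sb.toString().getBytes(StandardCharsets.UTF_8));
    StringBuilder element = new StringBuilder("abc");
    check.beforeElement(element);
    element.append("!"); // simulates user code mutating its input
    try {
      check.afterElement(element);
    } catch (IllegalStateException e) {
      System.out.println("caught: " + e.getMessage());
    }
  }
}
```

The bundle-level pass in afterFinish is what catches mutations that happen after an element's own afterElement check, for example when user code holds on to a reference and modifies it while processing a later element.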
Disabling Enforcement for Performance Testing
PipelineOptions options = PipelineOptionsFactory.create();
DirectOptions directOptions = options.as(DirectOptions.class);
// Disable enforcement for performance benchmarking
directOptions.setEnforceEncodability(false);
directOptions.setEnforceImmutability(false);
Pipeline p = Pipeline.create(options);
// ... define pipeline ...
p.run();
Enforcement Decision Flow
The Enforcement enum in DirectRunner determines which PCollections each enforcement applies to:
| Enforcement | Applies To | Condition |
|---|---|---|
| ENCODABILITY | All PCollections | Always applies (if enabled) |
| IMMUTABILITY | PCollections produced by UDF-executing transforms | Producer transform URN must be PAR_DO_TRANSFORM_URN or READ_TRANSFORM_URN, AND the producer must not be inside a Read.Bounded or Read.Unbounded composite |
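The conditions in the table can be expressed as a per-enforcement predicate. The following is a sketch of that decision logic only, assuming the producer is summarized by its URN and a boolean flag; the string values are placeholders rather than the real URN constants, and appliesTo with this signature is hypothetical.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of the per-PCollection enforcement decision. */
final class EnforcementDecisionSketch {
  // Placeholder strings; the real URN constants are defined in the Beam SDK.
  static final String PAR_DO = "PAR_DO_TRANSFORM_URN";
  static final String READ = "READ_TRANSFORM_URN";
  static final Set<String> UDF_URNS = new HashSet<>(Arrays.asList(PAR_DO, READ));

  enum Enforcement {
    ENCODABILITY {
      @Override
      boolean appliesTo(String producerUrn, boolean insideReadComposite) {
        return true; // every PCollection must be encodable
      }
    },
    IMMUTABILITY {
      @Override
      boolean appliesTo(String producerUrn, boolean insideReadComposite) {
        // Only outputs of UDF-executing transforms, excluding Read composites.
        return UDF_URNS.contains(producerUrn) && !insideReadComposite;
      }
    };

    abstract boolean appliesTo(String producerUrn, boolean insideReadComposite);
  }

  public static void main(String[] args) {
    System.out.println(Enforcement.IMMUTABILITY.appliesTo(PAR_DO, false)); // true
    System.out.println(Enforcement.IMMUTABILITY.appliesTo(READ, true));    // false
    System.out.println(Enforcement.ENCODABILITY.appliesTo(READ, true));    // true
  }
}
```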
Related Pages
- Principle:Apache_Beam_Model_Enforcement -- The principle of enforcing Beam model invariants for correctness validation.
- Environment:Apache_Beam_Java_Build_Environment -- Java build environment with JDK 8+, Gradle, and multi-language toolchain.
Sources
- Repo -- Apache Beam -- Source repository at runners/direct-java/.