Implementation:Apache Beam ImmutabilityEnforcementFactory

From Leeroopedia


Field | Value
Implementation Name | ImmutabilityEnforcementFactory
Overview | Concrete tool for enforcing element immutability and encodability invariants during Direct Runner execution.
Source | runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
Implements | Principle:Apache_Beam_Model_Enforcement
last_updated | 2026-02-09 04:00 GMT

Code Reference

Source Location

File | Lines | Description
runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java | L42-158 | Full class: ModelEnforcementFactory for input immutability checking
runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java | L43-45 | create(): factory method returning singleton instance
runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java | L48-54 | forBundle(): creates enforcement for a specific bundle and consumer
runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java | L56-60 | isReadTransform(): determines if transform is inside a Read composite
runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java | L107-157 | ImmutabilityCheckingEnforcement: inner class with beforeElement/afterElement/afterFinish
runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java | L49-end | BundleFactory wrapper enforcing output immutability at commit time
runners/direct-java/src/main/java/org/apache/beam/runners/direct/CloningBundleFactory.java | L37-end | BundleFactory that clones elements through coder round-trips for encodability enforcement
runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java | L73-136 | Enforcement enum: configures which enforcements are enabled

Signature

// ImmutabilityEnforcementFactory: enforces input element immutability
class ImmutabilityEnforcementFactory implements ModelEnforcementFactory {

    // Factory method
    public static ModelEnforcementFactory create()

    // Create enforcement for a specific bundle and consumer transform
    @Override
    public <T> ModelEnforcement<T> forBundle(
        CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer)

    // Check if a transform is inside a Read composite (exempt from enforcement)
    static boolean isReadTransform(AppliedPTransform<?, ?, ?> consumer)
}

// ImmutabilityCheckingEnforcement: inner class performing actual checks
private static class ImmutabilityCheckingEnforcement<T> extends AbstractModelEnforcement<T> {

    // Snapshot element before processing
    @Override
    public void beforeElement(WindowedValue<T> element)

    // Verify element unchanged after processing
    @Override
    public void afterElement(WindowedValue<T> element)

    // Final verification after entire bundle is processed
    @Override
    public void afterFinish(
        CommittedBundle<T> input,
        TransformResult<T> result,
        Iterable<? extends CommittedBundle<?>> outputs)
}

// ImmutabilityCheckingBundleFactory: enforces output element immutability
class ImmutabilityCheckingBundleFactory implements BundleFactory {

    public static ImmutabilityCheckingBundleFactory create(
        BundleFactory underlying, DirectGraph graph)
}

// CloningBundleFactory: enforces coder encodability via round-trip cloning
class CloningBundleFactory implements BundleFactory {

    public static CloningBundleFactory create()
}

Import

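// These classes are declared package-private (see Signature), so they can be
// referenced only from code inside org.apache.beam.runners.direct.
// Pipeline authors control them indirectly through DirectOptions.
import org.apache.beam.runners.direct.ImmutabilityEnforcementFactory;
import org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory;
import org.apache.beam.runners.direct.CloningBundleFactory;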

I/O Contract

Inputs

Parameter | Type | Description
graph | DirectGraph | The execution graph, used by the Enforcement enum to determine which PCollections are subject to immutability enforcement based on their producer transform.
options | DirectOptions | Configuration flags: isEnforceEncodability() (controls CloningBundleFactory) and isEnforceImmutability() (controls ImmutabilityEnforcementFactory and ImmutabilityCheckingBundleFactory).
input | CommittedBundle<T> | The bundle of elements to be processed, from which the Coder is extracted for snapshot creation.
consumer | AppliedPTransform<?, ?, ?> | The transform that will process the bundle, checked against the Read composite exemption.

Outputs

Output | Type | Description
Configured enforcement factories | Map<String, Collection<ModelEnforcementFactory>> | A mapping from transform URNs (e.g., PAR_DO_TRANSFORM_URN) to enforcement factories. Returned by Enforcement.defaultModelEnforcements().
Configured bundle factory | BundleFactory | A bundle factory chain: base ImmutableListBundleFactory, optionally wrapped by CloningBundleFactory (encodability), optionally wrapped by ImmutabilityCheckingBundleFactory (output immutability). Returned by Enforcement.bundleFactoryFor().
On violation | IllegalMutationException | Thrown when a mutation is detected, with a message identifying the transform, the saved value, and the mutated value.

Usage Examples

Enforcement Configuration in DirectRunner

// In DirectRunner constructor (L151-153)
private DirectRunner(DirectOptions options) {
    this.options = options;
    this.enabledEnforcements = Enforcement.enabled(options);
}

// In DirectRunner.run() (L196-213)
DirectGraph graph = graphVisitor.getGraph();

// Create bundle factory with enforcement wrappers
BundleFactory bundleFactory = Enforcement.bundleFactoryFor(enabledEnforcements, graph);
// If encodability enabled: CloningBundleFactory wraps ImmutableListBundleFactory
// If immutability enabled: ImmutabilityCheckingBundleFactory wraps the above

EvaluationContext context = EvaluationContext.create(
    clockSupplier.get(), bundleFactory, graph,
    keyedPValueVisitor.getKeyedPValues(), metricsPool);

// Create model enforcement factories for ParDo transforms
Map<String, Collection<ModelEnforcementFactory>> enforcements =
    Enforcement.defaultModelEnforcements(enabledEnforcements);
// If immutability enabled: maps PAR_DO_TRANSFORM_URN -> [ImmutabilityEnforcementFactory]
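
The wrapping order described in the comments above can be illustrated with a small stand-alone sketch. It does not use Beam: the Factory interface, the describe() method, and the enabled(boolean, boolean) helper are hypothetical stand-ins introduced only for illustration, and the real factories create bundles rather than strings. Under those assumptions, it shows how two flags could translate into a set of enforcements and then into a nested chain.

```java
import java.util.EnumSet;
import java.util.Set;

/** Illustrative sketch only; names mirror this page, but this is not Beam's code. */
class BundleFactoryChainSketch {
    enum Enforcement { ENCODABILITY, IMMUTABILITY }

    /** Hypothetical, heavily reduced stand-in for the BundleFactory interface. */
    interface Factory {
        String describe();
    }

    /** Translates the two option flags into the set of enabled enforcements. */
    static Set<Enforcement> enabled(boolean enforceEncodability, boolean enforceImmutability) {
        Set<Enforcement> enabled = EnumSet.noneOf(Enforcement.class);
        if (enforceEncodability) {
            enabled.add(Enforcement.ENCODABILITY);
        }
        if (enforceImmutability) {
            enabled.add(Enforcement.IMMUTABILITY);
        }
        return enabled;
    }

    /** Builds the chain from the inside out: base, then cloning, then immutability checking. */
    static Factory bundleFactoryFor(Set<Enforcement> enabled) {
        Factory factory = () -> "ImmutableListBundleFactory";
        if (enabled.contains(Enforcement.ENCODABILITY)) {
            Factory inner = factory;
            factory = () -> "CloningBundleFactory(" + inner.describe() + ")";
        }
        if (enabled.contains(Enforcement.IMMUTABILITY)) {
            Factory inner = factory;
            factory = () -> "ImmutabilityCheckingBundleFactory(" + inner.describe() + ")";
        }
        return factory;
    }

    public static void main(String[] args) {
        // Prints the nesting for three flag combinations.
        System.out.println(bundleFactoryFor(enabled(true, true)).describe());
        System.out.println(bundleFactoryFor(enabled(false, true)).describe());
        System.out.println(bundleFactoryFor(enabled(false, false)).describe());
    }
}
```

With both flags on, the immutability check sits outermost, so it observes elements after the cloning layer has already round-tripped them.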

How ImmutabilityCheckingEnforcement Works

// Before each element is processed by the DoFn:
@Override
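public void beforeElement(WindowedValue<T> element) {
    // Create a MutationDetector that snapshots the element via its Coder.
    // forValueWithCoder declares CoderException, so it must be handled here.
    try {
        mutationElements.put(
            element, MutationDetectors.forValueWithCoder(element.getValue(), coder));
    } catch (CoderException e) {
        throw new RuntimeException(e);
    }
}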

// After each element is processed by the DoFn:
@Override
public void afterElement(WindowedValue<T> element) {
    // Verify the element has not been mutated since the snapshot
    verifyUnmodified(mutationElements.get(element));
}

// After the entire bundle is processed:
@Override
public void afterFinish(CommittedBundle<T> input, TransformResult<T> result,
                        Iterable<? extends CommittedBundle<?>> outputs) {
    // Final check: verify all elements remain unmodified
    for (MutationDetector detector : mutationElements.values()) {
        verifyUnmodified(detector);
    }
}
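
The snapshot-then-verify lifecycle above can be tried without Beam on the classpath. The following is a minimal sketch under stated assumptions: a plain Function<T, byte[]> stands in for the Coder, a byte-array comparison stands in for MutationDetector, and IllegalStateException stands in for IllegalMutationException. The class and method names are hypothetical and are not part of Beam.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Simplified stand-in for the snapshot-and-verify pattern; not Beam's MutationDetectors. */
class SnapshotEnforcementSketch<T> {
    // Hypothetical encoder standing in for a Coder: turns a value into bytes.
    private final Function<T, byte[]> encoder;
    // Keyed by reference, so a mutated element can still be looked up after its hashCode changes.
    private final Map<T, byte[]> snapshots = new IdentityHashMap<>();

    SnapshotEnforcementSketch(Function<T, byte[]> encoder) {
        this.encoder = encoder;
    }

    /** Records the encoded form of the element before user code sees it. */
    void beforeElement(T element) {
        snapshots.put(element, encoder.apply(element));
    }

    /** Re-encodes the element and compares it with the saved snapshot. */
    void afterElement(T element) {
        verifyUnmodified(element, snapshots.get(element));
    }

    /** Re-checks every element seen in the bundle once processing has finished. */
    void afterFinish() {
        for (Map.Entry<T, byte[]> entry : snapshots.entrySet()) {
            verifyUnmodified(entry.getKey(), entry.getValue());
        }
    }

    private void verifyUnmodified(T element, byte[] saved) {
        if (!Arrays.equals(saved, encoder.apply(element))) {
            // Stand-in for IllegalMutationException.
            throw new IllegalStateException(
                "Element was mutated; saved=" + new String(saved, StandardCharsets.UTF_8)
                    + ", current=" + element);
        }
    }

    /** Runs one element through the lifecycle and reports whether a mutation was flagged. */
    static boolean detectsMutation(boolean mutate) {
        SnapshotEnforcementSketch<List<String>> enforcement =
            new SnapshotEnforcementSketch<List<String>>(
                value -> String.join(",", value).getBytes(StandardCharsets.UTF_8));
        List<String> element = new ArrayList<>(Arrays.asList("a", "b"));
        enforcement.beforeElement(element);
        if (mutate) {
            element.add("c"); // simulates user code mutating its input in place
        }
        try {
            enforcement.afterElement(element);
            enforcement.afterFinish();
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("mutating run flagged: " + detectsMutation(true));
        System.out.println("clean run flagged: " + detectsMutation(false));
    }
}
```

The sketch uses an identity-based map because a mutable key such as a list changes its hash code when modified, which would make a lookup in an ordinary HashMap unreliable at exactly the moment the check matters.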

Disabling Enforcement for Performance Testing

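import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

PipelineOptions options = PipelineOptionsFactory.create();
// as() returns a view over the same underlying options, so changes made
// through directOptions are visible to Pipeline.create(options) below.
DirectOptions directOptions = options.as(DirectOptions.class);

// Both flags default to true; disable them only for performance benchmarking
directOptions.setEnforceEncodability(false);
directOptions.setEnforceImmutability(false);

Pipeline p = Pipeline.create(options);
// ... define pipeline ...
p.run();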

Enforcement Decision Flow

The Enforcement enum in DirectRunner determines which PCollections each enabled enforcement applies to:

Enforcement | Applies To | Condition
ENCODABILITY | All PCollections | Always applies (if enabled)
IMMUTABILITY | PCollections produced by UDF-executing transforms | Producer transform URN must be PAR_DO_TRANSFORM_URN or READ_TRANSFORM_URN, AND the producer must not be inside a Read.Bounded or Read.Unbounded composite
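
The two conditions in the table can be expressed as simple predicates. The sketch below is a stand-alone illustration, not Beam's Enforcement enum: the class name, method names, and the insideReadComposite flag are hypothetical, and the URN string literals are assumed values used only to give the constants something concrete to compare.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Illustrative predicates for the decision table; not Beam's Enforcement enum. */
class EnforcementDecisionSketch {
    // Assumed URN literals for illustration; in Beam these come from named constants.
    static final String PAR_DO_URN = "beam:transform:pardo:v1";
    static final String READ_URN = "beam:transform:read:v1";
    static final String GROUP_BY_KEY_URN = "beam:transform:group_by_key:v1";

    // Transforms that execute user-defined code.
    static final Set<String> CONTAINS_UDF =
        Collections.unmodifiableSet(new HashSet<>(Arrays.asList(PAR_DO_URN, READ_URN)));

    /** ENCODABILITY row: applies to every PCollection when the enforcement is enabled. */
    static boolean encodabilityApplies(String producerUrn, boolean insideReadComposite) {
        return true;
    }

    /** IMMUTABILITY row: producer runs a UDF and is not nested inside a Read composite. */
    static boolean immutabilityApplies(String producerUrn, boolean insideReadComposite) {
        return CONTAINS_UDF.contains(producerUrn) && !insideReadComposite;
    }

    public static void main(String[] args) {
        System.out.println("ParDo output: " + immutabilityApplies(PAR_DO_URN, false));
        System.out.println("ParDo inside Read: " + immutabilityApplies(PAR_DO_URN, true));
        System.out.println("GroupByKey output: " + immutabilityApplies(GROUP_BY_KEY_URN, false));
    }
}
```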

Related Pages

Sources

  • Repo -- Apache Beam -- Source repository at runners/direct-java/.
