
Implementation:Apache Beam WindmillStateInternals Persist

From Leeroopedia


Field Value
Implementation Name WindmillStateInternals Persist
Overview Concrete implementation for persisting state mutations and committing work results to Windmill in Dataflow streaming, including counter short ID optimization for metric reporting.
Module runners/google-cloud-dataflow-java/worker
Repository apache/beam
Related Principle Principle:Apache_Beam_Result_Commitment
Last Updated 2026-02-09 04:00 GMT

Overview

The WindmillStateInternals.persist() method is the concrete mechanism for serializing all accumulated state mutations into a WorkItemCommitRequest for atomic commitment to Windmill. It coordinates the persistence of primary state and derived state, resolves any pending asynchronous state reads, and updates the state cache for future reuse. The CounterShortIdCache complements this by optimizing metric reporting bandwidth through counter name-to-short-ID mapping.

Code Reference

Source Location

File Lines Description
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java L42 Class declaration: public class WindmillStateInternals<K> implements StateInternals
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java L58-77 Constructor
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java L84-104 persist(List, CachingStateTable) -- internal helper
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java L106-125 persist(WorkItemCommitRequest.Builder) -- public entry point
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java L128-136 state() -- state access methods
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CounterShortIdCache.java L45 Class declaration: public class CounterShortIdCache
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CounterShortIdCache.java L54-85 storeNewShortIds()
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CounterShortIdCache.java L90-97 shortenIdsIfAvailable()

Signature

WindmillStateInternals.persist():

public class WindmillStateInternals<K> implements StateInternals {

    // Public entry point for state persistence.
    // Serializes all state mutations into the commit request builder.
    public void persist(
        final Windmill.WorkItemCommitRequest.Builder commitBuilder) {  // L106-125

        List<Future<WorkItemCommitRequest>> commitsToMerge = new ArrayList<>();

        // Schedule persistence for primary and derived state tables.
        // Some state types may schedule async reads from Windmill.
        persist(commitsToMerge, workItemState);
        persist(commitsToMerge, workItemDerivedState);

        // Open a scoped read state connection and wait for all futures.
        try (Closeable ignored = scopedReadStateSupplier.get()) {
            for (Future<WorkItemCommitRequest> commitFuture : commitsToMerge) {
                commitBuilder.mergeFrom(commitFuture.get());
            }
        } catch (ExecutionException | InterruptedException | IOException exc) {
            if (exc instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(
                "Failed to retrieve Windmill state during persist()", exc);
        }

        // Flush cached values for reuse by future work items on same key.
        cache.persist();
    }

    // Internal helper: schedules persistence for each state location.
    private void persist(
        List<Future<WorkItemCommitRequest>> commitsToMerge,
        CachingStateTable stateTable) {                                // L84-104

        for (WindmillState location : stateTable.values()) {
            try {
                commitsToMerge.add(location.persist(cache));
            } catch (IOException e) {
                throw new RuntimeException("Unable to persist state", e);
            }
        }

        // Clear references to the reader to prevent space leaks.
        for (WindmillState location : stateTable.values()) {
            location.cleanupAfterWorkItem();
        }

        // Clear the map of already retrieved state instances.
        stateTable.clear();
    }
}

CounterShortIdCache:

public class CounterShortIdCache {

    // Store new counter name -> short ID mappings from Dataflow service responses.
    public void storeNewShortIds(
        final ReportWorkItemStatusRequest request,
        final ReportWorkItemStatusResponse reply) {                    // L54-85

        // Validates request/response sizes match
        checkArgument(
            request.getWorkItemStatuses() != null
                && reply.getWorkItemServiceStates() != null
                && request.getWorkItemStatuses().size()
                    == reply.getWorkItemServiceStates().size());

        for (int i = 0; i < request.getWorkItemStatuses().size(); i++) {
            WorkItemServiceState state = reply.getWorkItemServiceStates().get(i);
            WorkItemStatus status = request.getWorkItemStatuses().get(i);
            if (state.getMetricShortId() == null) continue;

            for (MetricShortId shortIdMsg : state.getMetricShortId()) {
                int metricIndex = MoreObjects.firstNonNull(
                    shortIdMsg.getMetricIndex(), 0);
                CounterUpdate update =
                    status.getCounterUpdates().get(metricIndex);
                cache.insert(update, checkNotNull(shortIdMsg.getShortId()));
            }
        }
    }

    // Replace full counter names with short IDs where available.
    public void shortenIdsIfAvailable(
        java.util.@Nullable List<CounterUpdate> counters) {            // L90-97
        if (counters == null) return;
        for (CounterUpdate update : counters) {
            cache.shortenIdsIfAvailable(update);
        }
    }
}
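To make the name-to-short-ID mapping concrete, here is a minimal map-based sketch of the idea. The class and method names are hypothetical, not the actual Beam API: the real CounterShortIdCache keys on structured CounterUpdate names rather than plain strings. The pattern is the same, though: the first report carries full counter names, the service replies with compact numeric IDs, and later reports substitute the IDs to shrink the serialized payload.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of a counter name -> short-ID cache.
class SimpleShortIdCache {
    private final Map<String, Long> shortIds = new ConcurrentHashMap<>();

    // Learn a mapping from a service response (analogous to storeNewShortIds).
    public void store(String counterName, long shortId) {
        shortIds.put(counterName, shortId);
    }

    // Return the short ID if known, so the caller can drop the full name
    // from the next report; null means the full name must still be sent.
    public Long shorten(String counterName) {
        return shortIds.get(counterName);
    }
}
```

A counter whose ID is cached can be reported as a single varint instead of a full structured name, which is where the bandwidth saving comes from.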

Import

import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateInternals;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
import org.apache.beam.runners.dataflow.worker.CounterShortIdCache;
import org.apache.beam.runners.core.StateInternals;

I/O Contract

Inputs

Input Type Description
commitBuilder Windmill.WorkItemCommitRequest.Builder The builder into which all state mutations are serialized. Timer mutations and output elements are added separately.
workItemState CachingStateTable Primary state table containing all state objects accessed during work item processing.
workItemDerivedState CachingStateTable Derived state table for compacted or secondary state representations.
request (for CounterShortIdCache) ReportWorkItemStatusRequest The outgoing metric report containing full counter updates.
reply (for CounterShortIdCache) ReportWorkItemStatusResponse The Dataflow service response containing assigned short IDs for counters.

Outputs

Output Type Description
Committed State Mutations Merged into WorkItemCommitRequest.Builder All state writes, clears, appends, and compactions serialized as protobuf messages.
Updated Cache WindmillStateCache Cached state values flushed for reuse by future work items on the same key.
Short ID Mappings CounterShortIdCache internal maps Counter name to short ID mappings cached for bandwidth-efficient future metric reports.

State Persistence Flow

The persistence flow follows a two-phase approach:

Phase 1: Schedule Persistence
  -> Iterate workItemState entries
     -> For each WindmillState: call persist(cache) -> Future<WorkItemCommitRequest>
     -> Add future to commitsToMerge list
  -> Iterate workItemDerivedState entries (same process)
  -> Cleanup: call cleanupAfterWorkItem() on each state location
  -> Clear state tables

Phase 2: Resolve and Merge
  -> Open scoped read state connection
  -> For each future in commitsToMerge:
     -> future.get() may trigger Windmill reads for compaction
     -> commitBuilder.mergeFrom(result)
  -> Close scoped read state connection
  -> cache.persist() flushes to WindmillStateCache

The two-phase design is important because:

  • Phase 1 allows all state objects to schedule their reads concurrently, batching Windmill I/O.
  • Phase 2 resolves all reads within a single scoped connection, ensuring efficient use of the GetData stream.
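The schedule-then-resolve pattern above can be sketched in isolation. The types below are hypothetical stand-ins (a `StateCell` interface and string-valued commit fragments), not the actual Beam classes; the point is only that Phase 1 submits all work without blocking, and Phase 2 blocks on every future in order and merges the results into one commit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

class TwoPhasePersist {
    // Hypothetical stand-in for WindmillState: persisting returns a future
    // whose resolution may involve remote reads.
    interface StateCell {
        Future<String> persist(ExecutorService io);
    }

    static String persistAll(List<StateCell> cells, ExecutorService io)
            throws ExecutionException, InterruptedException {
        // Phase 1: schedule all persistence work; nothing blocks yet, so
        // any remote reads the cells need can be issued concurrently.
        List<Future<String>> futures = new ArrayList<>();
        for (StateCell cell : cells) {
            futures.add(cell.persist(io));
        }
        // Phase 2: resolve every future and merge the partial results
        // into a single commit payload, in scheduling order.
        StringBuilder commit = new StringBuilder();
        for (Future<String> f : futures) {
            commit.append(f.get()).append(';');
        }
        return commit.toString();
    }
}
```

Because Phase 2 waits on futures in the order they were scheduled, the merged commit is deterministic even though the underlying work runs concurrently.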

Stochastic Compaction

WindmillStateInternals includes a stochastic compaction mechanism via the COMPACT_NOW ThreadLocal:

// ShouldCompactNowFn uses geometric distribution to determine when to compact.
// Returns true at an average rate of 0.2% of calls.
private static class ShouldCompactNowFn implements Supplier<Boolean> {
    private static final double RATE = 0.002;
    // Uses geometric distribution: Math.log(random.nextDouble()) / Math.log(1 - RATE)
}

This ensures that state compaction (reducing the size of accumulated bag state or other appendable state) happens periodically without coordinated scheduling, using probabilistic triggering to spread compaction load over time.
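A standard way to implement such a trigger is to sample the gap between firings from a geometric distribution, so each call is cheap (a counter decrement) and firings average out to the target rate. The sketch below follows that approach under those assumptions; the class and method names are hypothetical and are not the actual ShouldCompactNowFn.

```java
import java.util.Random;

// Hypothetical geometric-distribution trigger firing on ~rate of all calls.
class GeometricCompactionTrigger {
    private final double rate;
    private final Random random;
    private long callsUntilFire;

    GeometricCompactionTrigger(double rate, Random random) {
        this.rate = rate;
        this.random = random;
        this.callsUntilFire = sampleGap();
    }

    // Number of calls until the next firing, drawn from a geometric
    // distribution with success probability `rate` (mean gap ~ 1/rate).
    private long sampleGap() {
        return (long) Math.floor(Math.log(random.nextDouble()) / Math.log(1 - rate));
    }

    // Returns true on roughly rate * 100% of calls, with no coordination.
    public boolean shouldCompactNow() {
        if (callsUntilFire <= 0) {
            callsUntilFire = sampleGap();
            return true;
        }
        callsUntilFire--;
        return false;
    }
}
```

With rate = 0.002, the trigger fires on average once every 500 calls, matching the 0.2% average rate described above while keeping individual firings uncoordinated across workers.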

Usage Examples

Persisting State After Work Item Processing:

// After processing all elements and timers in a work item:
Windmill.WorkItemCommitRequest.Builder commitBuilder =
    Windmill.WorkItemCommitRequest.newBuilder();

// Persist state mutations
stateInternals.persist(commitBuilder);

// Persist timer mutations (separate from state)
timerInternals.persistTo(commitBuilder);

// Build and submit the commit
WorkItemCommitRequest commitRequest = commitBuilder.build();
workCommitter.commit(Commit.create(commitRequest, computationState, workItem));

Using CounterShortIdCache for Metric Optimization:

CounterShortIdCache shortIdCache = new CounterShortIdCache();

// After sending a status report and receiving the response:
shortIdCache.storeNewShortIds(statusRequest, statusResponse);

// Before sending subsequent reports, shorten counter IDs:
List<CounterUpdate> counters = collectCounterUpdates();
shortIdCache.shortenIdsIfAvailable(counters);
// Counters with known short IDs now have their name/kind cleared
// and shortId set, reducing serialization size.
