Implementation:Apache Beam WindmillStateInternals Persist
| Field | Value |
|---|---|
| Implementation Name | WindmillStateInternals Persist |
| Overview | Concrete implementation for persisting state mutations and committing work results to Windmill in Dataflow streaming, including counter short ID optimization for metric reporting. |
| Module | runners/google-cloud-dataflow-java/worker |
| Repository | apache/beam |
| Related Principle | Principle:Apache_Beam_Result_Commitment |
| last_updated | 2026-02-09 04:00 GMT |
Overview
The WindmillStateInternals.persist() method is the concrete mechanism for serializing all accumulated state mutations into a WorkItemCommitRequest for atomic commitment to Windmill. It coordinates the persistence of primary state and derived state, resolves any pending asynchronous state reads, and updates the state cache for future reuse. The CounterShortIdCache complements this by optimizing metric reporting bandwidth through counter name-to-short-ID mapping.
Code Reference
Source Location
| File | Lines | Description |
|---|---|---|
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java | L42 | Class declaration: public class WindmillStateInternals&lt;K&gt; implements StateInternals |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java | L58-77 | Constructor |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java | L84-104 | persist(List, CachingStateTable) -- internal helper |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java | L106-125 | persist(WorkItemCommitRequest.Builder) -- public entry point |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java | L128-136 | state() -- state access methods |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CounterShortIdCache.java | L45 | Class declaration: public class CounterShortIdCache |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CounterShortIdCache.java | L54-85 | storeNewShortIds() |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/CounterShortIdCache.java | L90-97 | shortenIdsIfAvailable() |
Signature
WindmillStateInternals.persist():
```java
public class WindmillStateInternals<K> implements StateInternals {

  // Public entry point for state persistence.
  // Serializes all state mutations into the commit request builder.
  public void persist(
      final Windmill.WorkItemCommitRequest.Builder commitBuilder) { // L106-125
    List<Future<WorkItemCommitRequest>> commitsToMerge = new ArrayList<>();

    // Schedule persistence for primary and derived state tables.
    // Some state types may schedule async reads from Windmill.
    persist(commitsToMerge, workItemState);
    persist(commitsToMerge, workItemDerivedState);

    // Open a scoped read state connection and wait for all futures.
    try (Closeable ignored = scopedReadStateSupplier.get()) {
      for (Future<WorkItemCommitRequest> commitFuture : commitsToMerge) {
        commitBuilder.mergeFrom(commitFuture.get());
      }
    } catch (ExecutionException | InterruptedException | IOException exc) {
      if (exc instanceof InterruptedException) {
        Thread.currentThread().interrupt();
      }
      throw new RuntimeException(
          "Failed to retrieve Windmill state during persist()", exc);
    }

    // Flush cached values for reuse by future work items on the same key.
    cache.persist();
  }

  // Internal helper: schedules persistence for each state location.
  private void persist(
      List<Future<WorkItemCommitRequest>> commitsToMerge,
      CachingStateTable stateTable) { // L84-104
    for (WindmillState location : stateTable.values()) {
      try {
        commitsToMerge.add(location.persist(cache));
      } catch (IOException e) {
        throw new RuntimeException("Unable to persist state", e);
      }
    }

    // Clear references to the reader to prevent space leaks.
    for (WindmillState location : stateTable.values()) {
      location.cleanupAfterWorkItem();
    }

    // Clear the map of already retrieved state instances.
    stateTable.clear();
  }
}
```
CounterShortIdCache:
```java
public class CounterShortIdCache {

  // Store new counter name -> short ID mappings from Dataflow service responses.
  public void storeNewShortIds(
      final ReportWorkItemStatusRequest request,
      final ReportWorkItemStatusResponse reply) { // L54-85
    // Validates that request/response sizes match.
    checkArgument(
        request.getWorkItemStatuses() != null
            && reply.getWorkItemServiceStates() != null
            && request.getWorkItemStatuses().size()
                == reply.getWorkItemServiceStates().size());
    for (int i = 0; i < request.getWorkItemStatuses().size(); i++) {
      WorkItemServiceState state = reply.getWorkItemServiceStates().get(i);
      WorkItemStatus status = request.getWorkItemStatuses().get(i);
      if (state.getMetricShortId() == null) {
        continue;
      }
      for (MetricShortId shortIdMsg : state.getMetricShortId()) {
        int metricIndex = MoreObjects.firstNonNull(shortIdMsg.getMetricIndex(), 0);
        CounterUpdate update = status.getCounterUpdates().get(metricIndex);
        cache.insert(update, checkNotNull(shortIdMsg.getShortId()));
      }
    }
  }

  // Replace full counter names with short IDs where available.
  public void shortenIdsIfAvailable(
      java.util.@Nullable List<CounterUpdate> counters) { // L90-97
    if (counters == null) {
      return;
    }
    for (CounterUpdate update : counters) {
      cache.shortenIdsIfAvailable(update);
    }
  }
}
```
Import
```java
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateInternals;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
import org.apache.beam.runners.dataflow.worker.CounterShortIdCache;
import org.apache.beam.runners.core.StateInternals;
```
I/O Contract
Inputs
| Input | Type | Description |
|---|---|---|
| commitBuilder | Windmill.WorkItemCommitRequest.Builder | The builder into which all state mutations are serialized. Timer mutations and output elements are added separately. |
| workItemState | CachingStateTable | Primary state table containing all state objects accessed during work item processing. |
| workItemDerivedState | CachingStateTable | Derived state table for compacted or secondary state representations. |
| request (for CounterShortIdCache) | ReportWorkItemStatusRequest | The outgoing metric report containing full counter updates. |
| reply (for CounterShortIdCache) | ReportWorkItemStatusResponse | The Dataflow service response containing assigned short IDs for counters. |
Outputs
| Output | Type | Description |
|---|---|---|
| Committed State Mutations | Merged into WorkItemCommitRequest.Builder | All state writes, clears, appends, and compactions serialized as protobuf messages. |
| Updated Cache | WindmillStateCache | Cached state values flushed for reuse by future work items on the same key. |
| Short ID Mappings | CounterShortIdCache internal maps | Counter name to short ID mappings cached for bandwidth-efficient future metric reports. |
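At its core, the short ID mapping output is a name-to-ID lookup table. The sketch below illustrates that idea in isolation; the class and method names are hypothetical, not the actual CounterShortIdCache API, and String stands in for the structured counter key used by the real cache:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the short-ID substitution idea: once the service has
// assigned a numeric short ID to a counter, later reports can send the compact
// ID instead of repeating the full counter name and kind.
public class ShortIdCacheSketch {
    private final Map<String, Long> shortIds = new HashMap<>();

    // Record a mapping learned from a status report response.
    public void store(String counterName, long shortId) {
        shortIds.put(counterName, shortId);
    }

    // Return the short ID if one is known, otherwise null so the caller
    // sends the full counter name as before.
    public Long shortenIfAvailable(String counterName) {
        return shortIds.get(counterName);
    }

    public static void main(String[] args) {
        ShortIdCacheSketch cache = new ShortIdCacheSketch();
        cache.store("user-counter-elements", 7L);
        System.out.println(cache.shortenIfAvailable("user-counter-elements"));
        System.out.println(cache.shortenIfAvailable("unknown-counter"));
    }
}
```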
State Persistence Flow
The persistence flow follows a two-phase approach:
Phase 1: Schedule Persistence

```text
-> Iterate workItemState entries
   -> For each WindmillState: call persist(cache) -> Future<WorkItemCommitRequest>
   -> Add future to commitsToMerge list
-> Iterate workItemDerivedState entries (same process)
-> Cleanup: call cleanupAfterWorkItem() on each state location
-> Clear state tables
```

Phase 2: Resolve and Merge

```text
-> Open scoped read state connection
-> For each future in commitsToMerge:
   -> future.get() may trigger Windmill reads for compaction
   -> commitBuilder.mergeFrom(result)
-> Close scoped read state connection
-> cache.persist() flushes to WindmillStateCache
```
The two-phase design is important because:
- Phase 1 allows all state objects to schedule their reads concurrently, batching Windmill I/O.
- Phase 2 resolves all reads within a single scoped connection, ensuring efficient use of the GetData stream.
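The schedule-then-resolve pattern can be sketched generically with plain java.util.concurrent primitives. This is an illustration of the two-phase shape only, not the worker's actual code: String stands in for WorkItemCommitRequest fragments, and a thread pool stands in for the state objects' async reads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: schedule all persistence work first (phase 1, no
// blocking, so underlying reads can be batched), then resolve every future
// in one pass (phase 2) and merge the results in order.
public class TwoPhasePersistSketch {
    public static String persistAll() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<String>> commitsToMerge = new ArrayList<>();

        // Phase 1: schedule only -- nothing blocks here.
        for (int i = 0; i < 3; i++) {
            final int id = i;
            commitsToMerge.add(pool.submit(() -> "fragment-" + id));
        }

        // Phase 2: resolve all futures and merge results in order.
        StringBuilder merged = new StringBuilder();
        try {
            for (Future<String> f : commitsToMerge) {
                merged.append(f.get()).append(';');
            }
        } catch (ExecutionException | InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Failed to resolve scheduled persistence", e);
        } finally {
            pool.shutdown();
        }
        return merged.toString();
    }

    public static void main(String[] args) {
        System.out.println(persistAll()); // fragment-0;fragment-1;fragment-2;
    }
}
```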
Stochastic Compaction
WindmillStateInternals includes a stochastic compaction mechanism via the COMPACT_NOW ThreadLocal:
```java
// ShouldCompactNowFn uses a geometric distribution to determine when to compact.
// Returns true at an average rate of 0.2% of calls.
private static class ShouldCompactNowFn implements Supplier<Boolean> {
  private static final double RATE = 0.002;
  // Uses geometric distribution: Math.log(random.nextDouble()) / Math.log(1 - RATE)
}
```
This ensures that state compaction (reducing the size of accumulated bag state or other appendable state) happens periodically without coordinated scheduling, using probabilistic triggering to spread compaction load over time.
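One way such a probabilistic trigger can be implemented is inverse-CDF sampling of a geometric distribution: draw the number of calls to skip before the next compaction, so true comes back at an average rate of RATE per call with no coordinated schedule. The standalone class below is a hypothetical sketch of that technique, not the actual ShouldCompactNowFn.

```java
import java.util.Random;
import java.util.function.Supplier;

// Hypothetical sketch of a probabilistic compaction trigger. A geometric
// sample gives the number of calls to skip before the next "true", so the
// expected period between compactions is roughly 1 / RATE calls.
public class CompactionTriggerSketch implements Supplier<Boolean> {
    private static final double RATE = 0.002;
    private final Random random;
    private long callsUntilCompact;

    public CompactionTriggerSketch(Random random) {
        this.random = random;
        this.callsUntilCompact = nextSample();
    }

    // Inverse-CDF sampling of a geometric distribution with success
    // probability RATE.
    private long nextSample() {
        return (long) Math.floor(Math.log(random.nextDouble()) / Math.log(1 - RATE));
    }

    @Override
    public Boolean get() {
        if (callsUntilCompact-- > 0) {
            return false;
        }
        callsUntilCompact = nextSample();
        return true;
    }

    // Count how often the trigger fires over a number of calls.
    public static long countCompactions(long calls, long seed) {
        CompactionTriggerSketch trigger = new CompactionTriggerSketch(new Random(seed));
        long compactions = 0;
        for (long i = 0; i < calls; i++) {
            if (trigger.get()) {
                compactions++;
            }
        }
        return compactions;
    }

    public static void main(String[] args) {
        // Over one million calls, expect roughly 1_000_000 * 0.002 = ~2000 fires.
        System.out.println(countCompactions(1_000_000, 42));
    }
}
```

Sampling the skip count once per fire, rather than drawing a random number on every call, keeps the per-call cost to a single decrement while preserving the target rate.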
Usage Examples
Persisting State After Work Item Processing:
```java
// After processing all elements and timers in a work item:
Windmill.WorkItemCommitRequest.Builder commitBuilder =
    Windmill.WorkItemCommitRequest.newBuilder();

// Persist state mutations.
stateInternals.persist(commitBuilder);

// Persist timer mutations (separate from state).
timerInternals.persistTo(commitBuilder);

// Build and submit the commit.
WorkItemCommitRequest commitRequest = commitBuilder.build();
workCommitter.commit(Commit.create(commitRequest, computationState, workItem));
```
Using CounterShortIdCache for Metric Optimization:
```java
CounterShortIdCache shortIdCache = new CounterShortIdCache();

// After sending a status report and receiving the response:
shortIdCache.storeNewShortIds(statusRequest, statusResponse);

// Before sending subsequent reports, shorten counter IDs:
List<CounterUpdate> counters = collectCounterUpdates();
shortIdCache.shortenIdsIfAvailable(counters);

// Counters with known short IDs now have their name/kind cleared
// and shortId set, reducing serialization size.
```
Related Pages
- Principle:Apache_Beam_Result_Commitment -- The principle describing result commitment to Windmill.
- Implementation:Apache_Beam_WindmillTimerInternals -- Timer persistence that complements state persistence in the commit request.
- Implementation:Apache_Beam_StreamingDataflowWorker_Start -- The start method that begins the processing loop that eventually calls persist.
- Environment:Apache_Beam_Dataflow_Streaming_Runtime -- GCP Dataflow streaming worker runtime with Windmill and memory monitoring.