Implementation:Apache Beam WindmillTimerInternals
| Field | Value |
|---|---|
| Implementation Name | WindmillTimerInternals |
| Overview | Concrete implementation for managing Windmill-backed timers during streaming work item processing in Dataflow, including timer set/delete tracking and persistence to commit requests. |
| Module | runners/google-cloud-dataflow-java/worker |
| Repository | apache/beam |
| Related Principle | Principle:Apache_Beam_Work_Item_Processing |
| last_updated | 2026-02-09 04:00 GMT |
Overview
WindmillTimerInternals is the TimerInternals implementation for Dataflow Streaming. It manages timer state during work item processing by tracking timer set and delete operations in an in-memory map, providing watermark access, and serializing all timer mutations into WorkItemCommitRequest messages for atomic commitment to Windmill. It works alongside WindmillStateInternals for state access and UserParDoFnFactory for DoFn instantiation to form the complete work item processing infrastructure.
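The set/delete tracking described above can be pictured as a map keyed by timer identity, where a later operation on the same timer overwrites the earlier one and only the final state is emitted at commit time. The following is a minimal sketch under that assumption. `TimerTrackerSketch`, its string key format, and the string-based output are hypothetical stand-ins, not the Beam classes or the Windmill protobuf API.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

/** Hypothetical stand-in for the in-memory tracking; not the Beam implementation. */
final class TimerTrackerSketch {
  // Key: namespace + "|" + timerId (assumed format).
  // Value: (fire time in millis, true = set / false = deleted).
  private final Map<String, Entry<Long, Boolean>> timerMap = new LinkedHashMap<>();

  void setTimer(String namespace, String timerId, long fireAtMillis) {
    timerMap.put(
        key(namespace, timerId), new SimpleImmutableEntry<Long, Boolean>(fireAtMillis, true));
  }

  void deleteTimer(String namespace, String timerId) {
    // The fire time is irrelevant for a deletion; only the timer identity matters.
    timerMap.put(key(namespace, timerId), new SimpleImmutableEntry<Long, Boolean>(0L, false));
  }

  /** Emits one line per pending mutation, then clears the map (persist, then wipe). */
  List<String> persist() {
    List<String> out = new ArrayList<>();
    for (Entry<String, Entry<Long, Boolean>> e : timerMap.entrySet()) {
      boolean isSet = e.getValue().getValue();
      out.add(isSet ? "SET " + e.getKey() + "@" + e.getValue().getKey() : "DELETE " + e.getKey());
    }
    timerMap.clear();
    return out;
  }

  private static String key(String namespace, String timerId) {
    return namespace + "|" + timerId;
  }

  public static void main(String[] args) {
    TimerTrackerSketch tracker = new TimerTrackerSketch();
    tracker.setTimer("window-1", "gc", 100L);
    tracker.setTimer("window-1", "gc", 200L); // overwrites the earlier set
    tracker.deleteTimer("window-1", "flush");
    System.out.println(tracker.persist());
  }
}
```

Because the map holds one entry per timer identity, setting the same timer twice within a work item yields a single mutation carrying the latest firing time.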
Code Reference
Source Location
| File | Lines | Description |
|---|---|---|
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java | L48 | Class declaration: class WindmillTimerInternals implements TimerInternals |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java | L66-79 | Constructor |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java | L87-93 | setTimer(TimerData) |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java | L125-131 | deleteTimer(TimerData) |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java | L191-245 | persistTo(WorkItemCommitRequest.Builder) |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java | L173-175 | currentInputWatermarkTime() |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java | L42 | Class declaration: public class WindmillStateInternals<K> implements StateInternals |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java | L58-77 | Constructor |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java | L106-125 | persist(WorkItemCommitRequest.Builder) |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java | L128-136 | state(StateNamespace, StateTag<T>) |
| runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactory.java | L53-159 | UserParDoFnFactory -- user DoFn instantiation |
Signature
WindmillTimerInternals:
class WindmillTimerInternals implements TimerInternals {
  public WindmillTimerInternals(
      String stateFamily, // unique identifier for a step
      WindmillNamespacePrefix prefix, // "/u" for user, "/s" for system
      Instant processingTime,
      Watermarks watermarks,
      WindmillTagEncoding windmillTagEncoding,
      Consumer<TimerData> onTimerModified)

  // Create a copy with a different namespace prefix
  public WindmillTimerInternals withPrefix(WindmillNamespacePrefix prefix)

  // Register a new timer (tracked in timerMap as set=true)
  @Override
  public void setTimer(TimerData timerKey) // L87-93

  // Overloaded: creates TimerData from components, then calls setTimer(TimerData)
  @Override
  public void setTimer(StateNamespace namespace, String timerId,
      String timerFamilyId, Instant timestamp,
      Instant outputTimestamp, TimeDomain timeDomain) // L96-113

  // Mark a timer for deletion (tracked in timerMap as set=false)
  @Override
  public void deleteTimer(TimerData timerKey) // L125-131

  // Delete timer by namespace, ID, family, and time domain
  @Override
  public void deleteTimer(StateNamespace namespace, String timerId,
      String timerFamilyId, TimeDomain timeDomain) // L139-150

  // Returns max(processingTime, now)
  @Override
  public Instant currentProcessingTime() // L153-156

  // Returns synchronized processing time from Windmill
  @Override
  public @Nullable Instant currentSynchronizedProcessingTime() // L159-161

  // Returns the input watermark from the GetWork response
  @Override
  public Instant currentInputWatermarkTime() // L173-175

  // Returns the output watermark from the GetWork response
  @Override
  public @Nullable Instant currentOutputWatermarkTime() // L187-189

  // Serialize all timer mutations into the commit request
  public void persistTo(
      Windmill.WorkItemCommitRequest.Builder outputBuilder) // L191-245

  // Check if a timer tag is a system timer
  public static boolean isSystemTimer(Windmill.Timer timer) // L253-255

  // Check if a timer tag is a user timer
  public static boolean isUserTimer(Windmill.Timer timer) // L257-259
}
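The comment on `currentProcessingTime()` above describes taking the later of two clocks: the processing time reported with the work item and the local wall clock. A minimal sketch of that rule follows. It uses `java.time.Instant` rather than the Joda-Time `Instant` that Beam uses, and takes `now` as a parameter so the result is deterministic; both choices, and the class name `ProcessingTimeSketch`, are assumptions made for illustration.

```java
import java.time.Instant;

/** Hypothetical helper illustrating max(processingTime, now); not the Beam implementation. */
final class ProcessingTimeSketch {
  // Returns whichever of the two instants is later.
  static Instant currentProcessingTime(Instant reportedByService, Instant now) {
    return reportedByService.isAfter(now) ? reportedByService : now;
  }

  public static void main(String[] args) {
    Instant reported = Instant.parse("2024-01-01T00:00:10Z");
    // Local clock behind the reported time: the reported time is used.
    System.out.println(currentProcessingTime(reported, Instant.parse("2024-01-01T00:00:05Z")));
    // Local clock ahead of the reported time: the local clock is used.
    System.out.println(currentProcessingTime(reported, Instant.parse("2024-01-01T00:00:20Z")));
  }
}
```

Taking the maximum means the processing time observed by a DoFn never falls behind the time the service reported for the work item, even if the worker's clock lags.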
WindmillStateInternals:
public class WindmillStateInternals<K> implements StateInternals {
  public WindmillStateInternals(
      @Nullable K key,
      String stateFamily,
      WindmillStateReader reader,
      boolean isNewKey,
      WindmillStateCache.ForKeyAndFamily cache,
      WindmillTagEncoding windmillTagEncoding,
      Supplier<Closeable> scopedReadStateSupplier) // L58-77

  // Persist all state mutations into the commit request
  public void persist(
      Windmill.WorkItemCommitRequest.Builder commitBuilder) // L106-125

  // Access state by namespace and tag
  @Override
  public <T extends State> T state(
      StateNamespace namespace, StateTag<T> address) // L128-130
}
UserParDoFnFactory:
class UserParDoFnFactory implements ParDoFnFactory {
  static UserParDoFnFactory createDefault() // L54-56

  @Override
  public ParDoFn create(
      PipelineOptions options, CloudObject cloudUserFn,
      @Nullable List<SideInputInfo> sideInputInfos,
      TupleTag<?> mainOutputTag,
      Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices,
      DataflowExecutionContext<?> executionContext,
      DataflowOperationContext operationContext) throws Exception // L84-159
}
Import
import org.apache.beam.runners.dataflow.worker.WindmillTimerInternals;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateInternals;
import org.apache.beam.runners.dataflow.worker.UserParDoFnFactory;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.StateInternals;
I/O Contract
Inputs
| Input | Type | Description |
|---|---|---|
| stateFamily | String | Identifies the step within the computation; used as the Windmill state family for timer storage. |
| prefix | WindmillNamespacePrefix | Either /u (user namespace) or /s (system namespace), partitioning timer namespaces. |
| processingTime | Instant | The processing time reported by Windmill in the GetWork response. |
| watermarks | Watermarks | Contains input data watermark, output data watermark, and synchronized processing time from the GetWork response. |
| windmillTagEncoding | WindmillTagEncoding | Encoder/decoder for Windmill timer tags (V1 or V2 encoding). |
| onTimerModified | Consumer<TimerData> | Callback invoked whenever a timer is set or deleted, for external tracking. |
| WorkItem (for WindmillStateInternals) | Windmill WorkItem | Contains key, input data, timer firings, and state references from Windmill. |
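The `prefix` input separates user timers from system timers, and the `isUserTimer` / `isSystemTimer` helpers listed in the signature classify a timer by its tag. A minimal sketch of such a classification follows, assuming the prefix appears as the leading characters of the tag. The real tags are byte strings produced by `WindmillTagEncoding`, so the `String` representation, the class name `TimerTagSketch`, and the example tag layout are all hypothetical.

```java
/** Hypothetical tag classifier; real Windmill tags are encoded byte strings. */
final class TimerTagSketch {
  static final String USER_PREFIX = "/u";
  static final String SYSTEM_PREFIX = "/s";

  // A tag belongs to the user namespace if it starts with the user prefix.
  static boolean isUserTimer(String tag) {
    return tag.startsWith(USER_PREFIX);
  }

  // A tag belongs to the system namespace if it starts with the system prefix.
  static boolean isSystemTimer(String tag) {
    return tag.startsWith(SYSTEM_PREFIX);
  }

  public static void main(String[] args) {
    String userTag = USER_PREFIX + "window-1+gc"; // assumed layout, for illustration only
    String systemTag = SYSTEM_PREFIX + "window-1+end-of-window";
    System.out.println(isUserTimer(userTag) + " " + isSystemTimer(userTag));
    System.out.println(isUserTimer(systemTag) + " " + isSystemTimer(systemTag));
  }
}
```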
Outputs
| Output | Type | Description |
|---|---|---|
| Timer Mutations | Serialized into WorkItemCommitRequest | All timer sets and deletes, including watermark holds for user timers. |
| State Mutations | Serialized into WorkItemCommitRequest | All state writes, clears, and appends accumulated during processing (via WindmillStateInternals). |
| Output Elements | Collected by output receivers | Elements produced by the DoFn during processing. |
Timer Persistence Details
The persistTo() method serializes the in-memory timer map into the commit request:
public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
  for (Entry<TimerData, Boolean> value : timerMap.values()) {
    TimerData timerData = value.getKey();
    Timer.Builder timer = windmillTagEncoding.buildWindmillTimerFromTimerData(
        stateFamily, prefix, timerData,
        outputBuilder.addOutputTimersBuilder());
    if (value.getValue()) {
      // Timer is being SET
      if (needsWatermarkHold(timerData)) {
        if (timerData.getOutputTimestamp()
            .isBefore(GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1)))) {
          // Set watermark hold to timer's output timestamp
          outputBuilder.addWatermarkHoldsBuilder()
              .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag()))
              .setStateFamily(stateFamily)
              .setReset(true)
              .addTimestamps(WindmillTimeUtils.harnessToWindmillTimestamp(
                  timerData.getOutputTimestamp()));
        } else {
          // Clear hold for timers at end of global window
          outputBuilder.addWatermarkHoldsBuilder()
              .setTag(...)
              .setStateFamily(stateFamily)
              .setReset(true);
        }
      }
    } else {
      // Timer is being DELETED - clear timestamp and hold
      timer.clearTimestamp();
      timer.clearMetadataTimestamp();
      if (needsWatermarkHold(timerData)) {
        outputBuilder.addWatermarkHoldsBuilder()
            .setTag(...)
            .setStateFamily(stateFamily)
            .setReset(true);
      }
    }
  }
  timerMap.clear(); // Wipe unpersisted state
}
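The branching in `persistTo()` reduces to three outcomes per timer: write no hold, set a hold at the output timestamp, or reset the hold without a timestamp (which clears it). The sketch below restates that decision as a pure function so the cases are easier to see. `HoldDecisionSketch`, the `Action` enum, and the use of plain millisecond values with a caller-supplied end-of-global-window bound are assumptions for illustration and do not appear in the Beam source.

```java
/** Hypothetical restatement of the hold decision in persistTo(); not the Beam implementation. */
final class HoldDecisionSketch {
  enum Action { NONE, SET_HOLD, CLEAR_HOLD }

  static Action holdAction(
      boolean timerIsSet, boolean needsHold, long outputMillis, long endOfGlobalWindowMillis) {
    if (!needsHold) {
      return Action.NONE; // no hold entry is written at all
    }
    if (!timerIsSet) {
      return Action.CLEAR_HOLD; // deleted timer: reset the hold with no timestamp
    }
    // For whole milliseconds, isBefore(max + 1ms) is the same as outputMillis <= max.
    return outputMillis <= endOfGlobalWindowMillis ? Action.SET_HOLD : Action.CLEAR_HOLD;
  }

  public static void main(String[] args) {
    long endOfGlobalWindow = 1_000L; // placeholder bound, not the real Beam constant
    System.out.println(holdAction(true, true, 500L, endOfGlobalWindow));
    System.out.println(holdAction(true, true, 1_001L, endOfGlobalWindow));
    System.out.println(holdAction(false, true, 500L, endOfGlobalWindow));
    System.out.println(holdAction(true, false, 500L, endOfGlobalWindow));
  }
}
```

In every case where a hold is written, the original code calls `setReset(true)`, so a previous hold for the same tag is replaced rather than accumulated.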
Watermark holds are needed for:
- User timers (prefix /u): Always need watermark holds to prevent the output watermark from advancing past the timer's output timestamp.
- System timers with differing timestamps (prefix /s): Only need holds when the timer's output timestamp differs from its firing timestamp.
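The two rules above combine into a single boolean condition. The following is a minimal sketch of that predicate, assuming timestamps are compared as millisecond values; the class name `WatermarkHoldRuleSketch` and the flattened parameter list are hypothetical, since the method used in `persistTo()` takes a `TimerData` instead.

```java
/** Hypothetical predicate mirroring the hold rules listed above; not the Beam implementation. */
final class WatermarkHoldRuleSketch {
  // A hold is needed for every user timer, and for a system timer only when
  // its output timestamp differs from its firing timestamp.
  static boolean needsWatermarkHold(
      boolean isUserNamespace, long fireMillis, long outputMillis) {
    return isUserNamespace || fireMillis != outputMillis;
  }

  public static void main(String[] args) {
    System.out.println(needsWatermarkHold(true, 100L, 100L));  // user timer
    System.out.println(needsWatermarkHold(false, 100L, 100L)); // system timer, same timestamps
    System.out.println(needsWatermarkHold(false, 100L, 50L));  // system timer, differing timestamps
  }
}
```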
Usage Examples
Creating WindmillTimerInternals for a Work Item:
WindmillTimerInternals timerInternals = new WindmillTimerInternals(
    stateFamily,
    WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
    workItem.getProcessingTime(),
    Watermarks.builder()
        .setInputDataWatermark(inputWatermark)
        .setOutputDataWatermark(outputWatermark)
        .setSynchronizedProcessingTime(syncProcessingTime)
        .build(),
    windmillTagEncoding,
    timerData -> { /* track modified timers */ });

// After processing, persist all timer mutations
timerInternals.persistTo(commitRequestBuilder);
Related Pages
- Principle:Apache_Beam_Work_Item_Processing -- The principle describing work item processing with timers and state.
- Implementation:Apache_Beam_WindmillStateInternals_Persist -- State persistence that complements timer persistence in the commit request.
- Implementation:Apache_Beam_StreamingEngineComputationConfigFetcher -- Provides computation configuration including state families used by timers.
- Environment:Apache_Beam_Dataflow_Streaming_Runtime -- GCP Dataflow streaming worker runtime with Windmill and memory monitoring.