

Implementation:Apache Beam WindmillTimerInternals

From Leeroopedia


  • Implementation Name: WindmillTimerInternals
  • Overview: Concrete implementation for managing Windmill-backed timers during streaming work item processing in Dataflow, including timer set/delete tracking and persistence to commit requests.
  • Module: runners/google-cloud-dataflow-java/worker
  • Repository: apache/beam
  • Related Principle: Principle:Apache_Beam_Work_Item_Processing
  • Last updated: 2026-02-09 04:00 GMT

Overview

WindmillTimerInternals is the TimerInternals implementation for Dataflow streaming. During work item processing it tracks timer set and delete operations in an in-memory map, exposes processing time and watermarks, and serializes all pending timer mutations into a WorkItemCommitRequest so that they are committed to Windmill atomically. It works alongside WindmillStateInternals (state access) and UserParDoFnFactory (DoFn instantiation); together they form the work item processing infrastructure.
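The set/delete tracking can be pictured as a map from timer identity to a (timer, isSet) pair in which the last operation on a timer wins. The sketch below is a hypothetical, simplified model, not Beam's code: TimerSketch stands in for TimerData, and the key (namespace plus timer ID) is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Beam's TimerData (NOT the real type).
class TimerSketch {
    final String namespace;
    final String timerId;
    final long timestampMillis;

    TimerSketch(String namespace, String timerId, long timestampMillis) {
        this.namespace = namespace;
        this.timerId = timerId;
        this.timestampMillis = timestampMillis;
    }

    // Assumed identity: namespace + timer ID (the real key may also include the timer family).
    String key() {
        return namespace + "|" + timerId;
    }
}

// Simplified model of the in-memory timer map: value is (timer, isSet).
class TimerMapSketch {
    private final Map<String, Map.Entry<TimerSketch, Boolean>> timerMap = new LinkedHashMap<>();

    void setTimer(TimerSketch timer) {
        // A later set or delete for the same key replaces the earlier entry (last write wins).
        timerMap.put(timer.key(), Map.entry(timer, true));
    }

    void deleteTimer(TimerSketch timer) {
        timerMap.put(timer.key(), Map.entry(timer, false));
    }

    // Emits one mutation per key, then clears the map (as persistTo() does after writing).
    List<String> drain() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<TimerSketch, Boolean> e : timerMap.values()) {
            out.add((e.getValue() ? "SET " : "DELETE ") + e.getKey().key());
        }
        timerMap.clear();
        return out;
    }
}
```

Because each key holds only its latest operation, setting and then deleting the same timer within one work item produces a single delete in the commit rather than two mutations.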

Code Reference

Source Location

WindmillTimerInternals.java (runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java):

  • L48: Class declaration, class WindmillTimerInternals implements TimerInternals
  • L66-79: Constructor
  • L87-93: setTimer(TimerData)
  • L125-131: deleteTimer(TimerData)
  • L173-175: currentInputWatermarkTime()
  • L191-245: persistTo(WorkItemCommitRequest.Builder)

WindmillStateInternals.java (runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternals.java):

  • L42: Class declaration, public class WindmillStateInternals<K> implements StateInternals
  • L58-77: Constructor
  • L106-125: persist(WorkItemCommitRequest.Builder)
  • L128-136: state(StateNamespace, StateTag<T>)

UserParDoFnFactory.java (runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactory.java):

  • L53-159: UserParDoFnFactory, user DoFn instantiation

Signature

WindmillTimerInternals:

class WindmillTimerInternals implements TimerInternals {

    public WindmillTimerInternals(
        String stateFamily,             // unique identifier for a step
        WindmillNamespacePrefix prefix, // "/u" for user, "/s" for system
        Instant processingTime,
        Watermarks watermarks,
        WindmillTagEncoding windmillTagEncoding,
        Consumer<TimerData> onTimerModified)

    // Create a copy with a different namespace prefix
    public WindmillTimerInternals withPrefix(WindmillNamespacePrefix prefix)

    // Register a new timer (tracked in timerMap as set=true)
    @Override
    public void setTimer(TimerData timerKey)                          // L87-93

    // Overloaded: creates TimerData from components, then calls setTimer(TimerData)
    @Override
    public void setTimer(StateNamespace namespace, String timerId,
        String timerFamilyId, Instant timestamp,
        Instant outputTimestamp, TimeDomain timeDomain)               // L96-113

    // Mark a timer for deletion (tracked in timerMap as set=false)
    @Override
    public void deleteTimer(TimerData timerKey)                       // L125-131

    // Delete timer by namespace, ID, family, and time domain
    @Override
    public void deleteTimer(StateNamespace namespace, String timerId,
        String timerFamilyId, TimeDomain timeDomain)                  // L139-150

    // Returns max(processingTime, now)
    @Override
    public Instant currentProcessingTime()                            // L153-156

    // Returns synchronized processing time from Windmill
    @Override
    public @Nullable Instant currentSynchronizedProcessingTime()      // L159-161

    // Returns the input watermark from the GetWork response
    @Override
    public Instant currentInputWatermarkTime()                        // L173-175

    // Returns the output watermark from the GetWork response
    @Override
    public @Nullable Instant currentOutputWatermarkTime()             // L187-189

    // Serialize all timer mutations into the commit request
    public void persistTo(
        Windmill.WorkItemCommitRequest.Builder outputBuilder)         // L191-245

    // Check if a timer tag is a system timer
    public static boolean isSystemTimer(Windmill.Timer timer)         // L253-255

    // Check if a timer tag is a user timer
    public static boolean isUserTimer(Windmill.Timer timer)           // L257-259
}
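Two of the helpers above are simple enough to sketch. The block below is an illustrative model under stated assumptions: java.time.Instant stands in for the Joda-Time Instant that Beam uses, and timer tags are modeled as plain strings that begin with their namespace prefix ("/u" or "/s"). The real isUserTimer/isSystemTimer inspect Windmill.Timer protos and may depend on the tag encoding in use.

```java
import java.time.Instant;

// Hypothetical sketch; not the Beam implementation.
class TimerInternalsHelpersSketch {
    static final String USER_PREFIX = "/u";   // user namespace
    static final String SYSTEM_PREFIX = "/s"; // system namespace

    // max(processingTime, now): never report a time earlier than the one Windmill supplied.
    static Instant currentProcessingTime(Instant windmillProcessingTime, Instant now) {
        return now.isAfter(windmillProcessingTime) ? now : windmillProcessingTime;
    }

    // Assumes the namespace prefix is the leading part of the tag string.
    static boolean isUserTimerTag(String tag) {
        return tag.startsWith(USER_PREFIX);
    }

    static boolean isSystemTimerTag(String tag) {
        return tag.startsWith(SYSTEM_PREFIX);
    }
}
```

Taking the maximum keeps processing time monotonic within a work item even if the worker's local clock lags behind the time reported by Windmill.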

WindmillStateInternals:

public class WindmillStateInternals<K> implements StateInternals {

    public WindmillStateInternals(
        @Nullable K key,
        String stateFamily,
        WindmillStateReader reader,
        boolean isNewKey,
        WindmillStateCache.ForKeyAndFamily cache,
        WindmillTagEncoding windmillTagEncoding,
        Supplier<Closeable> scopedReadStateSupplier)                  // L58-77

    // Persist all state mutations into the commit request
    public void persist(
        Windmill.WorkItemCommitRequest.Builder commitBuilder)         // L106-125

    // Access state by namespace and tag
    @Override
    public <T extends State> T state(
        StateNamespace namespace, StateTag<T> address)                // L128-130
}
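The state(namespace, tag) / persist(commitBuilder) pair follows a common get-or-create and write-back pattern. The following is a minimal sketch of that pattern only, assuming a single value-state type and a List<String> standing in for WorkItemCommitRequest.Builder; ValueStateSketch and StateInternalsSketch are hypothetical names, and Beam's real class also involves caching, asynchronous reads, and many state types.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical value-state cell that remembers whether it was modified.
class ValueStateSketch {
    private String value;
    private boolean dirty;

    String read() { return value; }

    void write(String v) {
        value = v;
        dirty = true; // only modified cells are written back on persist
    }

    boolean isDirty() { return dirty; }

    void markClean() { dirty = false; }
}

class StateInternalsSketch {
    private final Map<String, ValueStateSketch> cells = new HashMap<>();

    // The same (namespace, tag) pair returns the same cell within one work item.
    ValueStateSketch state(String namespace, String tagId) {
        return cells.computeIfAbsent(namespace + "|" + tagId, k -> new ValueStateSketch());
    }

    // Appends one mutation per dirty cell to a stand-in for the commit request builder.
    void persist(List<String> commit) {
        cells.forEach((key, cell) -> {
            if (cell.isDirty()) {
                commit.add(key + "=" + cell.read());
                cell.markClean();
            }
        });
    }
}
```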

UserParDoFnFactory:

class UserParDoFnFactory implements ParDoFnFactory {

    static UserParDoFnFactory createDefault()                         // L54-56

    @Override
    public ParDoFn create(
        PipelineOptions options, CloudObject cloudUserFn,
        @Nullable List<SideInputInfo> sideInputInfos,
        TupleTag<?> mainOutputTag,
        Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices,
        DataflowExecutionContext<?> executionContext,
        DataflowOperationContext operationContext) throws Exception    // L84-159
}

Import

import org.apache.beam.runners.dataflow.worker.WindmillTimerInternals;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateInternals;
import org.apache.beam.runners.dataflow.worker.UserParDoFnFactory;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.StateInternals;

I/O Contract

Inputs

  • stateFamily (String): Identifies the step within the computation; used as the Windmill state family for timer storage.
  • prefix (WindmillNamespacePrefix): Either /u (user namespace) or /s (system namespace); partitions timer namespaces.
  • processingTime (Instant): The processing time reported by Windmill in the GetWork response.
  • watermarks (Watermarks): Contains the input data watermark, output data watermark, and synchronized processing time from the GetWork response.
  • windmillTagEncoding (WindmillTagEncoding): Encoder/decoder for Windmill timer tags (V1 or V2 encoding).
  • onTimerModified (Consumer<TimerData>): Callback invoked whenever a timer is set or deleted, for external tracking.
  • WorkItem (Windmill WorkItem, for WindmillStateInternals): Contains the key, input data, timer firings, and state references from Windmill.

Outputs

  • Timer mutations (serialized into WorkItemCommitRequest): All timer sets and deletes, together with the watermark holds for timers that require them.
  • State mutations (serialized into WorkItemCommitRequest): All state writes, clears, and appends accumulated during processing (via WindmillStateInternals).
  • Output elements (collected by output receivers): Elements produced by the DoFn during processing.

Timer Persistence Details

The persistTo() method serializes the in-memory timer map into the commit request and then clears the map:

public void persistTo(Windmill.WorkItemCommitRequest.Builder outputBuilder) {
    for (Entry<TimerData, Boolean> value : timerMap.values()) {
        TimerData timerData = value.getKey();
        Timer.Builder timer = windmillTagEncoding.buildWindmillTimerFromTimerData(
            stateFamily, prefix, timerData,
            outputBuilder.addOutputTimersBuilder());

        if (value.getValue()) {
            // Timer is being SET
            if (needsWatermarkHold(timerData)) {
                if (timerData.getOutputTimestamp()
                    .isBefore(GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1)))) {
                    // Set watermark hold to timer's output timestamp
                    outputBuilder.addWatermarkHoldsBuilder()
                        .setTag(windmillTagEncoding.timerHoldTag(prefix, timerData, timer.getTag()))
                        .setStateFamily(stateFamily)
                        .setReset(true)
                        .addTimestamps(WindmillTimeUtils.harnessToWindmillTimestamp(
                            timerData.getOutputTimestamp()));
                } else {
                    // Output timestamp is past the end of the global window:
                    // reset the hold without a timestamp, which clears it
                    outputBuilder.addWatermarkHoldsBuilder()
                        .setTag(...)
                        .setStateFamily(stateFamily)
                        .setReset(true);
                }
            }
        } else {
            // Timer is being DELETED - clear timestamp and hold
            timer.clearTimestamp();
            timer.clearMetadataTimestamp();
            if (needsWatermarkHold(timerData)) {
                outputBuilder.addWatermarkHoldsBuilder()
                    .setTag(...)
                    .setStateFamily(stateFamily)
                    .setReset(true);
            }
        }
    }
    timerMap.clear();  // Wipe unpersisted state
}
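The branching in persistTo() can be exercised without Windmill protos. The sketch below is a simplified, hypothetical model: timestamps are epoch milliseconds, mutations are plain strings, PendingTimer replaces the (TimerData, isSet) entry, globalWindowMaxMillis stands in for GlobalWindow.INSTANCE.maxTimestamp(), and the hold rule is passed in as a predicate. The check `outputTimestamp <= globalWindowMax` is equivalent to the original `isBefore(maxTimestamp().plus(1 ms))`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of persistTo()'s decision logic; not Beam's implementation.
class PersistSketch {
    static final class PendingTimer {
        final String tag;
        final long timestampMillis;
        final long outputTimestampMillis;
        final boolean isSet; // true = set, false = delete

        PendingTimer(String tag, long timestampMillis, long outputTimestampMillis, boolean isSet) {
            this.tag = tag;
            this.timestampMillis = timestampMillis;
            this.outputTimestampMillis = outputTimestampMillis;
            this.isSet = isSet;
        }
    }

    // `pending` must be mutable: it is cleared afterwards, like timerMap.clear().
    static List<String> persist(List<PendingTimer> pending,
                                Predicate<PendingTimer> needsHold,
                                long globalWindowMaxMillis) {
        List<String> commit = new ArrayList<>();
        for (PendingTimer t : pending) {
            if (t.isSet) {
                commit.add("TIMER " + t.tag + " @" + t.timestampMillis);
                if (needsHold.test(t)) {
                    if (t.outputTimestampMillis <= globalWindowMaxMillis) {
                        // Reset the hold to the timer's output timestamp.
                        commit.add("HOLD " + t.tag + " @" + t.outputTimestampMillis);
                    } else {
                        // Past the end of the global window: reset with no timestamp.
                        commit.add("CLEAR_HOLD " + t.tag);
                    }
                }
            } else {
                // Deletion: the timer is written without a timestamp; any hold is cleared.
                commit.add("DELETE_TIMER " + t.tag);
                if (needsHold.test(t)) {
                    commit.add("CLEAR_HOLD " + t.tag);
                }
            }
        }
        pending.clear();
        return commit;
    }
}
```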

Watermark holds are needed for:

  • User timers (prefix /u): Always need watermark holds to prevent the output watermark from advancing past the timer's output timestamp.
  • System timers with differing timestamps (prefix /s): Only need holds when the timer's output timestamp differs from its firing timestamp.
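The two rules above reduce to a small predicate. This is a sketch that mirrors the rules as stated here, not Beam's needsWatermarkHold method: it models the namespace prefix as a string and timestamps as epoch milliseconds, and the class name is hypothetical.

```java
// Hypothetical sketch of the watermark-hold rule described above.
class HoldRuleSketch {
    static boolean needsWatermarkHold(String prefix, long timestampMillis, long outputTimestampMillis) {
        if ("/u".equals(prefix)) {
            return true; // user timers always hold the output watermark
        }
        // System timers ("/s") hold only when the output timestamp differs from the firing timestamp.
        return timestampMillis != outputTimestampMillis;
    }
}
```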

Usage Examples

Creating WindmillTimerInternals for a Work Item:

WindmillTimerInternals timerInternals = new WindmillTimerInternals(
    stateFamily,
    WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
    workItem.getProcessingTime(),
    Watermarks.builder()
        .setInputDataWatermark(inputWatermark)
        .setOutputDataWatermark(outputWatermark)
        .setSynchronizedProcessingTime(syncProcessingTime)
        .build(),
    windmillTagEncoding,
    timerData -> { /* track modified timers */ });

// After processing, persist all timer mutations
timerInternals.persistTo(commitRequestBuilder);
