Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Principle:Apache Beam Heartbeat and Refresh

From Leeroopedia


Field Value
Principle Name Heartbeat and Refresh
Domain Distributed_Systems, Monitoring
Overview Mechanism for maintaining worker liveness through periodic heartbeats to Windmill and status reports to the Dataflow service.
Related Implementation Implementation:Apache_Beam_StreamingDataflowWorker_Start
Repository apache/beam
last_updated 2026-02-09 04:00 GMT

Overview

Heartbeat and refresh is the mechanism by which long-running streaming workers maintain their liveness through periodic heartbeats to Windmill and status reports to the Dataflow service. It encompasses several background subsystems that keep the worker healthy, visible to the orchestration layer, and responsive to configuration changes.

Description

Long-running streaming workers must continuously signal their liveness and report metrics to both the Windmill backend and the Dataflow service. The StreamingDataflowWorker.start() method activates a suite of background subsystems that run for the lifetime of the worker:

1. ActiveWorkRefresher -- Windmill Heartbeats: The ActiveWorkRefresher periodically sends heartbeats for all in-progress work items to prevent Windmill from timing them out. Key behaviors:

  • Refresh Period: Configured by activeWorkRefreshPeriodMillis. The refresher runs on a dedicated ScheduledExecutorService.
  • Iteration: On each refresh cycle, the refresher iterates over all computations (via computationStateCache.getAllPresentComputations()) and collects work items that need heartbeating.
  • Heartbeat Sending: Heartbeats are sent through the ThrottlingGetDataMetricTracker.trackHeartbeats() method, which routes them to the appropriate Windmill backend.
  • Stuck Commit Detection: For Streaming Engine mode, if a work item has been waiting for commit acknowledgment longer than stuckCommitDurationMillis, the refresher flags it. This helps detect Windmill backend issues.

2. StreamingWorkerStatusReporter -- Metric Reporting: The StreamingWorkerStatusReporter periodically reports counter updates and metrics to the Dataflow service:

  • Reporting Period: Configured by windmillHarnessUpdateReportingPeriod.
  • Counter Aggregation: Collects counters from all active stages and flattens them into a single report.
  • Per-Worker Metrics: Optionally reports per-worker metrics at a separate interval (perWorkerMetricsUpdateReportingPeriodMillis).
  • Failure Reporting: Includes failure stacktraces (up to MAX_FAILURES_TO_REPORT_IN_UPDATE = 1000) from the FailureTracker.
  • Short ID Caching: Uses the CounterShortIdCache to optimize metric report bandwidth by replacing full counter names with numeric short IDs.

3. ComputationConfig.Fetcher -- Configuration Refresh: The configFetcher.start() call activates the periodic configuration refresh:

  • StreamingEngineComputationConfigFetcher: Schedules a ScheduledExecutorService that periodically calls the Dataflow service to fetch updated global configuration.
  • Initial Block: At startup, the fetcher blocks until the initial global configuration is received.
  • Periodic Fetch: After the initial fetch, it polls at globalConfigRefreshPeriodMillis intervals.
  • Config Propagation: Updated configurations are pushed to the StreamingGlobalConfigHandle, which notifies registered observers (such as the connectivity type switcher that can transition between DirectPath and CloudPath).

4. BackgroundMemoryMonitor -- Memory Pressure Monitoring: The memoryMonitor.start() call starts a background thread that monitors JVM memory usage:

  • Memory Thresholds: When memory usage exceeds configured thresholds, the monitor throttles work acquisition by blocking waitForResources() calls.
  • GC Monitoring: Monitors garbage collection activity and frequency.
  • Heap Dump Triggering: Can trigger heap dumps when memory pressure is severe, for post-mortem analysis.

5. DataflowExecutionStateSampler: The sampler.start() call activates periodic sampling of execution states:

  • Tracks which step/operation each thread is currently executing.
  • Provides data for time-based profiling of pipeline steps.

6. Connectivity Type Switching: After all subsystems are started, the worker registers a config observer that monitors the ConnectivityType in the global configuration. If the connectivity type changes (e.g., from CloudPath to DirectPath), the worker dynamically switches its StreamingWorkerHarness implementation without restarting.

Usage

Understanding heartbeat and refresh is critical for:

  • Diagnosing worker timeouts: If heartbeats are not sent frequently enough, Windmill may reclaim work items, causing duplicate processing or dropped work. Increasing the active work refresh period or investigating heartbeat failures can resolve this.
  • Stale metrics: If the status reporter is not running or is failing, metrics in the Dataflow monitoring UI will be stale. Check the reporter's periodic execution and look for errors in the Dataflow service communication.
  • Memory pressure issues: The memory monitor can throttle work acquisition, causing reduced throughput. Monitoring GC logs and memory usage helps diagnose whether the worker needs more memory or the state cache is too large.
  • Configuration staleness: If the config fetcher fails, workers will not pick up endpoint changes or operational limit updates. This can lead to workers connecting to stale Windmill backends.

The subsystem startup sequence in start() is:

worker.start()
  -> configFetcher.start()            // Periodic config refresh
  -> memoryMonitor.start()            // Background memory pressure monitoring
  -> streamingWorkerHarness.start()   // Activates Windmill streams
  -> sampler.start()                  // Execution state sampling
  -> workerStatusReporter.start()     // Periodic counter/metric reporting
  -> activeWorkRefresher.start()      // Heartbeats for in-progress work
  -> Register connectivity type observer  // Dynamic harness switching

Theoretical Basis

Heartbeat and refresh is based on heartbeat-based failure detection in distributed systems:

  • Failure Detection: In distributed systems, a coordinator (Windmill) must distinguish between a slow worker and a failed worker. Regular liveness signals (heartbeats) allow the coordinator to set timeouts. If no heartbeat is received within the timeout window, the coordinator assumes failure and reassigns the work. The ActiveWorkRefresher implements this pattern.
  • Phi Accrual Failure Detection: While Windmill uses a simpler timeout-based approach, the principle is related to more sophisticated failure detectors like the Phi Accrual Failure Detector, where the probability of failure increases with the time since the last heartbeat.
  • Monitoring and Observability: The StreamingWorkerStatusReporter implements the observability pillar of distributed systems by exporting metrics (counters, throughput, latency) to a central service. This enables dashboards, alerting, and auto-scaling decisions.
  • Self-Healing: The connectivity type switching mechanism implements a form of adaptive reconfiguration. The system can autonomously switch between communication paths (DirectPath vs. CloudPath) in response to signals from the control plane, without requiring worker restart.

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment