Principle:ArroyoSystems Arroyo Steady State Processing

From Leeroopedia


Knowledge Sources
Domains Stream_Processing, Distributed_Systems, Monitoring
Last Updated 2026-02-08 12:00 GMT

Overview

Monitoring and management of a running streaming pipeline during steady state. The controller monitors worker heartbeats, manages periodic checkpointing, handles task failures, tracks operator metrics, and responds to lifecycle events such as rescaling, restarting, and stopping.

Description

Once a streaming pipeline has been scheduled and all tasks are running, it enters steady state. During this phase, the controller runs an event-driven control loop that polls for events at a fixed interval and dispatches to appropriate handlers. The steady state management subsystem is responsible for:

Liveness monitoring: The controller tracks heartbeats from worker processes. If a worker stops sending heartbeats within the configured timeout window, the controller detects the timeout and initiates recovery. This provides crash detection for worker processes that fail silently (e.g., OOM kills, node failures).

Periodic checkpointing: The controller initiates checkpoints at a configurable interval. Each checkpoint triggers a coordinated snapshot across all operators by sending checkpoint barriers through the dataflow graph. The controller tracks checkpoint progress, waits for all operators to complete their snapshots, and records checkpoint metadata in the database. Checkpoints interleave with normal data processing to minimize impact on throughput.

State cleanup and compaction: After checkpoints complete, the controller manages cleanup of old checkpoint data. A background compaction task removes checkpoint files that are no longer needed (i.e., epochs older than the minimum epoch). Cleanup and checkpointing are mutually exclusive to avoid race conditions on state files.
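The epoch-based cleanup rule can be sketched as follows. This is a minimal std-only illustration, not Arroyo's actual compaction code: the function name, the epoch-to-path map, and the file names are all hypothetical.

```rust
// Epoch-based checkpoint cleanup: drop entries older than the minimum
// epoch still needed for recovery. Names are illustrative, not Arroyo's
// actual types; real cleanup also deletes the underlying state files.
use std::collections::BTreeMap;

fn cleanup_old_epochs(files: &mut BTreeMap<u64, String>, min_epoch: u64) -> Vec<String> {
    // split_off keeps epochs >= min_epoch; everything older is
    // returned for deletion.
    let keep = files.split_off(&min_epoch);
    let removed = std::mem::replace(files, keep);
    removed.into_values().collect()
}

fn main() {
    let mut files = BTreeMap::from([
        (1, "epoch-1/state.bin".to_string()),
        (2, "epoch-2/state.bin".to_string()),
        (3, "epoch-3/state.bin".to_string()),
    ]);
    let removed = cleanup_old_epochs(&mut files, 3);
    assert_eq!(removed, vec!["epoch-1/state.bin", "epoch-2/state.bin"]);
    assert_eq!(files.len(), 1);
}
```

Because cleanup and checkpointing are mutually exclusive, a real implementation would guard this call so it never runs while a checkpoint is writing to the same state files.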

Metrics collection: The controller periodically queries workers for Prometheus-format metrics, aggregating per-subtask counters and gauges for operators. These metrics are exposed for monitoring and used to track pipeline health and throughput.
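Aggregating per-subtask samples into per-operator totals amounts to a keyed sum. The sketch below assumes samples have already been parsed out of the Prometheus-format text; the (operator, subtask, value) shape is an illustrative simplification.

```rust
use std::collections::HashMap;

// Sum per-subtask counter samples into per-operator totals. Arroyo
// scrapes Prometheus-format text from each worker in practice; here
// the samples are assumed already parsed.
fn aggregate_counters(samples: &[(&str, u32, f64)]) -> HashMap<String, f64> {
    let mut totals = HashMap::new();
    for (operator, _subtask, value) in samples {
        *totals.entry(operator.to_string()).or_insert(0.0) += value;
    }
    totals
}

fn main() {
    let samples = [
        ("source_0", 0, 100.0), // subtask 0 of the source operator
        ("source_0", 1, 150.0), // subtask 1 of the same operator
        ("sink_0", 0, 240.0),
    ];
    let totals = aggregate_counters(&samples);
    assert_eq!(totals["source_0"], 250.0);
    assert_eq!(totals["sink_0"], 240.0);
}
```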

Event-driven state transitions: The control loop handles several categories of events:

  • Configuration updates -- changes to parallelism overrides trigger rescaling; restart nonce changes trigger restarting; stop mode changes trigger shutdown
  • Task failures -- individual task failures trigger recovery or error state transitions depending on the error domain and severity
  • Source completion -- when all source operators signal completion, the pipeline transitions to a finishing state
  • Errors -- internal errors during steady state trigger recovery with the error context preserved
  • TTL expiration -- if the job has a time-to-live, the pipeline is stopped when the TTL expires
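The event categories above map naturally onto an enum-and-match dispatch. The sketch below is a hypothetical shape, not Arroyo's actual API; in particular, the precedence among configuration changes (stop before restart before rescale) is an assumption of this example.

```rust
// Illustrative event categories for the steady state control loop.
// Variant and action names are hypothetical, not Arroyo's real types.
enum SteadyStateEvent {
    ConfigUpdate { restart_nonce_changed: bool, stop_requested: bool },
    TaskFailed { operator: String, fatal: bool },
    SourcesFinished,
    InternalError(String),
    TtlExpired,
}

#[derive(Debug, PartialEq)]
enum Action { Rescale, Restart, Stop, Recover, Finish, Fail }

fn dispatch(event: SteadyStateEvent) -> Action {
    match event {
        // Assumed precedence: stop > restart > rescale.
        SteadyStateEvent::ConfigUpdate { stop_requested: true, .. } => Action::Stop,
        SteadyStateEvent::ConfigUpdate { restart_nonce_changed: true, .. } => Action::Restart,
        SteadyStateEvent::ConfigUpdate { .. } => Action::Rescale,
        // Error domain/severity decides between recovery and failure.
        SteadyStateEvent::TaskFailed { fatal: true, .. } => Action::Fail,
        SteadyStateEvent::TaskFailed { .. } => Action::Recover,
        SteadyStateEvent::SourcesFinished => Action::Finish,
        SteadyStateEvent::InternalError(_) => Action::Recover,
        SteadyStateEvent::TtlExpired => Action::Stop,
    }
}

fn main() {
    assert_eq!(dispatch(SteadyStateEvent::TtlExpired), Action::Stop);
    assert_eq!(
        dispatch(SteadyStateEvent::TaskFailed { operator: "join_3".into(), fatal: false }),
        Action::Recover
    );
}
```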

Usage

Steady state processing begins when the Scheduling state successfully starts all tasks and transitions to Running. It continues indefinitely until one of the exit conditions is met: an explicit stop request, a task failure, source completion, a rescaling request, or a fatal error. Most streaming pipelines spend the vast majority of their lifetime in this state.

Theoretical Basis

Steady state management in distributed stream processing follows a control loop pattern that combines event-driven and polling approaches:

Heartbeat-based failure detection: Workers periodically send heartbeats to the controller. The controller maintains a timeout window for each worker. If no heartbeat is received within the window, the worker is considered failed:

for each worker:
    if now - last_heartbeat > timeout:
        declare worker failed
        trigger recovery

This approach trades detection latency for reduced false positives compared to eager failure detection. The timeout must be longer than the maximum expected heartbeat delay under load.
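The pseudocode above can be made concrete with a std-only sketch. Worker IDs and the timeout value are illustrative; a real controller would also feed the failed set into recovery.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Runnable version of the heartbeat timeout check: any worker whose
// last heartbeat is older than the timeout window is declared failed.
fn find_failed_workers(
    last_heartbeat: &HashMap<String, Instant>,
    now: Instant,
    timeout: Duration,
) -> Vec<String> {
    last_heartbeat
        .iter()
        .filter(|(_, &seen)| now.duration_since(seen) > timeout)
        .map(|(id, _)| id.clone())
        .collect()
}

fn main() {
    let start = Instant::now();
    let now = start + Duration::from_secs(60);
    let heartbeats = HashMap::from([
        ("worker-a".to_string(), start + Duration::from_secs(55)), // recent: healthy
        ("worker-b".to_string(), start),                           // stale: failed
    ]);
    let failed = find_failed_workers(&heartbeats, now, Duration::from_secs(30));
    assert_eq!(failed, vec!["worker-b"]);
}
```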

Timer-based checkpoint triggers: Checkpoints are initiated at fixed intervals measured from the completion of the previous checkpoint:

loop:
    if checkpoint_in_progress:
        check if all operators have completed snapshot
        if done: finalize checkpoint, update metadata
    else if time_since_last_checkpoint > checkpoint_interval:
        initiate new checkpoint (send barriers to sources)

Measuring the interval from the previous checkpoint's completion rather than its initiation guarantees a fixed gap between checkpoints even when an individual checkpoint runs long, preventing back-to-back checkpoints. The checkpoint interval balances recovery time (shorter intervals mean less data to replay) against checkpointing overhead (each checkpoint adds I/O and coordination cost).
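The trigger logic above can be expressed as a small state machine. This is a sketch under assumed names; `operators_done` stands in for real barrier-completion tracking, and actual barrier injection and metadata writes are elided.

```rust
use std::time::{Duration, Instant};

// State-machine sketch of the checkpoint trigger loop. Names are
// illustrative, not Arroyo's actual controller types.
struct CheckpointTimer {
    interval: Duration,
    last_finished: Instant,
    in_progress: bool,
}

#[derive(Debug)]
enum CheckpointStep { StartNew, Finalize, Wait }

impl CheckpointTimer {
    fn tick(&mut self, now: Instant, operators_done: bool) -> CheckpointStep {
        if self.in_progress {
            if operators_done {
                // All operators snapshotted: record metadata and restart
                // the interval from *completion* time.
                self.in_progress = false;
                self.last_finished = now;
                CheckpointStep::Finalize
            } else {
                CheckpointStep::Wait
            }
        } else if now.duration_since(self.last_finished) > self.interval {
            // Interval elapsed: send barriers into the sources.
            self.in_progress = true;
            CheckpointStep::StartNew
        } else {
            CheckpointStep::Wait
        }
    }
}

fn main() {
    let t0 = Instant::now();
    let mut timer = CheckpointTimer {
        interval: Duration::from_secs(10),
        last_finished: t0,
        in_progress: false,
    };
    assert!(matches!(timer.tick(t0 + Duration::from_secs(5), false), CheckpointStep::Wait));
    assert!(matches!(timer.tick(t0 + Duration::from_secs(11), false), CheckpointStep::StartNew));
    assert!(matches!(timer.tick(t0 + Duration::from_secs(12), false), CheckpointStep::Wait));
    assert!(matches!(timer.tick(t0 + Duration::from_secs(14), true), CheckpointStep::Finalize));
}
```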

Progress tracking: The controller's progress function implements a priority-ordered check sequence:

  1. Worker liveness (highest priority -- failed workers make other checks meaningless)
  2. Task failures (individual task errors that may not crash the worker)
  3. Source completion (all sources have finished producing data)
  4. Checkpoint management (ongoing checkpoint finalization or new checkpoint initiation)
  5. State cleanup (background compaction of old checkpoint data)
  6. Metrics collection (lowest priority -- informational only)

This ordering ensures that failures are detected before attempting operations that would be wasted on a failing pipeline.
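The priority ordering reduces to a short-circuiting chain of checks, where a higher-priority condition masks everything below it. The flag-based signature below is a simplification for illustration; a real progress function would compute each condition from live state.

```rust
// Priority-ordered progress check: the first condition that demands
// action wins. Flags are stand-ins for real liveness/checkpoint state.
#[derive(Debug, PartialEq)]
enum Progress { WorkerFailed, TaskFailed, SourcesDone, Checkpoint, Cleanup, Metrics }

fn progress(
    worker_failed: bool,
    task_failed: bool,
    sources_done: bool,
    checkpoint_due: bool,
    cleanup_due: bool,
) -> Progress {
    if worker_failed {
        Progress::WorkerFailed // highest priority: short-circuits the rest
    } else if task_failed {
        Progress::TaskFailed
    } else if sources_done {
        Progress::SourcesDone
    } else if checkpoint_due {
        Progress::Checkpoint
    } else if cleanup_due {
        Progress::Cleanup
    } else {
        Progress::Metrics // lowest priority: informational only
    }
}

fn main() {
    // A task failure masks a due checkpoint...
    assert_eq!(progress(false, true, false, true, false), Progress::TaskFailed);
    // ...and with nothing else pending, only metrics are collected.
    assert_eq!(progress(false, false, false, false, false), Progress::Metrics);
}
```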

Event-driven state transitions: The control loop uses tokio::select! to multiplex across multiple event sources: the job message channel (for worker messages and configuration updates), a 200-millisecond polling timer (for progress checks), a logging interval timer, and an optional TTL timer. This pattern allows responsive handling of urgent events (task failures, stop requests) while maintaining regular progress monitoring.
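The same multiplexing pattern can be emulated with std channels alone: block on the job message channel, but wake on a timeout that plays the role of the polling timer. This is an illustration of the pattern only; Arroyo itself uses tokio::select! across async sources, and the message variants here are hypothetical.

```rust
use std::sync::mpsc;
use std::time::Duration;

// Std-only emulation of the select loop: recv_timeout doubles as the
// 200 ms polling tick. Message variants are illustrative.
enum JobMessage { WorkerMsg(String), Stop }

fn run_loop(rx: mpsc::Receiver<JobMessage>, poll: Duration) -> (u32, u32) {
    let (mut messages, mut polls) = (0, 0);
    loop {
        match rx.recv_timeout(poll) {
            Ok(JobMessage::Stop) => return (messages, polls),
            Ok(JobMessage::WorkerMsg(_)) => messages += 1, // urgent event path
            Err(mpsc::RecvTimeoutError::Timeout) => polls += 1, // progress-check tick
            Err(mpsc::RecvTimeoutError::Disconnected) => return (messages, polls),
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(JobMessage::WorkerMsg("heartbeat".into())).unwrap();
    tx.send(JobMessage::WorkerMsg("checkpoint ack".into())).unwrap();
    tx.send(JobMessage::Stop).unwrap();
    // All messages are already queued, so no polling timeouts fire.
    let (messages, polls) = run_loop(rx, Duration::from_millis(200));
    assert_eq!(messages, 2);
    assert_eq!(polls, 0);
}
```

The design trade-off is the same in both forms: urgent events (failures, stop requests) are handled the moment they arrive, while the timeout guarantees progress checks run even when the channel is quiet.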
