Implementation:ArroyoSystems Arroyo Recovering State

From Leeroopedia


Recovering State Implementation

Implementation of Recovering::next.

The Recovering struct represents the recovery state in the controller's job state machine. Its next() method orchestrates the transition from a failed state back to a running state by preparing checkpoint data and triggering recompilation and rescheduling.

Code Reference

  • File: crates/arroyo-controller/src/states/recovering.rs (L99-L135)

Core Interface

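#[derive(Debug)]
pub struct Recovering {
    pub source: anyhow::Error, // original error that triggered recovery
    pub reason: String,        // human-readable description of the failure
    pub domain: ErrorDomain,   // classification of the error
}

impl State for Recovering {
    // Signature only; the method body is omitted on this page.
    async fn next(self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError>
}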
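
To illustrate why next() takes self: Box<Self>, here is a minimal sketch of a state machine in which each state is consumed to produce its successor. It is a simplified, synchronous stand-in and not Arroyo's code: the JobContext fields, the String error type, and the trimmed-down Recovering and Compiling structs are all assumptions made for illustration.

```rust
use std::fmt::Debug;

/// Hypothetical stand-in for the controller's job context.
#[derive(Debug, Default)]
pub struct JobContext {
    pub log: Vec<String>,
}

/// Simplified, synchronous stand-in for the State trait (assumption:
/// the real trait is async and returns a Transition / StateError).
pub trait State: Debug {
    fn name(&self) -> &'static str;
    /// Consumes the current state and returns the state that follows it.
    fn next(self: Box<Self>, ctx: &mut JobContext) -> Result<Box<dyn State>, String>;
}

/// Trimmed-down recovery state carrying only a reason string.
#[derive(Debug)]
pub struct Recovering {
    pub reason: String,
}

/// Placeholder for the state that follows recovery.
#[derive(Debug)]
pub struct Compiling;

impl State for Recovering {
    fn name(&self) -> &'static str {
        "Recovering"
    }

    fn next(self: Box<Self>, ctx: &mut JobContext) -> Result<Box<dyn State>, String> {
        // Record why recovery started, then hand off to Compiling.
        ctx.log.push(format!("recovering: {}", self.reason));
        Ok(Box::new(Compiling))
    }
}

impl State for Compiling {
    fn name(&self) -> &'static str {
        "Compiling"
    }

    fn next(self: Box<Self>, _ctx: &mut JobContext) -> Result<Box<dyn State>, String> {
        // Compilation is outside the scope of this sketch.
        Err("compilation is not modeled in this sketch".to_string())
    }
}
```

Because next() takes ownership of the boxed state, a state cannot be reused after it has transitioned, which keeps the driver loop from acting on stale state.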

Detailed Behavior

The Recovering Struct

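| Field  | Type          | Description                                                                     |
|--------|---------------|---------------------------------------------------------------------------------|
| source | anyhow::Error | The original error that triggered recovery                                      |
| reason | String        | Human-readable description of why recovery was triggered                        |
| domain | ErrorDomain   | Classification of the error (e.g., worker crash, task failure, network error)   |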

The next Method

This method implements the State trait, defining the recovery logic within the controller's state machine.

Execution steps:

  1. Log recovery initiation: Records the failure reason and error domain for observability and debugging.
  2. Identify last checkpoint: Queries the job's checkpoint history to find the most recent successfully completed checkpoint epoch. If no valid checkpoint exists, the job may need to restart from scratch.
  3. Prepare checkpoint load: Calls StateBackend::prepare_checkpoint_load() to:
    • Retrieve per-operator OperatorCheckpointMetadata from object storage.
    • Validate metadata integrity and completeness.
    • Build a loading plan mapping each operator subtask to its Parquet state files.
  4. Transition to Compiling: Returns a Transition to the Compiling {} state, which will recompile the dataflow graph if necessary (e.g., if the job definition changed during recovery).
  5. Scheduling: After compilation, the state machine transitions to Scheduling, where:
    • Workers are allocated and deployed.
    • Each worker receives its checkpoint loading plan.
    • Workers restore state from Parquet files in object storage.
    • Processing resumes from the checkpointed positions.
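
Steps 2 and 3 above can be sketched with in-memory data. This is an illustrative model only: CheckpointRecord, OperatorMeta, LoadPlan, and both functions are hypothetical names, and the sketch does not reproduce the signature or behavior of StateBackend::prepare_checkpoint_load().

```rust
use std::collections::BTreeMap;

/// Hypothetical record of one checkpoint attempt (not Arroyo's real type).
#[derive(Debug, Clone)]
pub struct CheckpointRecord {
    pub epoch: u32,
    pub completed: bool,
}

/// Hypothetical per-operator metadata: for each subtask index,
/// the Parquet files that hold its state.
#[derive(Debug, Clone)]
pub struct OperatorMeta {
    pub operator_id: String,
    pub parallelism: usize,
    pub files_by_subtask: BTreeMap<usize, Vec<String>>,
}

/// Loading plan: (operator id, subtask index) -> state files to read.
pub type LoadPlan = BTreeMap<(String, usize), Vec<String>>;

/// Step 2: the newest epoch whose checkpoint completed, if any.
pub fn last_completed_epoch(history: &[CheckpointRecord]) -> Option<u32> {
    history
        .iter()
        .filter(|c| c.completed)
        .map(|c| c.epoch)
        .max()
}

/// Step 3: check that every subtask has an entry, then build the plan.
pub fn build_load_plan(metadata: &[OperatorMeta]) -> Result<LoadPlan, String> {
    let mut plan = LoadPlan::new();
    for op in metadata {
        for subtask in 0..op.parallelism {
            let files = op.files_by_subtask.get(&subtask).ok_or_else(|| {
                format!("operator {} is missing subtask {}", op.operator_id, subtask)
            })?;
            plan.insert((op.operator_id.clone(), subtask), files.clone());
        }
    }
    Ok(plan)
}
```

Returning Option from the epoch lookup makes the "no valid checkpoint" case explicit, so the caller has to decide whether to start from scratch or give up.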

State Machine Transitions

| From State                 | Transition           | To State           | Description                                |
|----------------------------|----------------------|--------------------|--------------------------------------------|
| Running (failure detected) | Error event          | Recovering         | Failure triggers recovery                  |
| Recovering                 | next() succeeds      | Compiling          | Checkpoint prepared, recompile graph       |
| Compiling                  | Compilation complete | Scheduling         | Deploy workers with checkpoint data        |
| Scheduling                 | Workers ready        | Running            | Processing resumes from checkpoint         |
| Recovering                 | next() fails         | Recovering (retry) | Recovery failure triggers another attempt  |
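
The transitions listed above can be written as a small pure function. The JobState and Event enums and the transition function are hypothetical names chosen for this sketch; the controller itself models states as separate structs rather than a single enum.

```rust
/// The four states that appear in the transition table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobState {
    Running,
    Recovering,
    Compiling,
    Scheduling,
}

/// Hypothetical event names for the "Transition" column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Event {
    Failure,
    RecoveryPrepared,
    RecoveryFailed,
    CompilationComplete,
    WorkersReady,
}

/// Returns the next state, or None if the table has no matching row.
pub fn transition(state: JobState, event: Event) -> Option<JobState> {
    match (state, event) {
        (JobState::Running, Event::Failure) => Some(JobState::Recovering),
        (JobState::Recovering, Event::RecoveryPrepared) => Some(JobState::Compiling),
        // A failed recovery attempt loops back for a retry.
        (JobState::Recovering, Event::RecoveryFailed) => Some(JobState::Recovering),
        (JobState::Compiling, Event::CompilationComplete) => Some(JobState::Scheduling),
        (JobState::Scheduling, Event::WorkersReady) => Some(JobState::Running),
        _ => None,
    }
}
```

Feeding the events Failure, RecoveryPrepared, CompilationComplete, and WorkersReady in order takes a job from Running back to Running.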

Error Handling

If next() encounters an error during recovery (e.g., corrupt checkpoint metadata or unavailable storage), it returns a StateError, which may trigger one of the following:

  • A retry of the recovery process with the same or an earlier checkpoint.
  • A transition to a terminal failure state if recovery is impossible (e.g., no valid checkpoints exist).
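
One way to picture these two outcomes is a fallback loop that tries checkpoints from newest to oldest and reports failure only when none can be loaded. This is a sketch under assumptions, not the controller's actual retry policy: RecoveryOutcome, recover_with_fallback, and the try_load callback are hypothetical.

```rust
/// Hypothetical result of a recovery attempt.
#[derive(Debug, PartialEq, Eq)]
pub enum RecoveryOutcome {
    /// State was restored from the given checkpoint epoch.
    Restored { epoch: u32 },
    /// No checkpoint could be loaded; corresponds to terminal failure.
    Failed { attempts: usize },
}

/// Tries each completed epoch, newest first, until one loads.
/// `try_load` stands in for whatever prepares a checkpoint for loading.
pub fn recover_with_fallback<F>(completed_epochs: &[u32], mut try_load: F) -> RecoveryOutcome
where
    F: FnMut(u32) -> Result<(), String>,
{
    let mut epochs = completed_epochs.to_vec();
    // Sort descending so the most recent checkpoint is tried first.
    epochs.sort_unstable_by(|a, b| b.cmp(a));

    let mut attempts = 0;
    for epoch in epochs {
        attempts += 1;
        if try_load(epoch).is_ok() {
            return RecoveryOutcome::Restored { epoch };
        }
    }
    RecoveryOutcome::Failed { attempts }
}
```

With an empty epoch list the function returns Failed with zero attempts, which matches the case where no valid checkpoints exist.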

I/O

  • Input: JobContext with failed job state and error details (the source, reason, and domain fields)
  • Output: A Transition to Compiling {}, returned after calling StateBackend::prepare_checkpoint_load(). Scheduling follows, where workers restore state from the checkpoint.

Implements

  • Principle:ArroyoSystems_Arroyo_State_Restoration
  • Environment:ArroyoSystems_Arroyo_Object_Storage
  • Heuristic:ArroyoSystems_Arroyo_Worker_Heartbeat_Timeout