Implementation:ArroyoSystems Arroyo Recovering State
Recovering State Implementation
Implementation of Recovering::next.
The Recovering struct represents the recovery state in the controller's job state machine. Its next() method orchestrates the transition from a failed state back to a running state by preparing checkpoint data and triggering recompilation and rescheduling.
Code Reference
- File: crates/arroyo-controller/src/states/recovering.rs (L99-L135)
Core Interface
#[derive(Debug)]
pub struct Recovering {
    pub source: anyhow::Error,
    pub reason: String,
    pub domain: ErrorDomain,
}
impl State for Recovering {
    async fn next(self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError>
}
Detailed Behavior
The Recovering Struct
| Field | Type | Description |
|---|---|---|
| source | anyhow::Error | The original error that triggered recovery |
| reason | String | Human-readable description of why recovery was triggered |
| domain | ErrorDomain | Classification of the error (e.g., worker crash, task failure, network error) |
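As a rough illustration of how these three fields fit together, here is a minimal sketch that compiles with the standard library alone. It is not the Arroyo source: the ErrorDomain variants are guessed from the examples in the table, Box<dyn Error> stands in for anyhow::Error, and the summary() helper is hypothetical.

```rust
use std::error::Error;

/// Hypothetical stand-in for Arroyo's `ErrorDomain`; the real variants may differ.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorDomain {
    WorkerCrash,
    TaskFailure,
    Network,
}

/// Simplified mirror of `Recovering`. `Box<dyn Error>` replaces `anyhow::Error`
/// (an assumption made so the sketch needs no external crates).
#[derive(Debug)]
pub struct Recovering {
    pub source: Box<dyn Error + Send + Sync>,
    pub reason: String,
    pub domain: ErrorDomain,
}

impl Recovering {
    /// Hypothetical helper: a one-line summary suitable for a log record.
    pub fn summary(&self) -> String {
        format!(
            "recovering ({:?}): {} [cause: {}]",
            self.domain, self.reason, self.source
        )
    }
}
```

Keeping the machine-readable domain separate from the free-text reason lets callers branch on the classification while still logging a readable message.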
The next Method
This method implements the State trait, defining the recovery logic within the controller's state machine.
Execution steps:
- Log recovery initiation: Records the failure reason and error domain for observability and debugging.
- Identify last checkpoint: Queries the job's checkpoint history to find the most recent successfully completed checkpoint epoch. If no valid checkpoint exists, the job may need to restart from scratch.
- Prepare checkpoint load: Calls StateBackend::prepare_checkpoint_load() to:
  - Retrieve per-operator OperatorCheckpointMetadata from object storage.
  - Validate metadata integrity and completeness.
  - Build a loading plan mapping each operator subtask to its Parquet state files.
- Transition to Compiling: Returns a Transition to the Compiling {} state, which will recompile the dataflow graph if necessary (e.g., if the job definition changed during recovery).
- Scheduling: After compilation, the state machine transitions to Scheduling, where:
  - Workers are allocated and deployed.
  - Each worker receives its checkpoint loading plan.
  - Workers restore state from Parquet files in object storage.
  - Processing resumes from the checkpointed positions.
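The checkpoint selection and loading-plan steps above can be sketched roughly as follows. This is an illustrative model, not the StateBackend API: CheckpointRecord, latest_completed, and build_loading_plan are hypothetical names, and the sketch assumes one Parquet file per operator subtask, which the real metadata layout may not follow.

```rust
use std::collections::BTreeMap;

/// Hypothetical checkpoint record; field names are illustrative only.
#[derive(Debug, Clone)]
pub struct CheckpointRecord {
    pub epoch: u32,
    pub completed: bool,
    /// Operator id -> Parquet files, assumed to hold one entry per subtask.
    pub operator_files: BTreeMap<String, Vec<String>>,
}

/// Pick the most recent successfully completed checkpoint, if any.
pub fn latest_completed(history: &[CheckpointRecord]) -> Option<&CheckpointRecord> {
    history.iter().filter(|c| c.completed).max_by_key(|c| c.epoch)
}

/// Map (operator id, subtask index) -> state file.
/// Fails validation when an operator's file count differs from `parallelism`.
pub fn build_loading_plan(
    checkpoint: &CheckpointRecord,
    parallelism: usize,
) -> Result<BTreeMap<(String, usize), String>, String> {
    let mut plan = BTreeMap::new();
    for (op, files) in &checkpoint.operator_files {
        if files.len() != parallelism {
            return Err(format!(
                "operator {}: expected {} files, found {}",
                op,
                parallelism,
                files.len()
            ));
        }
        for (subtask, file) in files.iter().enumerate() {
            plan.insert((op.clone(), subtask), file.clone());
        }
    }
    Ok(plan)
}
```

Returning None from latest_completed corresponds to the case where no valid checkpoint exists and the job may need to restart from scratch.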
State Machine Transitions
| From State | Transition | To State | Description |
|---|---|---|---|
| Running (failure detected) | Error event | Recovering | Failure triggers recovery |
| Recovering | next() succeeds | Compiling | Checkpoint prepared, recompile graph |
| Compiling | Compilation complete | Scheduling | Deploy workers with checkpoint data |
| Scheduling | Workers ready | Running | Processing resumes from checkpoint |
| Recovering | next() fails | Recovering (retry) | Recovery failure triggers another attempt |
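The transition table can be read as a small pure function from (state, event) to the next state. The sketch below encodes only the rows listed here; JobState, Event, and transition are hypothetical names for illustration, and the actual controller models states as types implementing the State trait rather than as a single enum.

```rust
/// Hypothetical enum covering only the states named in the table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobState {
    Running,
    Recovering,
    Compiling,
    Scheduling,
}

/// Hypothetical events, one per "Transition" column entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Event {
    FailureDetected,
    RecoveryPrepared,
    RecoveryFailed,
    CompilationComplete,
    WorkersReady,
}

/// Returns the next state, or None for a pair the table does not list.
pub fn transition(state: JobState, event: Event) -> Option<JobState> {
    match (state, event) {
        (JobState::Running, Event::FailureDetected) => Some(JobState::Recovering),
        (JobState::Recovering, Event::RecoveryPrepared) => Some(JobState::Compiling),
        (JobState::Recovering, Event::RecoveryFailed) => Some(JobState::Recovering),
        (JobState::Compiling, Event::CompilationComplete) => Some(JobState::Scheduling),
        (JobState::Scheduling, Event::WorkersReady) => Some(JobState::Running),
        _ => None,
    }
}
```

Folding the happy-path events over a Running job that has just failed walks Recovering, Compiling, Scheduling, and back to Running.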
Error Handling
If next() encounters an error during recovery (e.g., corrupt checkpoint metadata, unavailable storage), it returns a StateError, which may trigger:
- A retry of the recovery process with the same or an earlier checkpoint.
- A transition to a terminal failure state if recovery is impossible (e.g., no valid checkpoints exist).
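One possible shape for this policy is sketched below, assuming a retry strategy that walks candidate checkpoints from newest to oldest and gives up after a bounded number of attempts. The names RecoveryOutcome and recover_with_fallback, and the max_attempts parameter, are hypothetical; the source does not specify how the controller chooses between retrying and failing.

```rust
/// Hypothetical outcome of a recovery attempt sequence.
#[derive(Debug, PartialEq, Eq)]
pub enum RecoveryOutcome {
    /// A checkpoint was prepared successfully.
    Restored { epoch: u32, attempts: usize },
    /// No candidate checkpoint could be prepared (terminal failure).
    Failed { attempts: usize },
}

/// Tries candidate epochs from newest to oldest, calling `prepare` on each,
/// and stops after `max_attempts` tries or the first success.
pub fn recover_with_fallback<F>(
    mut epochs: Vec<u32>,
    max_attempts: usize,
    mut prepare: F,
) -> RecoveryOutcome
where
    F: FnMut(u32) -> Result<(), String>,
{
    // Newest checkpoint first, so the least work is replayed on success.
    epochs.sort_unstable_by(|a, b| b.cmp(a));
    let mut attempts = 0;
    for epoch in epochs.into_iter().take(max_attempts) {
        attempts += 1;
        if prepare(epoch).is_ok() {
            return RecoveryOutcome::Restored { epoch, attempts };
        }
    }
    RecoveryOutcome::Failed { attempts }
}
```

With an empty candidate list the function returns Failed immediately, which matches the case where no valid checkpoints exist.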
I/O
- Input: JobContext with failed job state and error details (the source, reason, and domain fields)
- Output: Calls StateBackend::prepare_checkpoint_load(), then transitions to Compiling {} followed by Scheduling, where workers restore state from checkpoint
Implements
Principle:ArroyoSystems_Arroyo_State_Restoration Environment:ArroyoSystems_Arroyo_Object_Storage Heuristic:ArroyoSystems_Arroyo_Worker_Heartbeat_Timeout