
Principle:ArroyoSystems Arroyo Job State Machine

From Leeroopedia


Metadata

Page Type: Principle
Knowledge Sources: Repo (ArroyoSystems/arroyo), Blog (Type-Safe State Machines in Rust)
Domains: Stream_Processing, Distributed_Systems, State_Management
Last Updated: 2026-02-08

Overview

The job state machine is a finite state machine (FSM) design for managing streaming job lifecycles in the Arroyo engine. Streaming jobs move through well-defined states (Created, Compiling, Scheduling, Running, CheckpointStopping, Stopping, Stopped, Finishing, Rescaling, Recovering), and the transitions between them are checked at compile time through Rust's type system. Each state encapsulates its own logic and determines the next valid transition, so only declared state progressions can occur.

Description

The job state machine is the central control mechanism for managing the lifecycle of streaming jobs in Arroyo. Each job is governed by a state machine that enforces deterministic behavior through well-defined states, valid transitions, and state-specific logic. This design rules out invalid state transitions at compile time and, because every lifecycle change goes through one mechanism, reduces the room for race conditions and inconsistent lifecycle management.

State Definitions

The job state machine defines the following states:

  • Created: The initial state after a pipeline is submitted. The job record exists but no resources have been allocated.
  • Compiling: The job's SQL program is being validated and prepared for execution. In practice, this is a pass-through state that verifies the job should proceed.
  • Scheduling: The controller is requesting resources from the scheduler and deploying operator instances to worker nodes.
  • Running: The job is actively processing data. Operators are consuming from sources, applying transformations, and writing to sinks. Checkpoints are being taken at the configured interval.
  • CheckpointStopping: A graceful stop has been requested. The system is completing an in-progress checkpoint before stopping, ensuring consistent state for later resumption.
  • Stopping: The job is in the process of shutting down. Operators are being drained and resources are being released.
  • Stopped: The job has been cleanly stopped. State has been checkpointed and the job can be resumed from its last checkpoint.
  • Finishing: The job is completing a finite source (e.g., processing a bounded dataset). All remaining data is being flushed through the pipeline.
  • Rescaling: The job is being rescaled to a different parallelism level. This involves checkpointing, stopping, redistributing state, and restarting with the new configuration.
  • Recovering: The job is recovering from a failure. The recovery process restores state from the last successful checkpoint and resumes processing.

State Transition Diagram

The valid state transitions form a directed graph:

Created --> Compiling --> Scheduling --> Running --> CheckpointStopping --> Stopping --> Stopped
                                          |                                  ^
                                          +--> Finishing                     |
                                          |                                  |
                                          +--> Rescaling --> Compiling       |
                                          |                                  |
                                          +--> Recovering --> Compiling -----+

Key transition paths:

  • Normal lifecycle: Created -> Compiling -> Scheduling -> Running -> CheckpointStopping -> Stopping -> Stopped
  • Failure recovery: Running -> Recovering -> Compiling -> Scheduling -> Running
  • Rescaling: Running -> Rescaling -> Compiling -> Scheduling -> Running
  • Finite source completion: Running -> Finishing -> Stopping -> Stopped
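The key transition paths above can be written down as a plain lookup table and checked mechanically. The following is a minimal sketch, not Arroyo's code: the JobState enum, successors, and is_valid_path are hypothetical names, and the table contains only the edges listed in the key transition paths.

```rust
/// The ten lifecycle states named in this document (hypothetical enum).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobState {
    Created,
    Compiling,
    Scheduling,
    Running,
    CheckpointStopping,
    Stopping,
    Stopped,
    Finishing,
    Rescaling,
    Recovering,
}

/// Successor states, taken only from the key transition paths listed above.
pub fn successors(state: JobState) -> &'static [JobState] {
    use JobState::*;
    match state {
        Created => &[Compiling],
        Compiling => &[Scheduling],
        Scheduling => &[Running],
        Running => &[CheckpointStopping, Finishing, Rescaling, Recovering],
        CheckpointStopping => &[Stopping],
        Finishing => &[Stopping],
        Stopping => &[Stopped],
        Rescaling => &[Compiling],
        Recovering => &[Compiling],
        Stopped => &[],
    }
}

/// Returns true when every consecutive pair in `path` is a listed edge.
pub fn is_valid_path(path: &[JobState]) -> bool {
    path.windows(2).all(|pair| successors(pair[0]).contains(&pair[1]))
}
```

A runtime table like this is useful for documentation and testing, but it only detects an illegal path when the check runs. The type-level encoding described in the next section rejects the same mistakes before the program is built.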

Type-Safe Transitions

Arroyo enforces valid state transitions at compile time using Rust's type system. The key mechanism is a sealed TransitionTo trait:

  • Each state is a distinct Rust struct implementing the State trait.
  • The TransitionTo<T> trait is implemented only for valid target states. For example, impl TransitionTo<Scheduling> for Compiling allows Compiling to transition to Scheduling. Because there is no impl TransitionTo<Stopped> for Compiling, any attempt to construct that transition fails to compile.
  • The Transition::next(from, to) function requires from: TransitionTo<T>, ensuring that only declared transitions can be constructed.

This approach catches invalid transitions during compilation rather than at runtime, providing strong guarantees about the correctness of the state machine.
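The mechanism can be illustrated with a reduced example. This is a simplified sketch under stated assumptions: the trait and type names follow the description above, but the bodies, the name method, and the target helper are hypothetical, and only three states are modeled.

```rust
/// Simplified stand-in for the State trait (the `name` method is an assumption).
pub trait State {
    fn name(&self) -> &'static str;
}

/// Marker trait: `S: TransitionTo<T>` declares that S -> T is a legal edge.
pub trait TransitionTo<T: State>: State {}

pub struct Compiling;
pub struct Scheduling;
pub struct Stopped;

impl State for Compiling {
    fn name(&self) -> &'static str { "Compiling" }
}
impl State for Scheduling {
    fn name(&self) -> &'static str { "Scheduling" }
}
impl State for Stopped {
    fn name(&self) -> &'static str { "Stopped" }
}

// The only edge declared in this sketch: Compiling -> Scheduling.
impl TransitionTo<Scheduling> for Compiling {}

/// A transition that has already passed the compile-time check.
pub struct Transition {
    next: Box<dyn State>,
}

impl Transition {
    /// Builds a transition; the `S: TransitionTo<T>` bound rejects undeclared edges.
    pub fn next<S, T>(_from: S, to: T) -> Transition
    where
        S: TransitionTo<T>,
        T: State + 'static,
    {
        Transition { next: Box::new(to) }
    }

    /// Name of the state this transition leads to (hypothetical helper).
    pub fn target(&self) -> &'static str {
        self.next.name()
    }
}
```

With these definitions, Transition::next(Compiling, Scheduling) compiles, while Transition::next(Compiling, Stopped) is rejected by the compiler because Compiling does not implement TransitionTo<Stopped>.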

State-Encapsulated Logic

Each state implements the State trait with an async next method that contains the state's logic:

  • The next method receives a mutable reference to the JobContext, which provides access to the job's configuration, compiled program, controller client, and scheduler.
  • Each state performs its work (e.g., scheduling operators, monitoring execution, coordinating checkpoints) and returns a Transition to the next state.
  • If an error occurs, the state returns a StateError that the state machine driver uses to determine recovery behavior.
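A driver loop over such states might look like the sketch below. It makes several simplifying assumptions that are not from the source: next is synchronous rather than async, JobContext carries only a few hypothetical fields (worker_healthy, visited), Transition is a two-variant enum, and Running immediately proceeds to CheckpointStopping as if a stop had already been requested.

```rust
/// Hypothetical, reduced job context; the real one carries far more.
pub struct JobContext {
    pub job_id: String,
    pub worker_healthy: bool,
    pub visited: Vec<&'static str>,
}

#[derive(Debug)]
pub struct StateError {
    pub message: String,
}

/// Result of running one state: move on, or end the machine.
pub enum Transition {
    Advance(Box<dyn State>),
    Stop,
}

pub trait State {
    fn name(&self) -> &'static str;
    /// Consumes the state, does its work, and names the successor.
    fn next(self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError>;
}

pub struct Running;
pub struct CheckpointStopping;
pub struct Stopping;
pub struct Stopped;

impl State for Running {
    fn name(&self) -> &'static str { "Running" }
    fn next(self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError> {
        if !ctx.worker_healthy {
            let message = format!("job {}: worker heartbeat lost", ctx.job_id);
            return Err(StateError { message });
        }
        // Sketch only: assume a stop request has already arrived.
        Ok(Transition::Advance(Box::new(CheckpointStopping)))
    }
}

impl State for CheckpointStopping {
    fn name(&self) -> &'static str { "CheckpointStopping" }
    fn next(self: Box<Self>, _ctx: &mut JobContext) -> Result<Transition, StateError> {
        Ok(Transition::Advance(Box::new(Stopping)))
    }
}

impl State for Stopping {
    fn name(&self) -> &'static str { "Stopping" }
    fn next(self: Box<Self>, _ctx: &mut JobContext) -> Result<Transition, StateError> {
        Ok(Transition::Advance(Box::new(Stopped)))
    }
}

impl State for Stopped {
    fn name(&self) -> &'static str { "Stopped" }
    fn next(self: Box<Self>, _ctx: &mut JobContext) -> Result<Transition, StateError> {
        Ok(Transition::Stop)
    }
}

/// Runs states until one returns `Stop` or a `StateError` is returned.
pub fn drive(start: Box<dyn State>, ctx: &mut JobContext) -> Result<(), StateError> {
    let mut state = start;
    loop {
        ctx.visited.push(state.name());
        match state.next(ctx)? {
            Transition::Advance(next) => state = next,
            Transition::Stop => return Ok(()),
        }
    }
}
```

Here the driver simply propagates a StateError to its caller. A fuller driver would inspect the error and choose a recovery path, as the last bullet above describes.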

Usage

The job state machine is used in the following contexts:

  • Job startup: When a new pipeline is created, a job enters the Created state and progresses through Compiling and Scheduling to Running.
  • Graceful shutdown: When a user stops a pipeline, the job transitions from Running through CheckpointStopping and Stopping to Stopped.
  • Failure recovery: When a worker node fails or an operator encounters an unrecoverable error, the job enters the Recovering state and restarts from the last checkpoint.
  • Rescaling: When a user changes the parallelism of a running pipeline, the job enters the Rescaling state, checkpoints, and restarts with redistributed state.
  • Controller restart: When the Arroyo controller process restarts, it reconstructs the state machines for all active jobs from their persisted state and resumes management.
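The controller-restart case depends on mapping a persisted state back to an in-memory state. The sketch below shows one way this could work, assuming states are persisted as their names in (job id, state name) rows and that Stopped jobs are not resumed. The storage format, the jobs_to_resume function, and the treatment of Stopped as inactive are all assumptions made for illustration.

```rust
use std::str::FromStr;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobState {
    Created,
    Compiling,
    Scheduling,
    Running,
    CheckpointStopping,
    Stopping,
    Stopped,
    Finishing,
    Rescaling,
    Recovering,
}

impl FromStr for JobState {
    type Err = String;

    /// Parses a persisted state name; unknown names are an error.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s {
            "Created" => JobState::Created,
            "Compiling" => JobState::Compiling,
            "Scheduling" => JobState::Scheduling,
            "Running" => JobState::Running,
            "CheckpointStopping" => JobState::CheckpointStopping,
            "Stopping" => JobState::Stopping,
            "Stopped" => JobState::Stopped,
            "Finishing" => JobState::Finishing,
            "Rescaling" => JobState::Rescaling,
            "Recovering" => JobState::Recovering,
            other => return Err(format!("unknown job state: {other}")),
        })
    }
}

/// Given persisted (job id, state name) rows, returns the jobs whose
/// state machines should be rebuilt after a restart.
pub fn jobs_to_resume(persisted: &[(&str, &str)]) -> Result<Vec<(String, JobState)>, String> {
    let mut active = Vec::new();
    for (job_id, state_name) in persisted {
        let state: JobState = state_name.parse()?;
        // Assumption: a cleanly stopped job needs no active management.
        if state != JobState::Stopped {
            active.push((job_id.to_string(), state));
        }
    }
    Ok(active)
}
```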

Theoretical Basis

Finite State Machines (FSMs)

A finite state machine is a computational model consisting of a finite set of states, a set of inputs (events), a transition function mapping (state, input) pairs to new states, an initial state, and a set of accepting (final) states. Deterministic FSMs, the kind used for job lifecycles here, provide several guarantees critical for lifecycle management:

  • Determinism: For any given state and input, the next state is uniquely determined. This eliminates ambiguity in how the system responds to events.
  • Completeness: The transition function is defined for all valid (state, input) combinations, ensuring no unhandled cases.
  • Verifiability: The finite nature of the state space enables exhaustive analysis of all possible state sequences.
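In standard notation, the definition above can be written as a five-tuple. The instantiation for the job lifecycle below is illustrative: the input names (stop, worker_failed) and the choice of Stopped as the only accepting state are assumptions, not something the source specifies.

```latex
\begin{align*}
M &= (Q, \Sigma, \delta, q_0, F), \qquad \delta : Q \times \Sigma \to Q \\
Q &= \{\text{Created}, \text{Compiling}, \text{Scheduling}, \text{Running},
       \text{CheckpointStopping}, \\
  &\qquad \text{Stopping}, \text{Stopped}, \text{Finishing},
       \text{Rescaling}, \text{Recovering}\} \\
q_0 &= \text{Created}, \qquad F = \{\text{Stopped}\} \\
\delta(\text{Running}, \texttt{stop}) &= \text{CheckpointStopping} \\
\delta(\text{Running}, \texttt{worker\_failed}) &= \text{Recovering}
\end{align*}
```

Because $\delta$ is a function, each (state, input) pair has at most one successor, which is the determinism property. Restricting its domain to the valid pairs makes it a partial function on $Q \times \Sigma$, which is the sense of completeness used in the list above.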

Type-State Pattern in Rust

The type-state pattern encodes state machine states as distinct types in the language's type system. Transitions between states are enforced by the type checker rather than runtime assertions. Benefits include:

  • Compile-time safety: Invalid transitions are caught during compilation, not at runtime. This eliminates an entire class of bugs where a system attempts an illegal state transition.
  • Self-documenting: The type signatures of transition functions explicitly declare which transitions are valid.
  • Zero runtime cost: The transition constraints are checked entirely by the compiler and then erased, so they add no runtime checks or extra data to the compiled program.

The sealed trait pattern (where the TransitionTo trait can only be implemented within the defining module) prevents external code from declaring new transitions, maintaining the integrity of the state machine specification.
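One common way to seal a trait in Rust is to give it a supertrait that lives in a private module, so code outside the defining module cannot name the supertrait and therefore cannot add implementations. The sketch below applies that idea to transition edges. It shows the general pattern only: the module layout, the Edge trait, and the transition function are hypothetical, and the source does not show how Arroyo implements the sealing.

```rust
pub mod lifecycle {
    // Private module: `Edge` is public in type terms but cannot be named
    // from outside `lifecycle`, so outside code cannot implement it.
    mod sealed {
        pub trait Edge<T> {}
    }

    pub trait State {
        fn name(&self) -> &'static str;
    }

    /// Public marker for a legal edge. The `sealed::Edge<T>` supertrait means
    /// new edges can only be declared inside this module.
    pub trait TransitionTo<T: State>: State + sealed::Edge<T> {}

    pub struct Running;
    pub struct Recovering;
    pub struct Stopped;

    impl State for Running {
        fn name(&self) -> &'static str { "Running" }
    }
    impl State for Recovering {
        fn name(&self) -> &'static str { "Recovering" }
    }
    impl State for Stopped {
        fn name(&self) -> &'static str { "Stopped" }
    }

    // Declaring an edge takes two impls, both only possible in this module.
    impl sealed::Edge<Recovering> for Running {}
    impl TransitionTo<Recovering> for Running {}

    /// Consumes the source state and yields the target, if the edge is declared.
    pub fn transition<S, T>(_from: S, to: T) -> T
    where
        S: TransitionTo<T>,
        T: State,
    {
        to
    }
}
```

Code outside lifecycle that writes impl lifecycle::TransitionTo<lifecycle::Stopped> for lifecycle::Running {} does not compile, because Running does not implement the required Edge<Stopped> supertrait and the sealed module is private.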

Distributed Systems Considerations

In a distributed streaming system, the job state machine must handle additional complexities:

  • Partial failures: Individual operators or workers may fail while others continue. The state machine must coordinate recovery across all affected components.
  • Checkpoint coordination: State transitions that involve checkpointing (e.g., CheckpointStopping, Rescaling) require distributed coordination using barrier-based protocols similar to Chandy-Lamport snapshots.
  • Concurrent state transitions: The state machine must prevent concurrent transitions -- for example, a user-initiated stop and an automatic recovery cannot occur simultaneously. This is enforced by the single-threaded state machine driver in the controller.
  • Idempotent transitions: Recovery from controller failures requires that state transitions be idempotent -- re-applying a transition from the same starting state produces the same result.
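The last two points can be sketched with a compare-and-set on the persisted state. In the example below a transition applies only if the record is still in the expected starting state, replaying it after it has taken effect is a no-op, and a competing transition from a stale state is rejected. JobRecord, Outcome, apply_transition, and the generation counter are hypothetical names for illustration, not Arroyo's persistence layer.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobState {
    Running,
    CheckpointStopping,
    Recovering,
}

#[derive(Debug, PartialEq, Eq)]
pub enum Outcome {
    /// The record moved from `from` to `to`.
    Applied,
    /// The record was already in `to`; nothing changed.
    AlreadyApplied,
    /// The record was in some other state; nothing changed.
    Rejected { current: JobState },
}

/// Hypothetical persisted job record.
pub struct JobRecord {
    pub state: JobState,
    /// Incremented once per applied transition.
    pub generation: u64,
}

/// Applies `from -> to` only if the record is still in `from`.
pub fn apply_transition(record: &mut JobRecord, from: JobState, to: JobState) -> Outcome {
    if record.state == to {
        return Outcome::AlreadyApplied;
    }
    if record.state != from {
        return Outcome::Rejected { current: record.state };
    }
    record.state = to;
    record.generation += 1;
    Outcome::Applied
}
```

If a user-initiated stop (Running to CheckpointStopping) is applied first, a later recovery attempt that still expects Running is rejected rather than overwriting the stop, and a replay of the stop after a controller restart leaves the record unchanged.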
