
Implementation:ArroyoSystems Arroyo Scheduling State

From Leeroopedia


Knowledge Sources
Domains Stream_Processing, Distributed_Systems, Scheduling
Last Updated 2026-02-08 12:00 GMT

Overview

Implements the Scheduling controller state, which manages the full lifecycle of starting workers, establishing gRPC connections, computing task assignments, restoring checkpoint state, and launching pipeline execution. This state bridges pipeline compilation and steady-state execution.

Description

The scheduling implementation consists of three primary components within scheduling.rs:

Scheduling::next (lines 256-723) is the main state machine entry point, implementing the State trait. It orchestrates the full scheduling workflow:

  1. Checks if the job should actually be running (via stop_if_desired_non_running! macro)
  2. Clears any existing workers from a previous run for this job
  3. Updates operator parallelism from configuration overrides
  4. Computes the required number of slots as the maximum parallelism across all graph nodes
  5. Calls start_workers to request worker processes from the scheduler backend
  6. Waits for workers to connect via JobMessage::WorkerConnect messages, establishing gRPC channels to each
  7. Loads the last successful checkpoint from the database (if one exists and passes epoch thresholds)
  8. Marks in-progress checkpoints as failed in the database
  9. Loads checkpoint metadata and prepares the state backend for restoration
  10. Handles committing state from checkpoints that were mid-commit at failure time
  11. Computes task assignments using compute_assignments
  12. Sends StartExecutionReq to all workers via gRPC, including the program, assignments, and restore epoch
  13. Waits for all tasks to report as started via JobMessage::TaskStarted messages
  14. Creates a JobController and transitions to the Running state
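The overall shape of this workflow is a short-circuiting pipeline: bail out early if the job should not run, then execute each phase in order, where any failure aborts before the Running transition. The sketch below is hypothetical and simplified; the `Transition` enum and the phase functions are stand-ins for Arroyo's real types, which carry much more state.

```rust
// Simplified stand-ins for the controller's transition outcomes.
#[derive(Debug, PartialEq)]
enum Transition {
    Running,
    Stopped,
}

// Stub phases; in the real code each talks to the scheduler, database,
// or workers and can fail with a retryable or fatal StateError.
fn start_workers() -> Result<(), String> { Ok(()) }        // steps 5-6
fn restore_checkpoints() -> Result<(), String> { Ok(()) }  // steps 7-10
fn start_execution() -> Result<(), String> { Ok(()) }      // steps 11-13

// The shape of Scheduling::next: stop early if the job should not be
// running, then run each phase, short-circuiting on the first failure.
fn next(desired_running: bool) -> Result<Transition, String> {
    if !desired_running {
        return Ok(Transition::Stopped); // stop_if_desired_non_running!
    }
    start_workers()?;
    restore_checkpoints()?;
    start_execution()?;
    Ok(Transition::Running) // step 14: hand off to the Running state
}
```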

Scheduling::start_workers (lines 192-247) is a method that requests worker processes from the scheduler backend. It constructs a StartPipelineReq with the program, job ID, run ID, slot count, and environment variables, then calls the scheduler. If the scheduler reports insufficient slots, it retries in a loop with one-second intervals until either enough slots are available or the worker_startup_time timeout expires. Transient scheduler errors are returned as retryable errors.
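The retry policy described above can be sketched as a timeout-bounded polling loop. This is a synchronous sketch with a hypothetical `check` closure standing in for the scheduler call; the real method is async and also distinguishes transient scheduler errors (retryable) from the timeout (fatal).

```rust
use std::time::{Duration, Instant};

// Poll until `check` reports enough free slots or the startup timeout
// elapses. `check` is a stand-in for the real scheduler request.
fn wait_for_slots(
    slots_needed: usize,
    startup_timeout: Duration,
    mut check: impl FnMut() -> usize,
) -> Result<(), String> {
    let deadline = Instant::now() + startup_timeout;
    loop {
        if check() >= slots_needed {
            return Ok(());
        }
        if Instant::now() >= deadline {
            // Mirrors the fatal "insufficient slots" failure path.
            return Err(format!("timed out waiting for {slots_needed} slots"));
        }
        // One-second interval between scheduler polls, as described above.
        std::thread::sleep(Duration::from_secs(1));
    }
}
```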

compute_assignments (lines 67-93) is a free function that maps subtasks to workers using a slot-filling strategy. It iterates through each node in the logical program graph and assigns its subtasks sequentially to workers, advancing to the next worker when the current worker's slots are full. Each assignment records the node ID, subtask index, worker ID, and worker data address.

handle_worker_connect (lines 95-189) processes WorkerConnect messages from workers. It validates the run ID matches the current run, records the worker's metadata (ID, machine ID, data address, slot count), and spawns an async task to establish a gRPC channel to the worker with retry logic (up to 3 attempts with exponential backoff).

Supporting types include:

  • WorkerStatus -- tracks worker state through Connected, Initializing, Ready, and Failed phases
  • WorkerState enum -- the four lifecycle states of a worker during scheduling
  • slots_for_job -- computes required slots as the maximum parallelism across all nodes
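slots_for_job reduces to a maximum over node parallelisms. A minimal sketch, assuming the node list is flattened to its parallelism values (the real function walks the LogicalProgram graph):

```rust
// Required slots: the widest node dictates demand, so the answer is
// the maximum parallelism across all nodes, not the sum.
fn slots_for_job(parallelisms: &[usize]) -> usize {
    parallelisms.iter().copied().max().unwrap_or(0)
}
```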

Usage

The Scheduling state is entered from several predecessor states (Compiling, Recovering, Rescaling, Restarting). It is transient: on success it transitions to Running; if the job is no longer meant to be running it transitions to Stopped; on transient failures it retries itself with backoff. Fatal errors, such as insufficient scheduler resources, cause a transition to a terminal error state.

Code Reference

Source Location

  • Repository: ArroyoSystems_Arroyo
  • File: crates/arroyo-controller/src/states/scheduling.rs (lines 67-723)

Signature

#[derive(Debug)]
pub struct Scheduling {}

#[async_trait::async_trait]
impl State for Scheduling {
    fn name(&self) -> &'static str {
        "Scheduling"
    }

    async fn next(mut self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError>
}

impl Scheduling {
    async fn start_workers<'a>(
        self: Box<Self>,
        ctx: &mut JobContext<'a>,
        slots_needed: usize,
    ) -> Result<Box<Self>, StateError>
}

fn compute_assignments(
    workers: Vec<&WorkerStatus>,
    program: &LogicalProgram,
) -> Vec<TaskAssignment>

async fn handle_worker_connect<'a>(
    msg: JobMessage,
    workers: &mut HashMap<WorkerId, WorkerStatus>,
    worker_connects: Arc<Mutex<HashMap<WorkerId, WorkerGrpcClient<Channel>>>>,
    handles: &mut Vec<JoinHandle<()>>,
    ctx: &mut JobContext<'a>,
) -> Result<(), StateError>

fn slots_for_job(job: &LogicalProgram) -> usize

Import

use arroyo_rpc::grpc::rpc::{
    StartExecutionReq, TaskAssignment, worker_grpc_client::WorkerGrpcClient,
};
use arroyo_types::{MachineId, WorkerId};
use arroyo_datastream::logical::LogicalProgram;
use crate::states::{JobContext, State, Transition, StateError};
use crate::schedulers::StartPipelineReq;
use crate::job_controller::JobController;

I/O Contract

Inputs

  • ctx (&mut JobContext, required) -- Job context providing access to the program, configuration, scheduler, database, message channel, and mutable status
  • ctx.program (LogicalProgram, required) -- The logical dataflow graph with operator nodes and their parallelism levels
  • ctx.scheduler (Arc<dyn Scheduler>, required) -- The scheduler backend used to start and stop worker processes
  • ctx.config (JobConfig, required) -- Job configuration including the job ID, pipeline name, parallelism overrides, TTL, and ignore_state_before_epoch threshold
  • ctx.rx (Receiver<JobMessage>, required) -- Channel receiving worker connect, task started, task failed, and config update messages
  • ctx.db (DatabaseSource, required) -- Database connection for querying checkpoint history and marking failed checkpoints

Outputs

  • (return) (Result<Transition, StateError>) -- On success: Transition::next(Scheduling, Running {}); on retryable failure: a StateError carrying the retry count; on fatal failure: a fatal StateError
  • ctx.job_controller (Option<JobController>) -- Set to Some(controller) holding the worker gRPC clients, checkpoint epoch, program, and metrics
  • ctx.status.tasks (Option<i32>) -- Set to the total task count of the program
  • ctx.metrics (RwLock<HashMap<...>>) -- Updated with a new JobMetrics instance for this job
  • Side effect (worker processes) -- Worker processes are started via the scheduler backend
  • Side effect (gRPC) -- StartExecutionReq is sent to every worker with the program, assignments, and restore epoch
