Implementation: ArroyoSystems Arroyo Scheduling State
| Knowledge Sources | |
|---|---|
| Domains | Stream_Processing, Distributed_Systems, Scheduling |
| Last Updated | 2026-02-08 12:00 GMT |
Overview
Implements the Scheduling controller state, which manages the full lifecycle of starting workers, establishing gRPC connections, computing task assignments, restoring checkpoint state, and launching pipeline execution. This state bridges between pipeline compilation and steady-state running.
Description
The scheduling implementation consists of three primary components within scheduling.rs:
Scheduling::next (lines 256-723) is the main state machine entry point, implementing the State trait. It orchestrates the full scheduling workflow (the wait-for-workers step is sketched in code after this list):
- Checks whether the job should actually be running (via the stop_if_desired_non_running! macro)
- Clears any existing workers from a previous run of this job
- Updates operator parallelism from configuration overrides
- Computes the required number of slots as the maximum parallelism across all graph nodes
- Calls start_workers to request worker processes from the scheduler backend
- Waits for workers to connect via JobMessage::WorkerConnect messages, establishing a gRPC channel to each
- Loads the last successful checkpoint from the database (if one exists and passes epoch thresholds)
- Marks in-progress checkpoints as failed in the database
- Loads checkpoint metadata and prepares the state backend for restoration
- Handles committing state from checkpoints that were mid-commit at failure time
- Computes task assignments using compute_assignments
- Sends a StartExecutionReq to every worker via gRPC, including the program, assignments, and restore epoch
- Waits for all tasks to report as started via JobMessage::TaskStarted messages
- Creates a JobController and transitions to the Running state
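The following is a minimal sketch of the wait-for-workers step referenced in the list above, assuming a tokio mpsc channel of job messages. JobMessage and WorkerStatus here are simplified stand-ins for the real controller types, not the actual API.

```rust
use std::collections::HashMap;
use std::time::Duration;

// Simplified stand-in for the controller's job message enum.
enum JobMessage {
    WorkerConnect { worker_id: u64, data_address: String, slots: usize },
    // other variants (task started/failed, config updates) omitted in this sketch
}

// Simplified stand-in for the per-worker bookkeeping.
struct WorkerStatus {
    data_address: String,
    slots: usize,
}

async fn wait_for_workers(
    rx: &mut tokio::sync::mpsc::Receiver<JobMessage>,
    slots_needed: usize,
    timeout: Duration,
) -> Result<HashMap<u64, WorkerStatus>, String> {
    let mut workers: HashMap<u64, WorkerStatus> = HashMap::new();
    let deadline = tokio::time::Instant::now() + timeout;

    // Keep collecting WorkerConnect messages until the registered slots cover
    // the job's requirement, or the startup deadline passes.
    while workers.values().map(|w| w.slots).sum::<usize>() < slots_needed {
        let msg = tokio::time::timeout_at(deadline, rx.recv())
            .await
            .map_err(|_| "timed out waiting for workers to connect".to_string())?
            .ok_or_else(|| "job message channel closed".to_string())?;

        match msg {
            JobMessage::WorkerConnect { worker_id, data_address, slots } => {
                workers.insert(worker_id, WorkerStatus { data_address, slots });
            }
        }
    }

    Ok(workers)
}
```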
Scheduling::start_workers (lines 192-247) is a method that requests worker processes from the scheduler backend. It constructs a StartPipelineReq with the program, job ID, run ID, slot count, and environment variables, then calls the scheduler. If the scheduler reports insufficient slots, it retries in a loop with one-second intervals until either enough slots are available or the worker_startup_time timeout expires. Transient scheduler errors are returned as retryable errors.
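The waiting behavior can be sketched as follows, with a synchronous closure standing in for the scheduler's (async) start call and a hypothetical SchedulerError distinguishing "not enough slots" from other failures; these names are illustrative, not the crate's actual types.

```rust
use std::time::{Duration, Instant};

// Hypothetical error shape: the scheduler either lacks slots or failed transiently.
enum SchedulerError {
    NotEnoughSlots,
    Other(String),
}

async fn request_workers_with_retry(
    try_start: impl Fn() -> Result<(), SchedulerError>,
    worker_startup_time: Duration,
) -> Result<(), String> {
    let deadline = Instant::now() + worker_startup_time;
    loop {
        match try_start() {
            Ok(()) => return Ok(()),
            Err(SchedulerError::NotEnoughSlots) if Instant::now() < deadline => {
                // Slots may free up as other jobs finish; poll again after a second.
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
            Err(SchedulerError::NotEnoughSlots) => {
                // Startup timeout expired without enough capacity.
                return Err("scheduler did not provide enough slots before the startup timeout".into());
            }
            Err(SchedulerError::Other(e)) => {
                // Transient scheduler errors are surfaced to the caller as retryable.
                return Err(format!("retryable scheduler error: {e}"));
            }
        }
    }
}
```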
compute_assignments (lines 67-93) is a free function that maps subtasks to workers using a slot-filling strategy. It iterates through each node in the logical program graph and assigns its subtasks sequentially to workers, advancing to the next worker when the current worker's slots are full. Each assignment records the node ID, subtask index, worker ID, and worker data address.
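Because the required slot count is the maximum (not the sum) of node parallelisms, the natural reading is that the fill position restarts for each node, with every worker contributing its slot count to each node. A simplified sketch of that strategy follows; Worker, Node, and Assignment are stand-ins for the real WorkerStatus, program node, and TaskAssignment types, and the per-node reset is an assumption consistent with the slot accounting described above.

```rust
struct Worker {
    id: u64,
    data_address: String,
    slots: usize,
}

struct Node {
    node_id: u32,
    parallelism: usize,
}

#[derive(Debug)]
struct Assignment {
    node_id: u32,
    subtask_idx: u32,
    worker_id: u64,
    worker_addr: String,
}

fn compute_assignments(workers: &[Worker], nodes: &[Node]) -> Vec<Assignment> {
    let mut assignments = Vec::new();

    for node in nodes {
        // Fill the first worker's slots, then move on to the next one.
        let mut worker_idx = 0;
        let mut used_slots = 0;

        for subtask_idx in 0..node.parallelism {
            if used_slots == workers[worker_idx].slots {
                worker_idx += 1;
                used_slots = 0;
            }
            let w = &workers[worker_idx];
            assignments.push(Assignment {
                node_id: node.node_id,
                subtask_idx: subtask_idx as u32,
                worker_id: w.id,
                worker_addr: w.data_address.clone(),
            });
            used_slots += 1;
        }
    }

    assignments
}
```

For example, with two workers of two slots each and a node of parallelism 3, the first worker receives subtasks 0 and 1 and the second receives subtask 2.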
handle_worker_connect (lines 95-189) processes WorkerConnect messages from workers. It validates the run ID matches the current run, records the worker's metadata (ID, machine ID, data address, slot count), and spawns an async task to establish a gRPC channel to the worker with retry logic (up to 3 attempts with exponential backoff).
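The connection step can be illustrated with a hedged sketch that dials the worker's endpoint directly with tonic. The 3-attempt limit comes from the description above; the specific backoff delays are assumed for illustration, and the real code wraps the resulting channel in the generated WorkerGrpcClient.

```rust
use std::time::Duration;
use tonic::transport::{Channel, Endpoint};

// Dial a worker's gRPC endpoint with up to 3 attempts and exponential backoff.
// The 100ms base delay is an assumption, not taken from the source.
async fn connect_to_worker(addr: String) -> Result<Channel, tonic::transport::Error> {
    let endpoint = Endpoint::from_shared(addr).expect("invalid worker address");
    let mut attempt: u32 = 0;

    loop {
        match endpoint.connect().await {
            Ok(channel) => return Ok(channel),
            Err(_) if attempt < 2 => {
                // Back off exponentially (100ms, then 200ms) before retrying.
                tokio::time::sleep(Duration::from_millis(100 * 2u64.pow(attempt))).await;
                attempt += 1;
            }
            Err(e) => return Err(e),
        }
    }
}
```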
Supporting types include:
- WorkerStatus -- tracks worker state through the Connected, Initializing, Ready, and Failed phases
- WorkerState enum -- the four lifecycle states of a worker during scheduling
- slots_for_job -- computes the required slots as the maximum parallelism across all nodes
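A rough sketch of these supporting pieces is shown below; only the four WorkerState variants and the max-parallelism rule are taken from the description above, while the field names are assumptions.

```rust
// Illustrative shapes; field names are assumptions, not copied from the source.
enum WorkerState {
    Connected,
    Initializing,
    Ready,
    Failed,
}

struct WorkerStatus {
    machine_id: String,
    data_address: String,
    slots: usize,
    state: WorkerState,
}

// Required slots for a job = the largest parallelism of any node in its graph.
fn slots_for_job(node_parallelisms: &[usize]) -> usize {
    node_parallelisms.iter().copied().max().unwrap_or(0)
}
```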
Usage
The Scheduling state is entered from several predecessor states (Compiling, Recovering, Rescaling, Restarting). It is transient: on success it transitions to Running, if the job is no longer meant to be running it transitions to Stopped, and on transient failures it retries itself with backoff. Fatal errors, such as the scheduler being unable to provide sufficient resources, move the job to a terminal error state.
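As a rough map of those exit paths, the enum below is purely illustrative; the real code expresses these outcomes as Transition and StateError values rather than a single type.

```rust
// Illustrative summary of how Scheduling can exit; not the controller's actual API.
enum SchedulingOutcome {
    /// All tasks reported started: transition to Running with a new JobController.
    Running,
    /// The job is no longer supposed to be running: transition to Stopped.
    Stopped,
    /// A transient failure (e.g. a worker never connected): retry Scheduling with backoff.
    Retry { attempt: usize },
    /// A fatal failure (e.g. the scheduler cannot provide enough resources).
    Fatal { message: String },
}
```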
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-controller/src/states/scheduling.rs (lines 67-723)
Signature
```rust
#[derive(Debug)]
pub struct Scheduling {}

#[async_trait::async_trait]
impl State for Scheduling {
    fn name(&self) -> &'static str {
        "Scheduling"
    }

    async fn next(mut self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError>
}

impl Scheduling {
    async fn start_workers<'a>(
        self: Box<Self>,
        ctx: &mut JobContext<'a>,
        slots_needed: usize,
    ) -> Result<Box<Self>, StateError>
}

fn compute_assignments(
    workers: Vec<&WorkerStatus>,
    program: &LogicalProgram,
) -> Vec<TaskAssignment>

async fn handle_worker_connect<'a>(
    msg: JobMessage,
    workers: &mut HashMap<WorkerId, WorkerStatus>,
    worker_connects: Arc<Mutex<HashMap<WorkerId, WorkerGrpcClient<Channel>>>>,
    handles: &mut Vec<JoinHandle<()>>,
    ctx: &mut JobContext<'a>,
) -> Result<(), StateError>

fn slots_for_job(job: &LogicalProgram) -> usize
```
Import
```rust
use arroyo_rpc::grpc::rpc::{
    StartExecutionReq, TaskAssignment, worker_grpc_client::WorkerGrpcClient,
};
use arroyo_types::{MachineId, WorkerId};
use arroyo_datastream::logical::LogicalProgram;
use crate::states::{JobContext, State, Transition, StateError};
use crate::schedulers::StartPipelineReq;
use crate::job_controller::JobController;
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| ctx | &mut JobContext | Yes | Job context providing access to the program, configuration, scheduler, database, message channel, and mutable status |
| ctx.program | LogicalProgram | Yes | The logical dataflow graph with operator nodes and their parallelism levels |
| ctx.scheduler | Arc<dyn Scheduler> | Yes | The scheduler backend used to start and stop worker processes |
| ctx.config | JobConfig | Yes | Job configuration including ID, pipeline name, parallelism overrides, TTL, and ignore_state_before_epoch threshold |
| ctx.rx | Receiver<JobMessage> | Yes | Channel receiving worker connect, task started, task failed, and config update messages |
| ctx.db | DatabaseSource | Yes | Database connection for querying checkpoint history and marking failed checkpoints |
Outputs
| Name | Type | Description |
|---|---|---|
| (return) | Result<Transition, StateError> | On success: Transition::next(Scheduling, Running {}). On retryable failure: StateError with retry count. On fatal failure: StateError (fatal). |
| ctx.job_controller | Option<JobController> | Set to Some(controller) with worker gRPC clients, checkpoint epoch, program, and metrics |
| ctx.status.tasks | Option<i32> | Set to the total task count of the program |
| ctx.metrics | RwLock<HashMap<...>> | Updated with a new JobMetrics instance for this job |
| Side effect | Worker processes | Worker processes are started via the scheduler backend |
| Side effect | gRPC calls | StartExecutionReq sent to all workers with program, assignments, and restore epoch |