Implementation:ArroyoSystems Arroyo Running State
| Knowledge Sources | |
|---|---|
| Domains | Stream_Processing, Distributed_Systems, Monitoring |
| Last Updated | 2026-02-08 12:00 GMT |
Overview
Implements the Running controller state and the JobController::progress method, which together form the steady-state control loop for a running streaming pipeline. The Running state multiplexes event handling across message channels, progress polling, and timers, while progress() performs the ordered sequence of health checks, checkpoint management, and metrics collection.
Description
The Running state implementation spans two files:
Running::next (running.rs, lines 30-168) implements the main control loop using tokio::select! to multiplex across four event sources (a simplified sketch of the loop follows this list):
- Job message channel (ctx.rx.recv()) -- handles three message types:
  - ConfigUpdate -- checks for stop requests (via the stop_if_desired_running! macro), restart nonce changes (transitions to Restarting), and parallelism override changes (transitions to Rescaling). Also forwards the updated config to the JobController.
  - RunningMessage -- delegated to job_controller.handle_message(), which processes heartbeats, task completions, checkpoint completions, and commit acknowledgments. Errors trigger a retryable state error.
  - Other messages -- handled by the generic ctx.handle() method.
- Progress polling timer (200ms interval) -- calls job_controller.progress() and dispatches on the result:
  - ControllerProgress::Continue -- no action needed; continue looping.
  - ControllerProgress::Finishing -- all sources have completed; transition to the Finishing state.
  - ControllerProgress::TaskFailed(event) -- logs the error event and delegates to ctx.handle_task_error() for recovery or error transition.
  - Err -- logs the error and transitions to Recovering with an Internal error domain.
  - Additionally, if the job has been running long enough (beyond healthy_duration) and has a non-zero restart count, the restart counter is reset in the database to indicate the job is now healthy.
- Logging timer (60-second interval) -- emits a periodic job_running log event with the job ID, scheduler type, and duration since the last state transition. Used for operational monitoring.
- TTL timer -- if the job has a time-to-live configured, this timer fires when the TTL expires and transitions to Stopping with StopMode::Immediate.
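The sketch below is a minimal, illustrative reconstruction of that loop, not the actual running.rs code: JobMessage and ControllerProgress are reduced to stand-in enums covering only what this page describes, poll_progress stands in for job_controller.progress(), and transition/error handling is collapsed into comments.

```rust
use std::time::Duration;
use tokio::sync::mpsc::Receiver;

// Simplified stand-ins for the real controller types (assumptions for this sketch).
enum JobMessage { ConfigUpdate, RunningMessage, Other }
enum ControllerProgress { Continue, Finishing, TaskFailed(String) } // String replaces TaskFailedEvent

async fn running_loop(mut rx: Receiver<JobMessage>, ttl: Option<Duration>) {
    let mut progress_tick = tokio::time::interval(Duration::from_millis(200));
    let mut log_tick = tokio::time::interval(Duration::from_secs(60));
    let started = tokio::time::Instant::now();
    let ttl_deadline = ttl.map(|d| started + d);

    loop {
        tokio::select! {
            // Message-driven events are handled as soon as they arrive.
            Some(msg) = rx.recv() => match msg {
                JobMessage::ConfigUpdate => { /* stop requested? restart nonce? parallelism override? */ }
                JobMessage::RunningMessage => { /* job_controller.handle_message() */ }
                JobMessage::Other => { /* ctx.handle() */ }
            },
            // Periodic work (health checks, checkpoints, metrics) rides this 200ms poll.
            _ = progress_tick.tick() => {
                match poll_progress().await {
                    Ok(ControllerProgress::Continue) => {}
                    Ok(ControllerProgress::Finishing) => break,      // -> Finishing
                    Ok(ControllerProgress::TaskFailed(_event)) => { /* ctx.handle_task_error() */ }
                    Err(_) => break,                                 // -> Recovering (Internal)
                }
            }
            // Operational heartbeat in the logs.
            _ = log_tick.tick() => { /* emit job_running log event */ }
            // TTL-bounded jobs stop immediately when the deadline passes.
            _ = tokio::time::sleep_until(ttl_deadline.unwrap_or(started)), if ttl_deadline.is_some() => {
                break; // -> Stopping with StopMode::Immediate
            }
        }
    }
}

// Placeholder for JobController::progress(); see the sketch in the next subsection.
async fn poll_progress() -> anyhow::Result<ControllerProgress> {
    Ok(ControllerProgress::Continue)
}
```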
JobController::progress (job_controller/mod.rs, lines 783-859) performs a priority-ordered sequence of health and management checks (a simplified sketch follows this list):
1. Worker liveness: calls self.model.worker_timedout() to check whether any worker has missed its heartbeat deadline. If so, returns an error ("worker failed"), which causes the Running state to transition to Recovering.
2. Task failure detection: calls self.model.task_failed() to check whether any task has reported a failure. Returns ControllerProgress::TaskFailed(event) with the failure details.
3. Source completion: calls self.model.any_finished_sources() to check whether all source operators have finished producing data. Returns ControllerProgress::Finishing if so.
4. Cleanup task management: checks whether a background cleanup task has completed and processes its result (updating min_epoch on success, logging errors on failure). If cleanup is needed, no cleanup is running, and no checkpoint is in progress, starts a new cleanup task.
5. Checkpoint management: if a checkpoint is in progress, checks for completion via finish_checkpoint_if_done(). If no checkpoint is active and the time since the last checkpoint exceeds the configured interval (and no cleanup is running), initiates a new checkpoint via self.checkpoint(false).
6. Metrics collection: if the time since the last metrics update exceeds COLLECTION_RATE, queries workers for Prometheus metrics and updates the job metrics aggregator.
7. If all checks pass without triggering a transition, returns ControllerProgress::Continue.
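The following sketch shows that priority ordering in simplified form. Only the check order and the return values mirror the description above; the struct and the helper method names below (worker_timed_out, task_failed, finished_sources, manage_cleanup, manage_checkpoints, maybe_collect_metrics) are assumptions for illustration, not the real job_controller code.

```rust
use anyhow::{anyhow, Result};

// Stand-in for the real enum; String replaces TaskFailedEvent for brevity.
enum ControllerProgress { Continue, Finishing, TaskFailed(String) }

struct JobControllerSketch { /* model, checkpoint state, metrics timestamps, ... */ }

impl JobControllerSketch {
    async fn progress(&mut self) -> Result<ControllerProgress> {
        // 1. Worker liveness: a missed heartbeat fails the whole attempt.
        if self.worker_timed_out() {
            return Err(anyhow!("worker failed")); // Running transitions to Recovering
        }
        // 2. Task failure detection: surface the failing task's event to the Running state.
        if let Some(event) = self.task_failed() {
            return Ok(ControllerProgress::TaskFailed(event));
        }
        // 3. Source completion: once the sources are done, move toward Finishing.
        if self.finished_sources() {
            return Ok(ControllerProgress::Finishing);
        }
        // 4. Cleanup task management: harvest a finished cleanup, maybe start a new one.
        self.manage_cleanup().await?;
        // 5. Checkpoint management: finish an in-flight checkpoint, or start one if the
        //    configured interval has elapsed and no cleanup is running.
        self.manage_checkpoints().await?;
        // 6. Metrics collection: refresh worker Prometheus metrics on their own cadence.
        self.maybe_collect_metrics().await;
        // 7. Nothing demanded a transition: keep running.
        Ok(ControllerProgress::Continue)
    }

    // Dummy implementations so the sketch stands on its own.
    fn worker_timed_out(&self) -> bool { false }
    fn task_failed(&self) -> Option<String> { None }
    fn finished_sources(&self) -> bool { false }
    async fn manage_cleanup(&mut self) -> Result<()> { Ok(()) }
    async fn manage_checkpoints(&mut self) -> Result<()> { Ok(()) }
    async fn maybe_collect_metrics(&mut self) {}
}
```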
Usage
The Running state is entered from Scheduling upon successful pipeline startup. It runs continuously until an exit condition is met. The 200ms polling interval provides a balance between responsiveness and CPU overhead. Message-driven events (task failures, config changes) are handled immediately upon receipt, while periodic operations (checkpointing, metrics) are driven by the polling timer.
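As an illustration of how the config-driven exits mentioned above can be decided, here is a hypothetical sketch of the ConfigUpdate checks. The ConfigSnapshot fields and the Decision enum are assumptions made for this example; the real code works on JobConfig, uses the stop_if_desired_running! macro, and performs state transitions rather than returning an enum.

```rust
// Hypothetical, simplified config snapshot; field names are illustrative only.
struct ConfigSnapshot {
    stop_requested: bool,
    restart_nonce: i64,
    parallelism_overrides: Vec<(String, usize)>,
}

enum Decision { Keep, Stop, Restart, Rescale }

fn on_config_update(current: &ConfigSnapshot, updated: &ConfigSnapshot) -> Decision {
    if updated.stop_requested {
        // In the real state, stop_if_desired_running! moves toward Stopping.
        Decision::Stop
    } else if updated.restart_nonce != current.restart_nonce {
        // A changed restart nonce requests a restart -> Restarting.
        Decision::Restart
    } else if updated.parallelism_overrides != current.parallelism_overrides {
        // A parallelism override change -> Rescaling.
        Decision::Rescale
    } else {
        // No structural change: forward the new config to the JobController and keep running.
        Decision::Keep
    }
}
```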
Code Reference
Source Location
- Repository: ArroyoSystems_Arroyo
- File: crates/arroyo-controller/src/states/running.rs (lines 30-168)
- File: crates/arroyo-controller/src/job_controller/mod.rs (lines 783-859)
Signature
```rust
#[derive(Debug)]
pub struct Running {}

#[async_trait::async_trait]
impl State for Running {
    fn name(&self) -> &'static str {
        "Running"
    }

    async fn next(mut self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError>
}

pub enum ControllerProgress {
    Continue,
    Finishing,
    TaskFailed(TaskFailedEvent),
}

impl JobController {
    pub async fn progress(&mut self) -> anyhow::Result<ControllerProgress>
}
```
Import
```rust
use crate::states::finishing::Finishing;
use crate::states::recovering::Recovering;
use crate::states::rescaling::Rescaling;
use crate::states::restarting::Restarting;
use crate::states::stop_if_desired_running;
use crate::job_controller::ControllerProgress;
use crate::states::{JobContext, State, Transition, StateError};
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| ctx | &mut JobContext | Yes | Job context with active JobController, job configuration, message receiver, and database handle |
| ctx.job_controller | Option<JobController> | Yes | Must be Some; the controller managing workers, checkpoints, heartbeats, and metrics |
| ctx.rx | Receiver<JobMessage> | Yes | Channel receiving ConfigUpdate, RunningMessage (heartbeats, task events, checkpoint events), and other messages |
| ctx.config | JobConfig | Yes | Job configuration including TTL, restart nonce, parallelism overrides, and stop mode |
| ctx.status | JobStatus | Yes | Mutable job status tracking restart count and start time |
Outputs
| Name | Type | Description |
|---|---|---|
| (return) | Result<Transition, StateError> | Transitions to one of: Finishing (sources done), Stopping (stop requested or TTL expired), Restarting (restart nonce changed), Rescaling (parallelism changed), Recovering (error or worker failure). Returns StateError on retryable job errors. |
| ctx.status.restarts | i32 | Reset to 0 in the database once the job has been running longer than healthy_duration |
| Side effect | Checkpoints | Periodic checkpoints initiated via job_controller.checkpoint(false) |
| Side effect | Metrics | Worker metrics collected and aggregated periodically |
| Side effect | Cleanup | Old checkpoint data compacted via background cleanup tasks |
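To make the restart-reset side effect in the table above concrete, a hypothetical sketch follows. The StatusSketch fields and the persist_restarts callback are illustrative assumptions; in the real controller this is a database update on JobStatus performed from the Running state's progress-poll branch.

```rust
use std::time::{Duration, Instant};

// Illustrative stand-in for the relevant parts of JobStatus.
struct StatusSketch {
    restarts: i32,
    run_start: Instant,
}

fn maybe_mark_healthy(
    status: &mut StatusSketch,
    healthy_duration: Duration,
    persist_restarts: impl FnOnce(i32),
) {
    // Once the job has run past healthy_duration with a non-zero restart count,
    // zero the counter and persist it so the job is considered healthy again.
    if status.restarts > 0 && status.run_start.elapsed() >= healthy_duration {
        status.restarts = 0;
        persist_restarts(0);
    }
}
```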