
Implementation:ArroyoSystems Arroyo Running State

From Leeroopedia


Knowledge Sources
Domains Stream_Processing, Distributed_Systems, Monitoring
Last Updated 2026-02-08 12:00 GMT

Overview

Implements the Running controller state and the JobController::progress method, which together form the steady-state control loop for a running streaming pipeline. The Running state multiplexes event handling across message channels, progress polling, and timers, while progress() performs the ordered sequence of health checks, checkpoint management, and metrics collection.

Description

The Running state implementation spans two files:

Running::next (running.rs, lines 30-168) implements the main control loop using tokio::select! to multiplex across four event sources:

  1. Job message channel (ctx.rx.recv()) -- handles three message types:
    • ConfigUpdate -- checks for stop requests (via stop_if_desired_running! macro), restart nonce changes (transitions to Restarting), and parallelism override changes (transitions to Rescaling). Also forwards updated config to the JobController.
    • RunningMessage -- delegated to job_controller.handle_message() which processes heartbeats, task completions, checkpoint completions, and commit acknowledgments. Errors trigger a retryable state error.
    • Other messages -- handled by the generic ctx.handle() method.
  2. Progress polling timer (200ms interval) -- calls job_controller.progress() and dispatches on the result:
    • ControllerProgress::Continue -- no action needed, continue looping
    • ControllerProgress::Finishing -- all sources have completed, transition to Finishing state
    • ControllerProgress::TaskFailed(event) -- logs the error event and delegates to ctx.handle_task_error() for recovery or error transition
    • Err -- logs the error and transitions to Recovering with an Internal error domain
    • Additionally, if the job has been running long enough (beyond healthy_duration) and has a non-zero restart count, it resets the restart counter in the database to indicate the job is now healthy.
  3. Logging timer (60-second interval) -- emits a periodic job_running log event with the job ID, scheduler type, and duration since last state transition. Used for operational monitoring.
  4. TTL timer -- if the job has a time-to-live configured, this timer fires when the TTL expires and transitions to Stopping with StopMode::Immediate.
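The four-way multiplexing above can be modeled with a simplified, synchronous dispatcher. This is a sketch only: the real loop interleaves these sources with tokio::select!, and the type and variant names below are illustrative stand-ins, not Arroyo's actual API.

```rust
// Simplified model of the Running state's event dispatch. The real
// implementation multiplexes these sources with tokio::select!; here each
// event is handled synchronously for clarity. All names are illustrative.

#[derive(Debug, PartialEq)]
enum Event {
    ConfigUpdate { restart_nonce_changed: bool, parallelism_changed: bool },
    RunningMessage, // heartbeats, task/checkpoint completions, commit acks
    ProgressTick,   // 200ms polling timer fired
    LogTick,        // 60-second logging timer fired
    TtlExpired,     // job time-to-live reached
}

#[derive(Debug, PartialEq)]
enum Next {
    Stay, // keep looping in Running
    Restarting,
    Rescaling,
    Stopping, // immediate stop on TTL expiry
}

fn dispatch(event: Event) -> Next {
    match event {
        // Restart nonce changes take precedence over parallelism overrides.
        Event::ConfigUpdate { restart_nonce_changed: true, .. } => Next::Restarting,
        Event::ConfigUpdate { parallelism_changed: true, .. } => Next::Rescaling,
        Event::ConfigUpdate { .. } => Next::Stay, // forwarded to the JobController
        Event::RunningMessage => Next::Stay,      // delegated to handle_message()
        Event::ProgressTick => Next::Stay,        // progress() decides any transition
        Event::LogTick => Next::Stay,             // periodic job_running log
        Event::TtlExpired => Next::Stopping,
    }
}
```

In the real state machine, Stay corresponds to falling through to the next select! iteration, while the other variants correspond to returning a Transition from Running::next.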

JobController::progress (job_controller/mod.rs, lines 783-859) performs a priority-ordered sequence of health and management checks:

  1. Worker liveness: Calls self.model.worker_timedout() to check if any worker has missed its heartbeat deadline. Returns an error ("worker failed") if so, which causes the Running state to transition to Recovering.
  2. Task failure detection: Calls self.model.task_failed() to check if any task has reported a failure. Returns ControllerProgress::TaskFailed(event) with the failure details.
  3. Source completion: Calls self.model.any_finished_sources() to check if all source operators have finished producing data. Returns ControllerProgress::Finishing if so.
  4. Cleanup task management: Checks if a background cleanup task has completed and processes its result (updating min_epoch on success, logging errors on failure). If cleanup is needed and no cleanup is running and no checkpoint is in progress, starts a new cleanup task.
  5. Checkpoint management: If a checkpoint is in progress, checks for completion via finish_checkpoint_if_done(). If no checkpoint is active and the time since the last checkpoint exceeds the configured interval (and no cleanup is running), initiates a new checkpoint via self.checkpoint(false).
  6. Metrics collection: If the time since last metrics update exceeds COLLECTION_RATE, queries workers for Prometheus metrics and updates the job metrics aggregator.
  7. Returns ControllerProgress::Continue if all checks pass without triggering a transition.
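The priority ordering of these checks can be sketched as a pure function over a snapshot of job state. This is a simplified model under assumed names: the real method consults the JobModel, performs the cleanup/checkpoint/metrics side effects, and returns anyhow::Result<ControllerProgress> (with worker timeout surfacing as an Err rather than a variant).

```rust
// Simplified model of JobController::progress's priority-ordered checks.
// Type and field names are illustrative, not Arroyo's actual API.

#[derive(Debug, PartialEq)]
enum Progress {
    Continue,
    Finishing,
    TaskFailed(&'static str),
    WorkerFailed, // modeled as a variant; the real code returns Err("worker failed")
}

struct JobSnapshot {
    worker_timed_out: bool,
    failed_task: Option<&'static str>,
    all_sources_finished: bool,
}

fn progress(s: &JobSnapshot) -> Progress {
    // 1. Worker liveness has the highest priority: a missed heartbeat
    //    deadline short-circuits all remaining checks.
    if s.worker_timed_out {
        return Progress::WorkerFailed;
    }
    // 2. Task failure detection.
    if let Some(event) = s.failed_task {
        return Progress::TaskFailed(event);
    }
    // 3. Source completion: all sources have finished producing data.
    if s.all_sources_finished {
        return Progress::Finishing;
    }
    // 4-6. Cleanup, checkpoint, and metrics management are side effects
    //      (omitted here); none of them changes the returned value.
    Progress::Continue
}
```

The ordering matters: a timed-out worker suppresses a pending task-failure report, and a task failure suppresses a Finishing transition, so the controller always reacts to the most severe condition first.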

Usage

The Running state is entered from Scheduling upon successful pipeline startup. It runs continuously until an exit condition is met. The 200ms polling interval provides a balance between responsiveness and CPU overhead. Message-driven events (task failures, config changes) are handled immediately upon receipt, while periodic operations (checkpointing, metrics) are driven by the polling timer.
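One example of a periodic check driven by the polling timer is the restart-counter reset described above: once a job has run past healthy_duration with a non-zero restart count, the counter is reset to 0 in the database. A minimal sketch, with an assumed Duration-based signature rather than Arroyo's exact code:

```rust
use std::time::Duration;

// Sketch of the "job is now healthy" heuristic checked on each progress
// poll. The function name and signature are illustrative.
fn should_reset_restarts(
    running_for: Duration,
    healthy_duration: Duration,
    restarts: i32,
) -> bool {
    // Only write to the database when there is actually something to reset
    // and the job has stayed up beyond the configured healthy window.
    restarts > 0 && running_for > healthy_duration
}
```

Guarding on restarts > 0 keeps the steady-state loop from issuing a redundant database write on every 200ms tick.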

Code Reference

Source Location

  • Repository: ArroyoSystems_Arroyo
  • File: crates/arroyo-controller/src/states/running.rs (lines 30-168)
  • File: crates/arroyo-controller/src/job_controller/mod.rs (lines 783-859)

Signature

#[derive(Debug)]
pub struct Running {}

#[async_trait::async_trait]
impl State for Running {
    fn name(&self) -> &'static str {
        "Running"
    }

    async fn next(mut self: Box<Self>, ctx: &mut JobContext) -> Result<Transition, StateError>
}

pub enum ControllerProgress {
    Continue,
    Finishing,
    TaskFailed(TaskFailedEvent),
}

impl JobController {
    pub async fn progress(&mut self) -> anyhow::Result<ControllerProgress>
}

Import

use crate::states::finishing::Finishing;
use crate::states::recovering::Recovering;
use crate::states::rescaling::Rescaling;
use crate::states::restarting::Restarting;
use crate::states::stop_if_desired_running;
use crate::job_controller::ControllerProgress;
use crate::states::{JobContext, State, Transition, StateError};

I/O Contract

Inputs

  • ctx (&mut JobContext, required) -- Job context with active JobController, job configuration, message receiver, and database handle
  • ctx.job_controller (Option<JobController>, required) -- Must be Some; the controller managing workers, checkpoints, heartbeats, and metrics
  • ctx.rx (Receiver<JobMessage>, required) -- Channel receiving ConfigUpdate, RunningMessage (heartbeats, task events, checkpoint events), and other messages
  • ctx.config (JobConfig, required) -- Job configuration including TTL, restart nonce, parallelism overrides, and stop mode
  • ctx.status (JobStatus, required) -- Mutable job status tracking restart count and start time

Outputs

  • (return) (Result<Transition, StateError>) -- Transitions to one of: Finishing (sources done), Stopping (stop requested or TTL expired), Restarting (restart nonce changed), Rescaling (parallelism changed), Recovering (error or worker failure). Returns StateError on retryable job errors.
  • ctx.status.restarts (i32) -- Reset to 0 in the database once the job has been running longer than healthy_duration
  • Side effect: Checkpoints -- Periodic checkpoints initiated via job_controller.checkpoint(false)
  • Side effect: Metrics -- Worker metrics collected and aggregated periodically
  • Side effect: Cleanup -- Old checkpoint data compacted via background cleanup tasks
