Implementation:ArroyoSystems Arroyo Shutdown Handler

From Leeroopedia
Revision as of 14:28, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/ArroyoSystems_Arroyo_Shutdown_Handler.md)

Summary

The Shutdown Handler implementation realizes the Graceful Shutdown principle through two primary components: the PipelineShutdownHandler struct (application-specific shutdown logic for the pipeline) and the Shutdown framework (general-purpose shutdown coordination with signal handling, guard counting, and timeouts). Together, they ensure the local cluster terminates cleanly with a final checkpoint and state backup.

Code Reference

  • PipelineShutdownHandler: crates/arroyo/src/run.rs (L78-L120)
  • Shutdown framework: crates/arroyo-server-common/src/shutdown.rs (L133-L293)
  • ShutdownGuard: crates/arroyo-server-common/src/shutdown.rs (L17-L131)
  • ShutdownHandler trait: crates/arroyo-server-common/src/shutdown.rs (L147-L150)

PipelineShutdownHandler

Struct Definition

#[derive(Clone)]
struct PipelineShutdownHandler {
    client: Arc<Client>,
    pipeline_id: Arc<Mutex<Option<String>>>,
}

The handler holds a shared reference to the API client and a mutex-protected optional pipeline ID. The pipeline ID is None initially and is set by run_pipeline once the pipeline is created or resumed. This allows the handler to be registered before the pipeline ID is known.
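The late-binding pattern described above can be illustrated without an async runtime. The following is a minimal sketch using only the standard library; `LateBoundHandler`, `set_pipeline_id`, and `pipeline_to_stop` are hypothetical names chosen for illustration and are not part of Arroyo.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for PipelineShutdownHandler: the ID slot starts empty.
#[derive(Clone, Default)]
struct LateBoundHandler {
    pipeline_id: Arc<Mutex<Option<String>>>,
}

impl LateBoundHandler {
    /// Called once the pipeline has been created or resumed.
    fn set_pipeline_id(&self, id: &str) {
        *self.pipeline_id.lock().unwrap() = Some(id.to_string());
    }

    /// Returns the ID to stop, or None if no pipeline was ever started.
    fn pipeline_to_stop(&self) -> Option<String> {
        self.pipeline_id.lock().unwrap().clone()
    }
}
```

Because every clone shares the same `Arc<Mutex<...>>`, a clone registered with the shutdown framework observes an ID that another clone sets later.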

ShutdownHandler Implementation

#[async_trait]
impl ShutdownHandler for PipelineShutdownHandler {
    async fn shutdown(&self) {
        let Some(pipeline_id) = (*self.pipeline_id.lock().unwrap()).clone() else {
            return;
        };

        info!("Stopping pipeline with a final checkpoint...");
        if let Err(e) = self
            .client
            .patch_pipeline()
            .id(&pipeline_id)
            .body(PipelinePatch::builder().stop(StopType::Checkpoint))
            .send()
            .await
        {
            warn!("Unable to stop pipeline with a final checkpoint: {}", e);
        }

        if (timeout(
            Duration::from_secs(120),
            wait_for_state(&self.client, &pipeline_id, &["Stopped", "Failed"]),
        )
        .await)
            .is_err()
        {
            error!(
                "Pipeline did not complete checkpoint within timeout; shutting down immediately"
            );
        }

        // start a final backup and wait for it to finish
        if let Some(c) = notify_db() {
            let _ = c.await;
        }
    }
}

The shutdown method executes three phases:

  1. Checkpoint Request: If the pipeline ID is not yet set (no pipeline was started), the handler returns immediately. Otherwise it sends a PipelinePatch with StopType::Checkpoint to the API. A failed request is logged as a warning, and the handler still proceeds to the next phase.
  2. State Wait: Waits up to 120 seconds for the pipeline to reach the "Stopped" or "Failed" state using wait_for_state. If the timeout expires, an error is logged and shutdown continues without a clean checkpoint.
  3. Database Backup: Triggers a final SQLite backup via notify_db() and, if a completion handle is returned, awaits it (the result is discarded). This ensures the latest checkpoint metadata is persisted to remote storage.
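The bounded wait in phase 2 can be sketched in synchronous form. This is an illustration only, assuming a polling closure in place of the real API client; `wait_for_state_bounded` and its parameters are hypothetical and do not mirror the signature of Arroyo's wait_for_state.

```rust
use std::thread::sleep;
use std::time::{Duration, Instant};

/// Polls `current_state` until it returns one of `targets` or `limit` elapses.
/// Returns the matching state, or None if the deadline passes first.
fn wait_for_state_bounded<F>(
    mut current_state: F,
    targets: &[&str],
    limit: Duration,
    poll_interval: Duration,
) -> Option<String>
where
    F: FnMut() -> String,
{
    let deadline = Instant::now() + limit;
    loop {
        let state = current_state();
        if targets.iter().any(|t| *t == state.as_str()) {
            return Some(state);
        }
        if Instant::now() >= deadline {
            // Caller logs the timeout and continues shutting down.
            return None;
        }
        sleep(poll_interval);
    }
}
```

Returning `None` rather than an error type keeps the caller's decision explicit: a timeout is logged but does not abort the remaining shutdown steps.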

Shutdown Framework

Core Structures

pub struct Shutdown {
    name: &'static str,
    guard: ShutdownGuard,
    rx: broadcast::Receiver<()>,
    signal_rx: Option<mpsc::Receiver<()>>,
    handler: Option<Box<dyn ShutdownHandler + Send>>,
}

pub struct ShutdownGuard {
    name: &'static str,
    tx: broadcast::Sender<()>,
    token: CancellationToken,
    ref_count: Arc<AtomicUsize>,
    temporary: bool,
}

ShutdownHandler Trait

#[async_trait]
pub trait ShutdownHandler {
    async fn shutdown(&self);
}

The trait defines a single async method that performs application-specific cleanup when a shutdown signal is received.

SignalBehavior Enum

pub enum SignalBehavior {
    Handle,   // Listen for and handle signals
    Ignore,   // Listen for but ignore signals
    None,     // Do not listen for signals
}

Construction

impl Shutdown {
    pub fn new(name: &'static str, signal_behavior: SignalBehavior) -> Self {
        let (tx, rx) = broadcast::channel(1);
        let token = CancellationToken::new();

        // Spawn signal listening task based on signal_behavior
        let signal_rx = if !matches!(signal_behavior, SignalBehavior::None) {
            // ... spawns task that listens for SIGINT and SIGTERM
            Some(signal_rx)
        } else {
            None
        };

        Self {
            name,
            guard: ShutdownGuard::new("root", tx, token, Arc::new(AtomicUsize::new(0)), false),
            rx,
            signal_rx,
            handler: None,
        }
    }
}

Creates a new Shutdown instance with a root ShutdownGuard. Unless SignalBehavior::None is specified, a background task is spawned that listens for both SIGINT (Ctrl+C) and SIGTERM. With SignalBehavior::Handle, received signals are forwarded to the shutdown coordinator; with SignalBehavior::Ignore, they are received but not acted on.
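The three signal behaviors can be summarized as two independent decisions: whether a listener is installed, and whether a received signal triggers shutdown. The sketch below restates the enum locally so that it compiles on its own; `installs_listener` and `forwards_signal` are hypothetical helpers, and the treatment of `Ignore` is an assumption based on the enum's comments rather than on the elided listener code.

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum SignalBehavior {
    Handle,
    Ignore,
    None,
}

/// A listener task exists for every variant except None,
/// matching the `!matches!(..., SignalBehavior::None)` check in `new`.
fn installs_listener(behavior: SignalBehavior) -> bool {
    !matches!(behavior, SignalBehavior::None)
}

/// Assumption: only Handle forwards a received signal to the coordinator.
fn forwards_signal(behavior: SignalBehavior) -> bool {
    matches!(behavior, SignalBehavior::Handle)
}
```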

Key Methods

Method | Description
set_handler(handler) | Registers a ShutdownHandler to be called when a signal is received
guard(name) | Creates a child ShutdownGuard for a named task, incrementing the reference count
spawn_task(name, task) | Spawns a long-lived task with an associated guard
spawn_temporary(task) | Spawns a temporary task whose guard does not block shutdown
token() | Returns the CancellationToken for external cancellation checks
wait_for_shutdown(timeout) | Main shutdown coordination loop (see below)
handle_shutdown(result) | Maps the shutdown result to an exit code and calls exit()

wait_for_shutdown

pub async fn wait_for_shutdown(mut self, timeout: Duration) -> Result<(), ShutdownError> {
    select! {
        Some(_) = OptionFuture::from(self.signal_rx.as_mut().map(|s| s.recv())) => {
            // First signal received
            info!("Received signal, shutting down {}", self.name);
            if let Some(handler) = self.handler {
                tokio::spawn(async move {
                    info!("Running shutdown handler");
                    handler.shutdown().await;
                    info!("Finished shutdown handler");
                    self.guard.token.cancel();
                    drop(self.guard);
                });
            } else {
                self.guard.token.cancel();
                drop(self.guard)
            }
        }
        _ = self.guard.token.cancelled() => {
            // Internal cancellation (e.g., a task failed)
            info!("{} shutting down", self.name);
            drop(self.guard)
        }
    }

    select! {
        _ = self.rx.recv() => {
            // All guards dropped -- shutdown complete
            info!("{} shutdown complete", self.name);
            let code = ERROR_CODE.load(Ordering::Relaxed);
            if code == 0 { Ok(()) } else { Err(ShutdownError::Err(code)) }
        }
        Some(_) = OptionFuture::from(self.signal_rx.as_mut().map(|s| s.recv())) => {
            // Second signal -- force immediate exit
            info!("Received signal, shutting down {} immediately", self.name);
            Err(ShutdownError::Signal)
        }
        _ = tokio::time::sleep(timeout) => {
            // Timeout expired
            warn!("{} failed to shutdown after {} seconds", self.name, timeout.as_secs());
            Err(ShutdownError::Timeout)
        }
    }
}

This method implements a two-phase shutdown protocol:

Phase 1: Trigger

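  • Waits for either a signal or internal cancellation.
  • On signal: if a handler is registered, it runs in a spawned task, after which the token is cancelled and the root guard is dropped. Without a handler, the token is cancelled and the root guard is dropped immediately.
  • On internal cancellation: drops the root guard immediately.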

Phase 2: Wait for Completion

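  • Waits for one of three conditions:
    • All guards dropped (broadcast received) -- clean shutdown, returning Ok(()) unless a task recorded a non-zero error code, in which case ShutdownError::Err(code) is returned.
    • Second signal received -- returns ShutdownError::Signal, a forced exit with code 128.
    • Timeout expires -- returns ShutdownError::Timeout, an exit with code 129.
  • Because the handler runs in a spawned task, the timeout clock starts as soon as phase 1 returns and runs concurrently with the handler.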
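The mapping from shutdown outcome to exit code can be sketched as a pure function. The codes 128 and 129 come from the description above; the `i32` payload type, the `0` for success, and passing `Err(code)` through unchanged are assumptions, and `exit_code_for` is a hypothetical helper rather than Arroyo's handle_shutdown.

```rust
/// Local mirror of the error variants returned by wait_for_shutdown.
#[derive(Debug, PartialEq)]
enum ShutdownError {
    Signal,
    Timeout,
    Err(i32),
}

/// Maps a shutdown result to a process exit code without exiting.
fn exit_code_for(result: &Result<(), ShutdownError>) -> i32 {
    match result {
        Ok(()) => 0,                           // assumed: clean shutdown
        Err(ShutdownError::Signal) => 128,     // second signal forced the exit
        Err(ShutdownError::Timeout) => 129,    // guards did not drop in time
        Err(ShutdownError::Err(code)) => *code, // assumed: task-recorded code
    }
}
```

Keeping the mapping separate from the call to exit() makes it straightforward to unit test.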

ShutdownGuard Details

Reference Counting

impl Drop for ShutdownGuard {
    fn drop(&mut self) {
        if !self.temporary {
            self.token.cancel();
        }
        let count = self.ref_count.fetch_sub(1, Ordering::SeqCst);
        debug!("[{}] Dropping guard (count={})", self.name, count);
        if count == 1 {
            let _ = self.tx.send(());
        }
    }
}

When a guard is dropped:

  • Non-temporary guards cancel the token (signaling other tasks to stop).
  • The reference count is decremented. fetch_sub returns the value held before the decrement.
  • When that previous value is 1, the count has reached zero (last guard dropped), and a broadcast is sent to signal shutdown completion.
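The drop-based counting above can be reproduced with the standard library alone. This is a simplified sketch, assuming a std mpsc channel in place of the tokio broadcast channel and omitting the cancellation token; `CountedGuard`, `root`, and `child` are hypothetical names.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;

/// Simplified guard: counts live guards and notifies on the last drop.
struct CountedGuard {
    ref_count: Arc<AtomicUsize>,
    done_tx: Sender<()>,
}

impl CountedGuard {
    /// Creates the first guard plus the receiver that fires when all guards are gone.
    fn root() -> (Self, Receiver<()>) {
        let (done_tx, done_rx) = channel();
        let guard = CountedGuard {
            ref_count: Arc::new(AtomicUsize::new(1)),
            done_tx,
        };
        (guard, done_rx)
    }

    /// Creates another guard sharing the same counter.
    fn child(&self) -> Self {
        self.ref_count.fetch_add(1, Ordering::SeqCst);
        CountedGuard {
            ref_count: Arc::clone(&self.ref_count),
            done_tx: self.done_tx.clone(),
        }
    }
}

impl Drop for CountedGuard {
    fn drop(&mut self) {
        // fetch_sub returns the value before the decrement,
        // so 1 means this was the last live guard.
        if self.ref_count.fetch_sub(1, Ordering::SeqCst) == 1 {
            let _ = self.done_tx.send(());
        }
    }
}
```

Dropping the root guard while a child is alive sends nothing; the completion message appears only after the final guard goes out of scope.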

Task Spawning

pub fn into_spawn_task<F, T>(self, task: T) -> JoinHandle<Option<T::Output>>
where
    F: Send + 'static,
    T: Future<Output = anyhow::Result<F>> + Send + 'static,
{
    let token = self.token.clone();
    tokio::spawn(async move {
        let output = select! {
            output = task => {
                if let Err(e) = &output {
                    error!("{}", e);
                    ERROR_CODE.store(1, Ordering::Relaxed);
                    token.cancel();
                }
                Some(output)
            }
            _ = token.cancelled() => {
                None
            }
        };
        drop(self);
        output
    })
}

Tasks spawned with guards are automatically:

  • Cancelled when the cancellation token fires.
  • Error-propagating: if a task fails, the error code is set and all other tasks are cancelled.
  • Guard-dropping: the guard is dropped when the task completes or is cancelled, decrementing the reference count.
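The error-propagation behavior can be approximated with threads. This sketch is not the async implementation: a thread cannot be interrupted at an await point, so the worker polls a flag cooperatively. `Coordinator`, `spawn_guarded`, and `run_demo` are hypothetical names, with an `AtomicBool` standing in for the CancellationToken and an `AtomicI32` for ERROR_CODE.

```rust
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Shared state standing in for CancellationToken and ERROR_CODE.
#[derive(Default)]
struct Coordinator {
    cancelled: AtomicBool,
    error_code: AtomicI32,
}

/// Runs `task` on a thread; on failure, records an error code and cancels all tasks.
fn spawn_guarded<F>(coord: Arc<Coordinator>, task: F) -> JoinHandle<Result<(), String>>
where
    F: FnOnce(&AtomicBool) -> Result<(), String> + Send + 'static,
{
    thread::spawn(move || {
        let output = task(&coord.cancelled);
        if let Err(e) = &output {
            eprintln!("task failed: {}", e);
            coord.error_code.store(1, Ordering::Relaxed);
            coord.cancelled.store(true, Ordering::SeqCst);
        }
        output
    })
}

/// One failing task stops a cooperative worker; returns (error code, failure seen).
fn run_demo() -> (i32, bool) {
    let coord = Arc::new(Coordinator::default());
    let worker = spawn_guarded(coord.clone(), |cancelled| {
        // Spin until some other task requests cancellation.
        while !cancelled.load(Ordering::SeqCst) {
            thread::yield_now();
        }
        Ok(())
    });
    let failing = spawn_guarded(coord.clone(), |_| Err("boom".to_string()));
    let failed = failing.join().unwrap().is_err();
    worker.join().unwrap().unwrap();
    (coord.error_code.load(Ordering::Relaxed), failed)
}
```

Calling `run_demo()` exercises the path where a single failure sets the shared error code and causes the healthy worker to exit.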

Integration in Local Cluster

The shutdown framework is wired into the local cluster as follows:

// In run():
let mut shutdown = Shutdown::new("pipeline", SignalBehavior::Handle);

// Controller and API receive shutdown guards
let controller_port = arroyo_controller::ControllerServer::new(db.clone())
    .await
    .start(shutdown.guard("controller"))
    .await
    .expect("could not start system");

let http_port = arroyo_api::start_server(db.clone(), shutdown.guard("api"))
    .await
    .unwrap();

// Register the pipeline-specific handler
shutdown.set_handler(Box::new(shutdown_handler.clone()));

// Pipeline submission runs as a temporary task
shutdown.spawn_temporary(async move {
    run_pipeline(client, args.name, query, args.parallelism,
        http_port, shutdown_handler, args.force).await
});

// Wait for shutdown signal and handle exit
Shutdown::handle_shutdown(shutdown.wait_for_shutdown(Duration::from_secs(60)).await);

I/O

  • Input: SIGTERM or SIGINT (Ctrl+C) signal from the operating system.
  • Output: Pipeline is stopped with a final checkpoint, SQLite database is backed up to remote storage (if applicable), and the process exits with an appropriate exit code.
