Implementation:ArroyoSystems Arroyo Shutdown Handler
Summary
The Shutdown Handler implementation realizes the Graceful Shutdown principle through two primary components: the PipelineShutdownHandler struct (application-specific shutdown logic for the pipeline) and the Shutdown framework (general-purpose shutdown coordination with signal handling, guard counting, and timeouts). Together, they ensure the local cluster terminates cleanly with a final checkpoint and state backup.
Code Reference
- PipelineShutdownHandler: crates/arroyo/src/run.rs (L78-L120)
- Shutdown framework: crates/arroyo-server-common/src/shutdown.rs (L133-L293)
- ShutdownGuard: crates/arroyo-server-common/src/shutdown.rs (L17-L131)
- ShutdownHandler trait: crates/arroyo-server-common/src/shutdown.rs (L147-L150)
PipelineShutdownHandler
Struct Definition
```rust
#[derive(Clone)]
struct PipelineShutdownHandler {
    client: Arc<Client>,
    pipeline_id: Arc<Mutex<Option<String>>>,
}
```
The handler holds a shared reference to the API client and a mutex-protected optional pipeline ID. The pipeline ID is None initially and is set by run_pipeline once the pipeline is created or resumed. This allows the handler to be registered before the pipeline ID is known.
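The late-binding pattern can be sketched with the standard library alone. This minimal sketch omits the API client. `LateBoundHandler`, `set_pipeline_id`, and `current_id` are hypothetical names. Cloning the handler clones the `Arc`, so an ID set through one clone is visible through a clone that was registered earlier.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for PipelineShutdownHandler: only the shared,
/// initially empty pipeline ID is modeled (no API client).
#[derive(Clone, Default)]
struct LateBoundHandler {
    pipeline_id: Arc<Mutex<Option<String>>>,
}

impl LateBoundHandler {
    /// Called once the pipeline has been created or resumed.
    fn set_pipeline_id(&self, id: &str) {
        *self.pipeline_id.lock().unwrap() = Some(id.to_string());
    }

    /// Clone the ID out so the lock is released at the end of the statement,
    /// mirroring the `let Some(..) = ... else { return; }` guard.
    fn current_id(&self) -> Option<String> {
        self.pipeline_id.lock().unwrap().clone()
    }
}
```

The real handler also clones the `Option<String>` out of the lock. As a result, the mutex guard is released immediately and is not held across the later `.await` points.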
ShutdownHandler Implementation
```rust
#[async_trait]
impl ShutdownHandler for PipelineShutdownHandler {
    async fn shutdown(&self) {
        let Some(pipeline_id) = (*self.pipeline_id.lock().unwrap()).clone() else {
            return;
        };

        info!("Stopping pipeline with a final checkpoint...");
        if let Err(e) = self
            .client
            .patch_pipeline()
            .id(&pipeline_id)
            .body(PipelinePatch::builder().stop(StopType::Checkpoint))
            .send()
            .await
        {
            warn!("Unable to stop pipeline with a final checkpoint: {}", e);
        }

        if (timeout(
            Duration::from_secs(120),
            wait_for_state(&self.client, &pipeline_id, &["Stopped", "Failed"]),
        )
        .await)
            .is_err()
        {
            error!(
                "Pipeline did not complete checkpoint within timeout; shutting down immediately"
            );
        }

        // start a final backup and wait for it to finish
        if let Some(c) = notify_db() {
            let _ = c.await;
        }
    }
}
```
The shutdown method first reads the pipeline ID. If it is not yet set (the pipeline was never started), the handler returns immediately. Otherwise it executes three phases:
- Checkpoint Request: Sends a `PipelinePatch` with `StopType::Checkpoint` to the API. If the request fails, a warning is logged and the handler continues.
- State Wait: Uses `wait_for_state` to wait up to 120 seconds for the pipeline to reach the "Stopped" or "Failed" state. If the timeout expires, an error is logged and the handler moves on to the backup, even though the final checkpoint may not have completed.
- Database Backup: Calls `notify_db()` to start a final SQLite backup and, if one is started, awaits its completion. This persists the latest checkpoint metadata to remote storage.
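The three phases can be illustrated with a synchronous, polling sketch. Closures stand in for the API calls (`request_stop`, `get_state`) and for the `notify_db()` backup. `run_shutdown_phases`, `wait_for_state_bounded`, and `WaitOutcome` are hypothetical names. The real handler instead wraps an async `wait_for_state` in `tokio::time::timeout`.

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Outcome of the bounded wait (hypothetical type).
#[derive(Debug, PartialEq)]
enum WaitOutcome {
    Reached(String),
    TimedOut,
}

/// Poll `get_state` until it returns one of `targets` or `limit` elapses.
fn wait_for_state_bounded<F>(mut get_state: F, targets: &[&str], limit: Duration, poll: Duration) -> WaitOutcome
where
    F: FnMut() -> String,
{
    let deadline = Instant::now() + limit;
    loop {
        let state = get_state();
        if targets.contains(&state.as_str()) {
            return WaitOutcome::Reached(state);
        }
        if Instant::now() >= deadline {
            return WaitOutcome::TimedOut;
        }
        thread::sleep(poll);
    }
}

/// Run the phases in order, recording each step so the flow can be checked.
fn run_shutdown_phases<S, G>(
    pipeline_id: Option<&str>,
    request_stop: S,
    get_state: G,
    backup: Option<Box<dyn FnOnce()>>,
    limit: Duration,
) -> Vec<String>
where
    S: FnOnce(&str) -> Result<(), String>,
    G: FnMut() -> String,
{
    let mut log = Vec::new();
    // No pipeline yet: nothing to stop or back up.
    let Some(id) = pipeline_id else { return log; };

    // Phase 1: request a stop with a final checkpoint; a failure is only a warning.
    match request_stop(id) {
        Ok(()) => log.push("stop requested".to_string()),
        Err(e) => log.push(format!("warn: {e}")),
    }

    // Phase 2: bounded wait for a terminal state.
    match wait_for_state_bounded(get_state, &["Stopped", "Failed"], limit, Duration::from_millis(5)) {
        WaitOutcome::Reached(s) => log.push(format!("reached {s}")),
        WaitOutcome::TimedOut => log.push("error: timed out".to_string()),
    }

    // Phase 3: final backup, if a backup hook is available.
    if let Some(run_backup) = backup {
        run_backup();
        log.push("backup done".to_string());
    }
    log
}
```

As in the original, a failed stop request or an expired wait is logged rather than aborting the sequence, so the backup still runs.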
Shutdown Framework
Core Structures
```rust
pub struct Shutdown {
    name: &'static str,
    guard: ShutdownGuard,
    rx: broadcast::Receiver<()>,
    signal_rx: Option<mpsc::Receiver<()>>,
    handler: Option<Box<dyn ShutdownHandler + Send>>,
}

pub struct ShutdownGuard {
    name: &'static str,
    tx: broadcast::Sender<()>,
    token: CancellationToken,
    ref_count: Arc<AtomicUsize>,
    temporary: bool,
}
```
ShutdownHandler Trait
```rust
#[async_trait]
pub trait ShutdownHandler {
    async fn shutdown(&self);
}
```
The trait defines a single async method that performs application-specific cleanup when a shutdown signal is received.
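A synchronous stand-in for the trait shows how a handler can be stored as `Option<Box<dyn ... + Send>>`, as in `Shutdown.handler`, and consumed so that it runs at most once. This is a sketch under assumptions: `SyncShutdownHandler`, `CountingHandler`, and `Coordinator` are hypothetical, and the real trait is async via `async_trait`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Synchronous stand-in for the async `ShutdownHandler` trait (hypothetical).
trait SyncShutdownHandler {
    fn shutdown(&self);
}

/// Hypothetical handler that only counts how often it ran.
struct CountingHandler {
    runs: Arc<AtomicUsize>,
}

impl SyncShutdownHandler for CountingHandler {
    fn shutdown(&self) {
        self.runs.fetch_add(1, Ordering::SeqCst);
    }
}

/// Minimal coordinator holding an optional boxed handler.
#[derive(Default)]
struct Coordinator {
    handler: Option<Box<dyn SyncShutdownHandler + Send>>,
}

impl Coordinator {
    fn set_handler(&mut self, handler: Box<dyn SyncShutdownHandler + Send>) {
        self.handler = Some(handler);
    }

    /// Take the handler out so it runs at most once; returns whether one ran.
    fn on_signal(&mut self) -> bool {
        match self.handler.take() {
            Some(h) => {
                h.shutdown();
                true
            }
            None => false,
        }
    }
}
```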
SignalBehavior Enum
```rust
pub enum SignalBehavior {
    Handle, // Listen for and handle signals
    Ignore, // Listen for but ignore signals
    None,   // Do not listen for signals
}
```
Construction
```rust
impl Shutdown {
    pub fn new(name: &'static str, signal_behavior: SignalBehavior) -> Self {
        let (tx, rx) = broadcast::channel(1);
        let token = CancellationToken::new();

        // Spawn signal listening task based on signal_behavior
        let signal_rx = if !matches!(signal_behavior, SignalBehavior::None) {
            // ... spawns task that listens for SIGINT and SIGTERM
            Some(signal_rx)
        } else {
            None
        };

        Self {
            name,
            guard: ShutdownGuard::new("root", tx, token, Arc::new(AtomicUsize::new(0)), false),
            rx,
            signal_rx,
            handler: None,
        }
    }
}
```
Creates a new Shutdown instance with a root ShutdownGuard. For any behavior other than SignalBehavior::None, a background task is spawned that listens for SIGINT (Ctrl+C) and SIGTERM. With SignalBehavior::Handle, received signals are forwarded to the shutdown coordinator. With SignalBehavior::Ignore, they are received but do not trigger shutdown.
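Rust's standard library cannot install signal handlers, so this sketch simulates OS signals with a channel; the original spawns a task that listens for SIGINT and SIGTERM. The sketch models the three SignalBehavior variants as documented. `Signal` and `spawn_listener` are hypothetical names, and the forwarding channel stands in for `signal_rx`.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread;

/// Mirrors the documented `SignalBehavior` variants.
#[derive(Clone, Copy, Debug, PartialEq)]
enum SignalBehavior {
    Handle,
    Ignore,
    None,
}

/// Simulated OS signal (hypothetical).
#[derive(Clone, Copy, Debug)]
enum Signal {
    Int,
    Term,
}

/// Spawn a listener unless the behavior is `None`. With `Handle`, each signal
/// is forwarded to the coordinator; with `Ignore`, it is received and dropped.
fn spawn_listener(behavior: SignalBehavior, os_signals: Receiver<Signal>) -> Option<Receiver<()>> {
    if behavior == SignalBehavior::None {
        return None;
    }
    let (tx, rx): (Sender<()>, Receiver<()>) = mpsc::channel();
    thread::spawn(move || {
        // The loop ends when the simulated signal source is closed.
        for _sig in os_signals {
            if behavior == SignalBehavior::Handle && tx.send(()).is_err() {
                break; // coordinator is gone
            }
        }
    });
    Some(rx)
}
```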
Key Methods
| Method | Description |
|---|---|
| `set_handler(handler)` | Registers a ShutdownHandler to be called when a signal is received |
| `guard(name)` | Creates a child ShutdownGuard for a named task, incrementing the reference count |
| `spawn_task(name, task)` | Spawns a long-lived task with an associated guard |
| `spawn_temporary(task)` | Spawns a temporary task whose guard does not block shutdown |
| `token()` | Returns the CancellationToken for external cancellation checks |
| `wait_for_shutdown(timeout)` | Main shutdown coordination loop (see below) |
| `handle_shutdown(result)` | Maps the shutdown result to an exit code and calls `exit()` |
wait_for_shutdown
```rust
pub async fn wait_for_shutdown(mut self, timeout: Duration) -> Result<(), ShutdownError> {
    select! {
        Some(_) = OptionFuture::from(self.signal_rx.as_mut().map(|s| s.recv())) => {
            // First signal received
            info!("Received signal, shutting down {}", self.name);
            if let Some(handler) = self.handler {
                tokio::spawn(async move {
                    info!("Running shutdown handler");
                    handler.shutdown().await;
                    info!("Finished shutdown handler");
                    self.guard.token.cancel();
                    drop(self.guard);
                });
            } else {
                self.guard.token.cancel();
                drop(self.guard)
            }
        }
        _ = self.guard.token.cancelled() => {
            // Internal cancellation (e.g., a task failed)
            info!("{} shutting down", self.name);
            drop(self.guard)
        }
    }

    select! {
        _ = self.rx.recv() => {
            // All guards dropped -- shutdown complete
            info!("{} shutdown complete", self.name);
            let code = ERROR_CODE.load(Ordering::Relaxed);
            if code == 0 { Ok(()) } else { Err(ShutdownError::Err(code)) }
        }
        Some(_) = OptionFuture::from(self.signal_rx.as_mut().map(|s| s.recv())) => {
            // Second signal -- force immediate exit
            info!("Received signal, shutting down {} immediately", self.name);
            Err(ShutdownError::Signal)
        }
        _ = tokio::time::sleep(timeout) => {
            // Timeout expired
            warn!("{} failed to shutdown after {} seconds", self.name, timeout.as_secs());
            Err(ShutdownError::Timeout)
        }
    }
}
```
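This method implements a two-phase shutdown protocol:

Phase 1: Trigger
- Waits for either a signal or an internal cancellation of the token.
- On a signal: if a handler is registered, it runs in a spawned task, and only after the handler finishes are the token cancelled and the root guard dropped. Without a handler, the token is cancelled and the root guard dropped immediately.
- On internal cancellation (for example, a failed task): drops the root guard immediately. The shutdown handler is not run on this path.

Phase 2: Wait for Completion
- Waits for the first of three conditions. Because the handler runs in a spawned task, the timeout starts counting while the handler may still be running:
  - All guards dropped (broadcast received): clean shutdown. The result is `Ok(())` unless a task has set a nonzero ERROR_CODE, in which case it is `ShutdownError::Err(code)`.
  - Second signal received: forced exit (`ShutdownError::Signal`, exit code 128).
  - Timeout expires: `ShutdownError::Timeout` (exit code 129).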
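Phase 2 can be sketched with a standard-library channel and `recv_timeout` in place of `tokio::select!`. The `Event` enum, `wait_phase_two`, and `exit_code` are hypothetical. The 128 and 129 codes follow the list above. Passing a task's error code straight through as the exit code is an assumption about `handle_shutdown`.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Events the coordinator can observe in phase 2 (hypothetical).
#[derive(Debug)]
enum Event {
    AllGuardsDropped,
    Signal,
}

/// Mirrors the error cases returned by `wait_for_shutdown`.
#[derive(Debug, PartialEq)]
enum ShutdownError {
    Err(i32),
    Signal,
    Timeout,
}

/// Wait for the first of: completion, a second signal, or the timeout.
fn wait_phase_two(events: &Receiver<Event>, timeout: Duration, error_code: i32) -> Result<(), ShutdownError> {
    match events.recv_timeout(timeout) {
        // `_ = self.rx.recv()` matches any result, so a closed channel
        // also counts as completion; Disconnected is treated the same way.
        Ok(Event::AllGuardsDropped) | Err(RecvTimeoutError::Disconnected) => {
            if error_code == 0 { Ok(()) } else { Err(ShutdownError::Err(error_code)) }
        }
        Ok(Event::Signal) => Err(ShutdownError::Signal),
        Err(RecvTimeoutError::Timeout) => Err(ShutdownError::Timeout),
    }
}

/// Exit-code mapping (assumed behavior of `handle_shutdown`).
fn exit_code(result: &Result<(), ShutdownError>) -> i32 {
    match result {
        Ok(()) => 0,
        Err(ShutdownError::Err(code)) => *code,
        Err(ShutdownError::Signal) => 128,
        Err(ShutdownError::Timeout) => 129,
    }
}
```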
ShutdownGuard Details
Reference Counting
```rust
impl Drop for ShutdownGuard {
    fn drop(&mut self) {
        if !self.temporary {
            self.token.cancel();
        }

        let count = self.ref_count.fetch_sub(1, Ordering::SeqCst);
        debug!("[{}] Dropping guard (count={})", self.name, count);
        if count == 1 {
            let _ = self.tx.send(());
        }
    }
}
```
When a guard is dropped:
- Non-temporary guards cancel the token (signaling other tasks to stop).
- The reference count is decremented.
- When the count reaches zero, a broadcast is sent to signal shutdown completion. Because `fetch_sub` returns the value before the decrement, `count == 1` identifies the last guard being dropped.
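The guard's reference counting can be reproduced with standard-library types. In this hypothetical sketch, an `AtomicBool` stands in for the `CancellationToken` and an `mpsc` channel for the broadcast channel. `Guard`, `root`, and `child` are illustrative names, and starting the count at 1 for the root guard is a choice of this sketch.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::Arc;

/// Hypothetical std-only analogue of `ShutdownGuard`.
struct Guard {
    cancelled: Arc<AtomicBool>,
    ref_count: Arc<AtomicUsize>,
    done: Sender<()>,
    temporary: bool,
}

impl Guard {
    /// Create the root guard plus the receiver that fires when the last guard drops.
    fn root() -> (Guard, Receiver<()>) {
        let (done, rx) = mpsc::channel();
        let guard = Guard {
            cancelled: Arc::new(AtomicBool::new(false)),
            ref_count: Arc::new(AtomicUsize::new(1)),
            done,
            temporary: false,
        };
        (guard, rx)
    }

    /// Create a child sharing the token and count; increments the count.
    fn child(&self, temporary: bool) -> Guard {
        self.ref_count.fetch_add(1, Ordering::SeqCst);
        Guard {
            cancelled: self.cancelled.clone(),
            ref_count: self.ref_count.clone(),
            done: self.done.clone(),
            temporary,
        }
    }

    fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
    }
}

impl Drop for Guard {
    fn drop(&mut self) {
        // Non-temporary guards cancel the shared token.
        if !self.temporary {
            self.cancelled.store(true, Ordering::SeqCst);
        }
        // fetch_sub returns the previous value, so 1 means this was the last guard.
        if self.ref_count.fetch_sub(1, Ordering::SeqCst) == 1 {
            let _ = self.done.send(());
        }
    }
}
```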
Task Spawning
```rust
pub fn into_spawn_task<F, T>(self, task: T) -> JoinHandle<Option<T::Output>>
where
    F: Send + 'static,
    T: Future<Output = anyhow::Result<F>> + Send + 'static,
{
    let token = self.token.clone();
    tokio::spawn(async move {
        let output = select! {
            output = task => {
                if let Err(e) = &output {
                    error!("{}", e);
                    ERROR_CODE.store(1, Ordering::Relaxed);
                    token.cancel();
                }
                Some(output)
            }
            _ = token.cancelled() => {
                None
            }
        };
        drop(self);
        output
    })
}
```
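Tasks spawned with guards are:
- Cancellable: if the cancellation token fires before the task finishes, the task future is dropped and the handle yields `None`.
- Error-propagating: if a task returns an error, the error is logged, ERROR_CODE is set to 1, and the token is cancelled so that all other tasks stop.
- Guard-dropping: the guard is dropped when the task finishes or is cancelled, decrementing the reference count. For a non-temporary guard, this drop also cancels the token, so a long-lived task that exits, even successfully, shuts down the rest.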
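The error-propagation behavior can be approximated with standard-library threads, which cannot be aborted the way `select!` drops a future. In this sketch, tasks therefore poll a shared `AtomicBool` and return `None` themselves when they notice cancellation. `spawn_guarded` is a hypothetical name, the static `ERROR_CODE` only mirrors the module's global, and guard bookkeeping is omitted.

```rust
use std::sync::atomic::{AtomicBool, AtomicI32, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Process-wide error code, analogous to ERROR_CODE in the shutdown module.
static ERROR_CODE: AtomicI32 = AtomicI32::new(0);

/// Spawn `task` on a thread. The task checks `cancel` cooperatively and
/// returns `None` if it stopped early; an `Err` result records the error
/// code and cancels every other cooperating task.
fn spawn_guarded<F, T>(cancel: Arc<AtomicBool>, task: F) -> JoinHandle<Option<Result<T, String>>>
where
    F: FnOnce(&AtomicBool) -> Option<Result<T, String>> + Send + 'static,
    T: Send + 'static,
{
    thread::spawn(move || -> Option<Result<T, String>> {
        // `?` propagates `None`: the task noticed cancellation and stopped.
        let output = task(cancel.as_ref())?;
        if let Err(e) = &output {
            eprintln!("{e}");
            ERROR_CODE.store(1, Ordering::Relaxed);
            cancel.store(true, Ordering::SeqCst);
        }
        Some(output)
    })
}
```

Cooperative cancellation is the usual substitute when the runtime cannot drop a running computation. The cost is that a task which never checks the flag keeps running.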
Integration in Local Cluster
The shutdown framework is wired into the local cluster as follows:
```rust
// In run():
let mut shutdown = Shutdown::new("pipeline", SignalBehavior::Handle);

// Controller and API receive shutdown guards
let controller_port = arroyo_controller::ControllerServer::new(db.clone())
    .await
    .start(shutdown.guard("controller"))
    .await
    .expect("could not start system");

let http_port = arroyo_api::start_server(db.clone(), shutdown.guard("api"))
    .await
    .unwrap();

// Register the pipeline-specific handler
shutdown.set_handler(Box::new(shutdown_handler.clone()));

// Pipeline submission runs as a temporary task
shutdown.spawn_temporary(async move {
    run_pipeline(client, args.name, query, args.parallelism,
                 http_port, shutdown_handler, args.force).await
});

// Wait for shutdown signal and handle exit
Shutdown::handle_shutdown(shutdown.wait_for_shutdown(Duration::from_secs(60)).await);
```
I/O
- Input: SIGTERM or SIGINT (Ctrl+C) signal from the operating system.
- Output: Pipeline is stopped with a final checkpoint, SQLite database is backed up to remote storage (if applicable), and the process exits with an appropriate exit code.