Implementation: ArroyoSystems/arroyo Local Run
Summary
The Local Run implementation realizes the Local Cluster Initialization principle through the run(args) function in the Arroyo CLI crate. This function initializes a complete streaming cluster within a single process, combining the controller, API server, embedded scheduler, and SQLite-backed metadata storage to execute a SQL pipeline locally.
Code Reference
- File: `crates/arroyo/src/run.rs` (L400-L507)
- Entry point: `pub async fn run(args: RunArgs)`
Function Signature
```rust
pub async fn run(args: RunArgs)
```
Where `RunArgs` is defined in `crates/arroyo/src/main.rs` (L59-L80):
```rust
#[derive(Args)]
struct RunArgs {
    /// Name for this pipeline
    #[arg(short, long)]
    name: Option<String>,

    /// Directory or URL where checkpoints and metadata will be written and restored from
    #[arg(short = 's', long)]
    state_dir: Option<String>,

    /// Number of parallel subtasks to run
    #[arg(short, long, default_value = "1")]
    parallelism: u32,

    /// Force the pipeline to start even if the state file does not match the query
    #[clap(short, long)]
    force: bool,

    /// The query to run
    #[clap(value_parser, default_value = "-")]
    query: Input,
}
```
Initialization Sequence
The run function performs the following steps in order:
Step 1: Logging Initialization
```rust
let _guard = arroyo_server_common::init_logging_with_filter(
    "pipeline",
    if env::var("RUST_LOG").is_err() {
        // Default to WARN level, with INFO for arroyo::run
        EnvFilter::builder()
            .with_default_directive(LevelFilter::WARN.into())
            .from_env_lossy()
            .add_directive("arroyo::run=info".parse().unwrap())
    } else {
        EnvFilter::builder()
            .with_default_directive(LevelFilter::INFO.into())
            .from_env_lossy()
    },
);
```
Sets up tracing with a default WARN filter (plus INFO for the `arroyo::run` module) when `RUST_LOG` is unset; otherwise the user-provided `RUST_LOG` directives are used with an INFO default.
Step 2: Query Resolution
```rust
let query = match config().run.query.clone() {
    Some(query) => query,
    None => std::io::read_to_string(args.query).unwrap(),
};
```
Reads the SQL query from the configuration if one is set there; otherwise from the `query` argument, which is either a file path or `-` for standard input.
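The `-` convention can be sketched with a small hypothetical helper (not the actual `Input` type, which handles this via its `value_parser`):

```rust
use std::fs;
use std::io::{self, Read};

/// Hypothetical helper mirroring the CLI convention: "-" means read the
/// SQL query from standard input, anything else is treated as a file path.
fn read_query(arg: &str) -> io::Result<String> {
    if arg == "-" {
        let mut buf = String::new();
        io::stdin().read_to_string(&mut buf)?;
        Ok(buf)
    } else {
        fs::read_to_string(arg)
    }
}
```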
Step 3: Shutdown Framework Initialization
```rust
let mut shutdown = Shutdown::new("pipeline", SignalBehavior::Handle);
```
Creates a Shutdown instance that handles SIGTERM/SIGINT signals for graceful termination.
Step 4: State Directory Resolution
```rust
let state_dir = args
    .state_dir
    .or_else(|| config().run.state_dir.clone())
    .unwrap_or_else(|| {
        format!(
            "{}/pipeline-{}",
            config().checkpoint_url,
            to_millis(SystemTime::now())
        )
    });
```
Resolves the state directory from (in priority order): CLI argument, configuration, or generates a new timestamped path.
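The priority chain reduces to ordinary `Option` combinators; a minimal, self-contained sketch (function name and signature are illustrative, not from the source):

```rust
/// Resolve the state directory with the same priority order as `run`:
/// CLI argument first, then the configured value, then a fresh
/// timestamped path under the checkpoint URL.
fn resolve_state_dir(
    cli_arg: Option<String>,
    config_value: Option<String>,
    checkpoint_url: &str,
    now_millis: u64,
) -> String {
    cli_arg
        .or(config_value)
        .unwrap_or_else(|| format!("{checkpoint_url}/pipeline-{now_millis}"))
}
```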
Step 5: Database Preparation
```rust
let mut maybe_local_db = MaybeLocalDb::from_dir(&state_dir).await;
```
MaybeLocalDb::from_dir() creates a StorageProvider for the state directory. If the state directory is remote (e.g., S3), it downloads the existing SQLite database to a temporary local path. If local, it uses the path directly.
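The local-versus-remote decision hinges on the state directory's URL scheme. A hypothetical predicate (not the actual `StorageProvider` logic, which understands specific backends) could look like:

```rust
/// Decide whether the SQLite file needs a temporary local copy.
/// Remote schemes (s3://, https://, ...) do; plain filesystem paths
/// and file:// URLs can be opened in place.
fn needs_local_copy(state_dir: &str) -> bool {
    match state_dir.split_once("://") {
        Some(("file", _)) | None => false, // local path or file:// URL
        Some(_) => true,                   // any other scheme is remote
    }
}
```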
Step 6: Configuration Override
```rust
config::update(|c| {
    c.database.r#type = DatabaseType::Sqlite;
    c.database.sqlite.path = maybe_local_db.local_path.clone();
    c.checkpoint_url = state_dir.clone();

    if let Some(port) = c.api.run_http_port {
        c.api.http_port = port;
    } else {
        c.api.http_port = 0;
    }

    c.controller.rpc_port = 0;

    if c.controller.scheduler != Scheduler::Embedded {
        c.controller.scheduler = Scheduler::Process;
    }

    c.pipeline.default_sink = DefaultSink::Stdout;
});
```
Overrides the global configuration for local-mode execution: SQLite database, dynamic ports, process/embedded scheduler, and stdout as default sink.
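The `config::update` pattern, a process-global value mutated through a closure, can be sketched with standard-library primitives (the struct and fields here are simplified stand-ins, not Arroyo's actual `Config`):

```rust
use std::sync::{Mutex, OnceLock};

/// Simplified stand-in for the global configuration.
#[derive(Default, Clone)]
struct Config {
    http_port: u16,
    rpc_port: u16,
}

/// Lazily initialized, process-global configuration.
fn global() -> &'static Mutex<Config> {
    static CONFIG: OnceLock<Mutex<Config>> = OnceLock::new();
    CONFIG.get_or_init(|| Mutex::new(Config::default()))
}

/// Apply a mutation to the global configuration under the lock.
fn update(f: impl FnOnce(&mut Config)) {
    let mut guard = global().lock().unwrap();
    f(&mut guard);
}
```

Routing all writes through one function keeps lock usage in a single place, which is why later steps (like recording the controller's actual port) go through the same `update` call.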
Step 7: Database Initialization and Connection
```rust
let db = db_source().await;
maybe_local_db.init_connection();
```
Initializes the database source (runs migrations) and opens a read-only SQLite connection for backup operations.
Step 8: Controller and API Startup
```rust
let controller_port = arroyo_controller::ControllerServer::new(db.clone())
    .await
    .start(shutdown.guard("controller"))
    .await
    .expect("could not start system");

config::update(|c| c.controller.rpc_port = controller_port);

let http_port = arroyo_api::start_server(db.clone(), shutdown.guard("api"))
    .await
    .unwrap();
```
Starts the gRPC controller on a dynamic port, updates the configuration with the actual port, then starts the HTTP API server on another dynamic port. Both receive ShutdownGuard instances for coordinated shutdown.
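"Dynamic port" here is the standard OS trick: bind to port 0 and ask the socket which port was actually assigned. A self-contained illustration:

```rust
use std::net::TcpListener;

/// Bind to an OS-assigned free port and report which port was chosen,
/// so callers can publish the real value (as `run` does via config::update).
fn bind_dynamic() -> std::io::Result<(TcpListener, u16)> {
    let listener = TcpListener::bind("127.0.0.1:0")?; // port 0 = pick for me
    let port = listener.local_addr()?.port();
    Ok((listener, port))
}
```

This is why the configuration is written back after startup: until the bind completes, the port is simply unknown.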
Step 9: Client and Backup Scheduling
```rust
let client = Arc::new(Client::new_with_client(
    &format!("http://localhost:{http_port}/api"),
    client_builder.build().unwrap(),
));

schedule_db_backups(&shutdown, maybe_local_db);
```
Creates an OpenAPI client pointed at the local API server and starts a background task that backs up the SQLite database to remote storage every 60 seconds (if the state directory is remote).
Step 10: Pipeline Submission
```rust
let shutdown_handler = PipelineShutdownHandler {
    client: client.clone(),
    pipeline_id: Arc::new(Mutex::new(None)),
};
shutdown.set_handler(Box::new(shutdown_handler.clone()));

shutdown.spawn_temporary(async move {
    run_pipeline(
        client, args.name, query, args.parallelism,
        http_port, shutdown_handler, args.force,
    ).await
});

Shutdown::handle_shutdown(shutdown.wait_for_shutdown(Duration::from_secs(60)).await);
```
Registers the pipeline shutdown handler (for graceful stop on signal), spawns the pipeline submission as a temporary task, and then waits for the shutdown signal. The process exits via Shutdown::handle_shutdown.
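The control flow of this final step (spawn the work, then wait for whichever comes first: completion or a signal) can be sketched with threads and a channel; this is an illustrative model, not the Shutdown framework, and the exit codes are assumptions:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Run `work` in the background and block until it finishes or a
/// shutdown signal arrives, returning an exit code for each outcome.
fn run_until_shutdown(
    work: impl FnOnce() + Send + 'static,
    shutdown_rx: mpsc::Receiver<()>,
) -> i32 {
    let (done_tx, done_rx) = mpsc::channel();
    thread::spawn(move || {
        work();
        let _ = done_tx.send(());
    });
    loop {
        if done_rx.try_recv().is_ok() {
            return 0; // pipeline completed
        }
        if shutdown_rx.try_recv().is_ok() {
            return 130; // assumed code for signal-driven termination
        }
        thread::sleep(Duration::from_millis(10));
    }
}
```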
Helper: MaybeLocalDb
The `MaybeLocalDb` struct (L232-L346) manages the SQLite database lifecycle for both local and remote state directories:
| Method | Purpose |
|---|---|
| `from_dir(s: &str)` | Creates a StorageProvider; downloads remote SQLite to a temp path if needed |
| `init_connection()` | Opens a read-only SQLite connection for backup operations |
| `backup()` | Creates a local backup file, uploads it to remote storage, and cleans up |
Helper: schedule_db_backups
The `schedule_db_backups` function (L348-L379) starts a Tokio task that:
- Runs every 60 seconds via an interval timer
- Listens for explicit backup notifications via `init_db_notifier()`
- Calls `MaybeLocalDb::backup()` to upload the current SQLite state to remote storage
- Respects the shutdown token to terminate gracefully
I/O
- Input: `RunArgs` containing the SQL query (from file or stdin), optional pipeline name, parallelism count, state directory, and force flag.
- Output: Runs the streaming pipeline until completion, failure, or termination signal. The exit code reflects the outcome.