Implementation:ArroyoSystems Arroyo Local Run

From Leeroopedia


Summary

The Local Run implementation realizes the Local Cluster Initialization principle through the run(args) function in the Arroyo CLI crate. This function brings up a complete streaming cluster from a single command, combining the controller, API server, an embedded or process scheduler, and SQLite-backed metadata storage to execute a SQL pipeline locally.

Code Reference

  • File: crates/arroyo/src/run.rs (L400-L507)
  • Entry point: pub async fn run(args: RunArgs)

Function Signature

pub async fn run(args: RunArgs)

Where RunArgs is defined in crates/arroyo/src/main.rs (L59-L80):

#[derive(Args)]
struct RunArgs {
    /// Name for this pipeline
    #[arg(short, long)]
    name: Option<String>,

    /// Directory or URL where checkpoints and metadata will be written and restored from
    #[arg(short = 's', long)]
    state_dir: Option<String>,

    /// Number of parallel subtasks to run
    #[arg(short, long, default_value = "1")]
    parallelism: u32,

    /// Force the pipeline to start even if the state file does not match the query
    #[clap(short, long)]
    force: bool,

    /// The query to run
    #[clap(value_parser, default_value = "-")]
    query: Input,
}

Initialization Sequence

The run function performs the following steps in order:

Step 1: Logging Initialization

let _guard = arroyo_server_common::init_logging_with_filter(
    "pipeline",
    if env::var("RUST_LOG").is_err() {
        // Default to WARN level, with INFO for arroyo::run
        EnvFilter::builder()
            .with_default_directive(LevelFilter::WARN.into())
            .from_env_lossy()
            .add_directive("arroyo::run=info".parse().unwrap())
    } else {
        EnvFilter::builder()
            .with_default_directive(LevelFilter::INFO.into())
            .from_env_lossy()
    },
);

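Sets up tracing. If RUST_LOG is not set, the filter defaults to WARN and adds an INFO directive for the arroyo::run module. If RUST_LOG is set, the filter is built from the user-provided value, with INFO as the builder's default directive.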

Step 2: Query Resolution

let query = match config().run.query.clone() {
    Some(query) => query,
    None => std::io::read_to_string(args.query).unwrap(),
};

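Resolves the SQL query. A query set in the configuration (config().run.query) takes precedence; otherwise the query is read in full from args.query, which is a file path or, with the default value "-", standard input. A read failure panics because of the unwrap().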
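The precedence above can be sketched with the standard library alone. This is a minimal illustration, not the Arroyo code: resolve_query is a hypothetical helper, its input parameter stands in for the file or stdin handle behind args.query, and it returns the read error to the caller instead of calling unwrap().

```rust
use std::io::{self, Read};

/// Hypothetical helper: returns the configured query if there is one,
/// otherwise reads everything from `input` (a stand-in for the file or
/// stdin handle behind `args.query`).
fn resolve_query<R: Read>(configured: Option<String>, input: R) -> io::Result<String> {
    match configured {
        // A query from configuration wins and `input` is never read.
        Some(query) => Ok(query),
        // Otherwise read the whole input; errors are returned, not unwrapped.
        None => io::read_to_string(input),
    }
}
```

Calling resolve_query(None, std::io::stdin()) would mirror the stdin case, while passing Some(query) short-circuits without touching the reader.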

Step 3: Shutdown Framework Initialization

let mut shutdown = Shutdown::new("pipeline", SignalBehavior::Handle);

Creates a Shutdown instance that handles SIGTERM/SIGINT signals for graceful termination.

Step 4: State Directory Resolution

let state_dir = args
    .state_dir
    .or_else(|| config().run.state_dir.clone())
    .unwrap_or_else(|| {
        format!(
            "{}/pipeline-{}",
            config().checkpoint_url,
            to_millis(SystemTime::now())
        )
    });

Resolves the state directory in priority order: the CLI argument, then the configuration value, and finally a newly generated timestamped path under the configured checkpoint URL.
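The fallback chain can be sketched as a pure function. This is an illustration under stated assumptions: resolve_state_dir is a hypothetical name, the current time is passed in so the result is reproducible, and the millisecond computation is a stand-in for Arroyo's to_millis helper, whose exact behavior is not shown in the excerpt.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical helper: CLI value first, then the configured value, then a
/// fresh timestamped path under `checkpoint_url`.
fn resolve_state_dir(
    cli: Option<String>,
    configured: Option<String>,
    checkpoint_url: &str,
    now: SystemTime,
) -> String {
    cli.or(configured).unwrap_or_else(|| {
        // Assumed stand-in for `to_millis`: milliseconds since the Unix epoch.
        let millis = now
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis())
            .unwrap_or(0);
        format!("{checkpoint_url}/pipeline-{millis}")
    })
}
```

Because the generated path embeds the current time, a run that supplies no state directory gets a new path each time, so restoring earlier state presumably requires passing the same directory explicitly.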

Step 5: Database Preparation

let mut maybe_local_db = MaybeLocalDb::from_dir(&state_dir).await;

MaybeLocalDb::from_dir() creates a StorageProvider for the state directory. If the state directory is remote (e.g., S3), it downloads the existing SQLite database to a temporary local path. If local, it uses the path directly.

Step 6: Configuration Override

config::update(|c| {
    c.database.r#type = DatabaseType::Sqlite;
    c.database.sqlite.path = maybe_local_db.local_path.clone();
    c.checkpoint_url = state_dir.clone();
    if let Some(port) = c.api.run_http_port {
        c.api.http_port = port;
    } else {
        c.api.http_port = 0;
    }
    c.controller.rpc_port = 0;
    if c.controller.scheduler != Scheduler::Embedded {
        c.controller.scheduler = Scheduler::Process;
    }
    c.pipeline.default_sink = DefaultSink::Stdout;
});

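Overrides the global configuration for local-mode execution: SQLite as the database (at the local path prepared in Step 5), the state directory as the checkpoint URL, the HTTP port taken from run_http_port if set and otherwise assigned dynamically (port 0), a dynamically assigned controller RPC port, and stdout as the default sink. The scheduler is left unchanged if it is already Embedded; any other value is replaced with Process.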
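The port and scheduler rules can be isolated in a small sketch. The types here are hypothetical simplifications (the real configuration structs are not shown in the excerpt), and the Other variant is an illustrative placeholder for any scheduler that is neither Embedded nor Process.

```rust
/// Simplified stand-in for the scheduler setting; `Other` is a placeholder
/// for any configured scheduler besides the two named in the excerpt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Scheduler {
    Embedded,
    Process,
    Other,
}

/// Hypothetical result type holding only the fields discussed above.
#[derive(Debug)]
struct LocalOverrides {
    http_port: u16,
    rpc_port: u16,
    scheduler: Scheduler,
}

/// Applies the local-mode rules: a fixed HTTP port only if `run_http_port`
/// is set, a dynamic RPC port, and Process unless Embedded was configured.
fn local_overrides(run_http_port: Option<u16>, configured: Scheduler) -> LocalOverrides {
    LocalOverrides {
        // Port 0 asks the OS to choose a free port at bind time.
        http_port: run_http_port.unwrap_or(0),
        rpc_port: 0,
        scheduler: if configured == Scheduler::Embedded {
            Scheduler::Embedded
        } else {
            Scheduler::Process
        },
    }
}
```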

Step 7: Database Initialization and Connection

let db = db_source().await;
maybe_local_db.init_connection();

Initializes the database source (runs migrations) and opens a read-only SQLite connection for backup operations.

Step 8: Controller and API Startup

let controller_port = arroyo_controller::ControllerServer::new(db.clone())
    .await
    .start(shutdown.guard("controller"))
    .await
    .expect("could not start system");

config::update(|c| c.controller.rpc_port = controller_port);

let http_port = arroyo_api::start_server(db.clone(), shutdown.guard("api"))
    .await
    .unwrap();

Starts the gRPC controller on a dynamic port, updates the configuration with the actual port, then starts the HTTP API server on another dynamic port. Both receive ShutdownGuard instances for coordinated shutdown.
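The "dynamic port" pattern used for both servers can be shown with a plain TCP listener. This is a generic standard-library sketch, not the controller or API startup code; bind_dynamic is a hypothetical name.

```rust
use std::io;
use std::net::TcpListener;

/// Binds to port 0 so the operating system picks a free port, then reports
/// the port that was actually assigned.
fn bind_dynamic() -> io::Result<(TcpListener, u16)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    // The real port is only known after binding, which is why the excerpt
    // writes the controller's returned port back into the configuration.
    let port = listener.local_addr()?.port();
    Ok((listener, port))
}
```

Returning the listener together with the port keeps the socket open, so the reported port cannot be claimed by another process before it is used.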

Step 9: Client and Backup Scheduling

let client = Arc::new(Client::new_with_client(
    &format!("http://localhost:{http_port}/api"),
    client_builder.build().unwrap(),
));

schedule_db_backups(&shutdown, maybe_local_db);

Creates an OpenAPI client pointed at the local API server (client_builder is defined outside this excerpt), then starts a background task that backs up the SQLite database to remote storage every 60 seconds when the state directory is remote.

Step 10: Pipeline Submission

let shutdown_handler = PipelineShutdownHandler {
    client: client.clone(),
    pipeline_id: Arc::new(Mutex::new(None)),
};

shutdown.set_handler(Box::new(shutdown_handler.clone()));

shutdown.spawn_temporary(async move {
    run_pipeline(
        client, args.name, query, args.parallelism,
        http_port, shutdown_handler, args.force,
    ).await
});

Shutdown::handle_shutdown(shutdown.wait_for_shutdown(Duration::from_secs(60)).await);

Registers the pipeline shutdown handler so the pipeline can be stopped gracefully on a signal, spawns pipeline submission as a temporary task, and then waits in wait_for_shutdown, which is given a 60-second duration. The result is passed to Shutdown::handle_shutdown, through which the process exits.
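The handler and the submission task share pipeline_id through an Arc<Mutex<Option<_>>>, so the handler can tell whether a pipeline exists yet. The sketch below illustrates only that sharing pattern; StopOnShutdown and its methods are hypothetical, and the String id type is an assumption since the excerpt does not show the real type.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a shutdown handler that shares the pipeline id
/// with the task that creates the pipeline.
#[derive(Clone)]
struct StopOnShutdown {
    pipeline_id: Arc<Mutex<Option<String>>>,
}

impl StopOnShutdown {
    fn new() -> Self {
        StopOnShutdown {
            pipeline_id: Arc::new(Mutex::new(None)),
        }
    }

    /// Called by the submission task once the pipeline has been created.
    fn set_pipeline_id(&self, id: &str) {
        *self.pipeline_id.lock().unwrap() = Some(id.to_string());
    }

    /// Called on shutdown: returns the id to stop, or None if the signal
    /// arrived before any pipeline was created.
    fn on_shutdown(&self) -> Option<String> {
        self.pipeline_id.lock().unwrap().clone()
    }
}
```

Cloning the handler copies the Arc, not the id, so a value set through one clone is visible through every other clone.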

Helper: MaybeLocalDb

The MaybeLocalDb struct (L232-L346) manages the SQLite database lifecycle for both local and remote state directories:

Method             | Purpose
from_dir(s: &str)  | Creates a StorageProvider; downloads remote SQLite to temp path if needed
init_connection()  | Opens a read-only SQLite connection for backup operations
backup()           | Creates a local backup file, uploads to remote storage, and cleans up

Helper: schedule_db_backups

The schedule_db_backups function (L348-L379) starts a Tokio task that:

  • Runs every 60 seconds via an interval timer
  • Listens for explicit backup notifications via init_db_notifier()
  • Calls MaybeLocalDb::backup() to upload the current SQLite state to remote storage
  • Respects the shutdown token to terminate gracefully
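The loop described by these bullets can be approximated without Tokio. The sketch below is a simplification under stated assumptions: a standard-library channel replaces the interval timer, the notifier, and the shutdown token; Event and backup_loop are hypothetical names; and the timeout restarts after every event, which a true interval timer would not do.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical events standing in for the notifier and the shutdown token.
enum Event {
    BackupNow,
    Shutdown,
}

/// Runs `backup` when `interval` passes with no event or when `BackupNow`
/// arrives. Stops on `Shutdown` or once every sender has been dropped, and
/// returns the number of backups performed.
fn backup_loop(events: Receiver<Event>, interval: Duration, mut backup: impl FnMut()) -> usize {
    let mut count = 0;
    loop {
        match events.recv_timeout(interval) {
            // Periodic tick (timeout) or explicit request: back up.
            Ok(Event::BackupNow) | Err(RecvTimeoutError::Timeout) => {
                backup();
                count += 1;
            }
            // Shutdown requested or nobody left to notify: exit the loop.
            Ok(Event::Shutdown) | Err(RecvTimeoutError::Disconnected) => return count,
        }
    }
}
```

In this sketch the loop would run on its own thread with a 60-second interval, and the closure passed as backup would play the role of MaybeLocalDb::backup().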

I/O

  • Input: RunArgs containing the SQL query (from file or stdin), optional pipeline name, parallelism count, state directory, and force flag.
  • Output: Runs the streaming pipeline until completion, failure, or termination signal. Exit code reflects the outcome.

Related

Environment:ArroyoSystems_Arroyo_Rust_Runtime
