Implementation:ArroyoSystems Arroyo Run Pipeline
Summary
The Run Pipeline implementation realizes the Pipeline Submission principle through the run_pipeline function. This function orchestrates the complete pipeline submission flow: waiting for the API server to become available, validating the SQL query, checking for existing pipelines to support resume, creating or resuming the pipeline, and waiting for it to reach the Running state.
Code Reference
- File: crates/arroyo/src/run.rs (L147-L230)
- Entry point: async fn run_pipeline(...)
Function Signature
async fn run_pipeline(
    client: Arc<Client>,
    name: Option<String>,
    query: String,
    parallelism: u32,
    http_port: u16,
    shutdown_handler: PipelineShutdownHandler,
    force: bool,
) -> anyhow::Result<()>
Parameters
| Parameter | Type | Description |
|---|---|---|
| client | Arc<Client> | OpenAPI client connected to the local API server |
| name | Option<String> | Optional pipeline name; defaults to "query" if not provided |
| query | String | The SQL query defining the streaming pipeline |
| parallelism | u32 | Number of parallel subtasks to run |
| http_port | u16 | The API server's HTTP port (used to display the dashboard URL) |
| shutdown_handler | PipelineShutdownHandler | Handler to register the pipeline ID for graceful shutdown |
| force | bool | If true, proceed even when the state directory belongs to a different pipeline |
Implementation Flow
Step 1: Wait for API Connectivity
wait_for_connect(&client).await.unwrap();
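Calls wait_for_connect, which polls the /ping endpoint up to 50 times at 10 ms intervals so that the API server has started before the flow proceeds. Because the result is unwrapped, a connection failure panics here instead of being returned as an error.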
Step 2: Validate the Query
let errors = client
    .validate_query()
    .body(ValidateQueryPost::builder().query(&query))
    .send()
    .await?
    .into_inner();
if !errors.errors.is_empty() {
    eprintln!("There were some issues with the provided query");
    for error in errors.errors {
        eprintln!(" * {error}");
    }
    exit(1);
}
Sends the SQL query to the validation endpoint. If any validation errors are returned, they are printed to stderr and the process exits with code 1. This provides fast feedback before any resources are allocated.
Step 3: Check for Existing Pipeline
let id = match get_pipelines(&client)
    .await?
    .into_iter()
    .find(|p| p.query == query)
{
    Some(p) => { /* resume path */ }
    None => { /* create path */ }
};
Retrieves all existing pipelines using the paginated get_pipelines helper and searches for one whose query matches the submitted SQL exactly (string equality).
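Because the lookup relies on exact string equality, even a whitespace-only change to the SQL makes the lookup miss. The following is a minimal sketch of that behavior, not the project's code: the Pipeline struct and the find_matching helper are hypothetical stand-ins that model only the fields the lookup needs.

```rust
// Hypothetical stand-in for the API's pipeline record; only the fields
// used by the lookup are modeled here.
#[derive(Debug, Clone, PartialEq)]
struct Pipeline {
    id: String,
    query: String,
}

/// Returns the first pipeline whose stored query is exactly equal to `query`.
fn find_matching<'a>(pipelines: &'a [Pipeline], query: &str) -> Option<&'a Pipeline> {
    pipelines.iter().find(|p| p.query == query)
}

fn main() {
    let stored = vec![Pipeline {
        id: "pl_1".to_string(),
        query: "SELECT * FROM events".to_string(),
    }];

    // Identical text matches, so this run would take the resume path.
    let hit = find_matching(&stored, "SELECT * FROM events");
    assert_eq!(hit.map(|p| p.id.as_str()), Some("pl_1"));

    // A trailing space is enough to miss, so this run would take the create path.
    assert!(find_matching(&stored, "SELECT * FROM events ").is_none());
}
```

In practice this means the query text should be kept unchanged between runs if resuming from existing state is the goal.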
Step 4a: Resume Existing Pipeline
Some(p) => {
    info!("Pipeline already exists in database as {}", p.id);
    client
        .patch_pipeline()
        .id(&p.id)
        .body(PipelinePatch::builder().stop(StopType::None))
        .send()
        .await?;
    p.id
}
When a matching pipeline is found, it is resumed by sending a PipelinePatch with StopType::None. Clearing the stop mode in this way tells the controller to schedule the pipeline and run it from its last checkpoint.
Step 4b: Create New Pipeline
None => {
    if !client.get_pipelines().send().await?.data.is_empty() {
        let msg = "The specified state is for a different pipeline...";
        if force {
            warn!("{}", msg);
        } else {
            error!("{}", msg);
            bail!("Exiting... if you would like to continue pass --force");
        }
    }
    client
        .create_pipeline()
        .body(
            PipelinePost::builder()
                .name(name.unwrap_or_else(|| "query".to_string()))
                .parallelism(parallelism)
                .query(&query),
        )
        .send()
        .await?
        .into_inner()
        .id
}
When no matching pipeline exists, the function first checks whether any pipelines exist in the database. If they do, the state directory likely belongs to a different query, and the function either errors or warns depending on the force flag. A new pipeline is created via create_pipeline with the specified name (defaulting to "query"), parallelism, and SQL query.
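The create-path check reduces to a small decision table over two inputs: how many pipelines already exist, and whether force is set. The sketch below models only that decision; CreateDecision and create_decision are hypothetical names introduced for illustration and do not appear in the Arroyo source.

```rust
/// Outcome of the create-path check. Illustrative names only.
#[derive(Debug, PartialEq)]
enum CreateDecision {
    /// The database is empty, so the pipeline can be created without comment.
    Proceed,
    /// Other pipelines exist, but force was set: warn and create anyway.
    ProceedWithWarning,
    /// Other pipelines exist and force was not set: stop with an error.
    Abort,
}

/// Decides what to do when no stored pipeline matches the submitted query.
fn create_decision(existing_pipelines: usize, force: bool) -> CreateDecision {
    match (existing_pipelines, force) {
        (0, _) => CreateDecision::Proceed,
        (_, true) => CreateDecision::ProceedWithWarning,
        (_, false) => CreateDecision::Abort,
    }
}

fn main() {
    // Fresh state directory: force is irrelevant.
    assert_eq!(create_decision(0, false), CreateDecision::Proceed);
    // State from another query, with and without force.
    assert_eq!(create_decision(1, true), CreateDecision::ProceedWithWarning);
    assert_eq!(create_decision(1, false), CreateDecision::Abort);
}
```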
Step 5: Register Pipeline with Shutdown Handler
{
    *shutdown_handler.pipeline_id.lock().unwrap() = Some(id.clone());
}
Stores the pipeline ID in the shared Mutex so the shutdown handler knows which pipeline to stop gracefully on signal.
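The registration pattern can be illustrated with the standard library alone. This sketch assumes the handler holds its ID slot as an Arc<Mutex<Option<String>>>; the ShutdownHandler type and its pipeline_to_stop method are hypothetical stand-ins for the real PipelineShutdownHandler, whose definition is not shown on this page.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for PipelineShutdownHandler; only the shared
// ID slot is modeled, and its exact type is an assumption.
#[derive(Clone)]
struct ShutdownHandler {
    pipeline_id: Arc<Mutex<Option<String>>>,
}

impl ShutdownHandler {
    fn new() -> Self {
        Self { pipeline_id: Arc::new(Mutex::new(None)) }
    }

    /// What a signal handler might do: read the ID, if one was registered.
    fn pipeline_to_stop(&self) -> Option<String> {
        self.pipeline_id.lock().unwrap().clone()
    }
}

fn main() {
    let handler = ShutdownHandler::new();

    // Before registration there is nothing to stop.
    assert_eq!(handler.pipeline_to_stop(), None);

    // The submitting side stores the ID; the lock guard is a temporary
    // and is released at the end of this statement.
    *handler.pipeline_id.lock().unwrap() = Some("pl_123".to_string());

    // A clone held by another thread observes the same value.
    let for_signal = handler.clone();
    let seen = thread::spawn(move || for_signal.pipeline_to_stop())
        .join()
        .unwrap();
    assert_eq!(seen, Some("pl_123".to_string()));
}
```

Keeping the lock scope short, as the braces in the original snippet do, avoids holding the guard across later await points.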
Step 6: Wait for Running State
wait_for_state(&client, &id, &["Running"]).await?;
info!("Pipeline running... dashboard at http://localhost:{http_port}/pipelines/{id}");
Polls the pipeline's job state until it transitions to "Running". Once running, logs the dashboard URL for the user.
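The body of wait_for_state is not shown on this page, so the following is only a sketch of how such a poll could behave, assuming it returns once a target state is seen and fails when the pipeline reports "Failed". The poll_until_state function, its max_polls bound, and the "Scheduling" state used in the example are hypothetical; a real implementation would also sleep between polls.

```rust
/// Polls `next_state` until it yields one of `targets`.
/// Gives up if the state is "Failed" or after `max_polls` attempts.
/// Hypothetical helper; it does not sleep between polls.
fn poll_until_state<F>(
    mut next_state: F,
    targets: &[&str],
    max_polls: usize,
) -> Result<String, String>
where
    F: FnMut() -> String,
{
    for _ in 0..max_polls {
        let state = next_state();
        if targets.contains(&state.as_str()) {
            return Ok(state);
        }
        if state == "Failed" {
            return Err("pipeline entered the Failed state".to_string());
        }
    }
    Err(format!("no target state after {} polls", max_polls))
}

fn main() {
    // Simulated state sequence standing in for repeated API calls.
    let mut states = vec!["Scheduling", "Scheduling", "Running"].into_iter();
    let result = poll_until_state(
        || states.next().unwrap_or("Running").to_string(),
        &["Running"],
        10,
    );
    assert_eq!(result, Ok("Running".to_string()));
}
```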
Helper: get_pipelines
The get_pipelines function (L122-L145) handles paginated retrieval of all pipelines:
async fn get_pipelines(client: &Client) -> anyhow::Result<Vec<Pipeline>> {
    let mut starting_after = "".to_string();
    let mut result = vec![];
    loop {
        let pipelines = client
            .get_pipelines()
            .starting_after(&starting_after)
            .send()
            .await?
            .into_inner();
        if let Some(next) = pipelines.data.last().map(|p| p.id.to_string()) {
            starting_after = next;
        }
        result.extend(pipelines.data.into_iter());
        if !pipelines.has_more {
            break;
        }
    }
    Ok(result)
}
Uses cursor-based pagination (starting_after) to retrieve all pipelines, handling the case where there are more results than a single page can return.
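The same cursor loop can be exercised without a server. The sketch below replaces the API with an in-memory slice; Page, fetch_page, and fetch_all are hypothetical names, and only the data / has_more shape and the starting_after cursor are taken from the helper above.

```rust
// Hypothetical page shape mirroring the `data` and `has_more` fields used above.
struct Page {
    data: Vec<String>,
    has_more: bool,
}

/// In-memory stand-in for the API: returns up to `page_size` IDs that
/// come after the cursor. An empty cursor means "start from the beginning".
fn fetch_page(all: &[String], starting_after: &str, page_size: usize) -> Page {
    let start = if starting_after.is_empty() {
        0
    } else {
        all.iter()
            .position(|id| id.as_str() == starting_after)
            .map_or(all.len(), |i| i + 1)
    };
    let end = (start + page_size).min(all.len());
    Page { data: all[start..end].to_vec(), has_more: end < all.len() }
}

/// Follows the cursor until the server reports no more pages.
fn fetch_all(all: &[String], page_size: usize) -> Vec<String> {
    // A zero page size would never advance the cursor.
    assert!(page_size > 0, "page_size must be positive");
    let mut starting_after = String::new();
    let mut result = Vec::new();
    loop {
        let page = fetch_page(all, &starting_after, page_size);
        // The last ID on this page becomes the cursor for the next request.
        if let Some(last) = page.data.last() {
            starting_after = last.clone();
        }
        result.extend(page.data);
        if !page.has_more {
            break;
        }
    }
    result
}

fn main() {
    let ids: Vec<String> = ["a", "b", "c", "d", "e"].iter().map(|s| s.to_string()).collect();
    // Five IDs at two per page take three requests and arrive in order.
    assert_eq!(fetch_all(&ids, 2), ids);
}
```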
Helper: wait_for_connect
async fn wait_for_connect(client: &Client) -> anyhow::Result<()> {
    for _ in 0..50 {
        if client.ping().send().await.is_ok() {
            return Ok(());
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    bail!("API server did not start up successfully; see logs for more details");
}
Polls the /ping endpoint up to 50 times, sleeping 10 ms after each failed attempt (about 500 ms of sleep in total, plus the time each request takes), to cover the race between API server startup and the first client call.
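The bounded-retry pattern is independent of the HTTP client. The following synchronous sketch uses only the standard library; wait_until_ready and its probe closure are hypothetical, standing in for the async ping call and tokio sleep in the helper above.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Calls `probe` up to `attempts` times, sleeping `delay` after each failure.
/// Returns the 1-based number of the attempt that succeeded.
fn wait_until_ready<F>(mut probe: F, attempts: u32, delay: Duration) -> Result<u32, String>
where
    F: FnMut() -> bool,
{
    for attempt in 1..=attempts {
        if probe() {
            return Ok(attempt);
        }
        sleep(delay);
    }
    Err(format!("not ready after {} attempts", attempts))
}

fn main() {
    // Simulated server that becomes reachable on the third probe.
    let mut calls = 0;
    let result = wait_until_ready(
        || {
            calls += 1;
            calls >= 3
        },
        50,
        Duration::from_millis(10),
    );
    assert_eq!(result, Ok(3));

    // A server that never comes up exhausts the attempts and yields an error.
    let never = wait_until_ready(|| false, 5, Duration::from_millis(1));
    assert!(never.is_err());
}
```

A fixed attempt count keeps the worst-case wait bounded, at the cost of failing on machines where startup takes longer than the budget.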
I/O
- Input: API client, SQL query string, optional pipeline name, parallelism, HTTP port, shutdown handler reference, and force flag.
- Output: Returns Ok(()) once the pipeline reaches the "Running" state. Returns an error if the state directory belongs to a different pipeline (and force is not set), the pipeline transitions to "Failed", or API communication fails. Query validation failures are not returned as errors; they terminate the process with exit code 1.