Implementation: ArroyoSystems Arroyo Create Pipeline Int
| Field | Value |
|---|---|
| Sources | ArroyoSystems/arroyo |
| Domains | Stream_Processing, Pipeline_Management |
| Last Updated | 2026-02-08 |
Overview
create_pipeline_int is the internal function responsible for creating a new streaming pipeline in the Arroyo engine. It orchestrates the full creation workflow: compiling the SQL query, encoding the resulting program, persisting the pipeline record with associated metadata, and initializing a job record in the Created state ready for scheduling.
Description
The create_pipeline_int function is the core pipeline creation workhorse called by the public API endpoint handlers. It encapsulates the entire pipeline creation transaction, ensuring that compilation, persistence, and job initialization are performed atomically.
The function proceeds through the following steps:
- SQL compilation: Invokes `compile_sql` to parse, plan, optimize, and generate a `LogicalProgram` from the query and UDF definitions.
- Program encoding: Serializes the `LogicalProgram` into Protocol Buffers binary format for compact, schema-versioned storage.
- Pipeline ID generation: Generates a unique public pipeline ID (a human-friendly string identifier) for the new pipeline.
- Database transaction: Within a single database transaction:
  - Inserts a record into the `pipelines` table with the pipeline name, owner (from `auth_data`), and public ID.
  - Inserts a pipeline version record containing the serialized program, SQL query text, parallelism, and checkpoint interval.
  - Records the connection IDs and UDF IDs referenced by the compiled program for dependency tracking.
- Job initialization: Creates a job record associated with the pipeline, initialized in the `Created` state. The job's configuration includes the serialized program, parallelism, checkpoint interval, and flags for preview mode and sink enablement.
- Return: Returns the public pipeline ID on success, which can be used to query pipeline status, start the job, or update the pipeline.
If any step fails (compilation error, database constraint violation, etc.), the function returns an ErrorResp with appropriate HTTP status and error details, and no partial state is persisted.
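The overall compile → encode → persist → job-init sequence can be sketched with simplified stand-in types. Everything below (`MockDb`, `CreateError`, the ID format, the `compile_sql` stub) is hypothetical and does not mirror Arroyo's real internals; it only illustrates the control flow and the all-or-nothing error behavior described above.

```rust
// Hypothetical, simplified model of the create_pipeline_int workflow.
// These types are illustrative stand-ins, not Arroyo's real APIs.

#[derive(Debug)]
enum CreateError {
    Compilation(String), // maps to HTTP 400 in the real API
    Database(String),    // maps to HTTP 500
}

struct MockDb {
    pipelines: Vec<(String, String)>, // (public_id, name)
    jobs: Vec<(String, String)>,      // (pipeline_id, state)
}

// Stand-in for SQL parsing/planning; the real code builds a LogicalProgram.
fn compile_sql(query: &str) -> Result<Vec<u8>, CreateError> {
    if query.trim().is_empty() {
        return Err(CreateError::Compilation("empty query".into()));
    }
    Ok(query.as_bytes().to_vec()) // pretend this is the encoded program
}

fn create_pipeline(
    db: &mut MockDb,
    name: &str,
    query: &str,
) -> Result<String, CreateError> {
    // 1. Compile the SQL and encode the program; failure short-circuits
    //    before anything is persisted.
    let _program = compile_sql(query)?;
    // 2. Generate a public pipeline ID.
    let public_id = format!("pl_{:08x}", db.pipelines.len() + 1);
    // 3. Persist the pipeline record (transactional in the real system).
    db.pipelines.push((public_id.clone(), name.to_string()));
    // 4. Initialize the associated job in the Created state.
    db.jobs.push((public_id.clone(), "Created".to_string()));
    // 5. Return the public ID on success.
    Ok(public_id)
}

fn main() {
    let mut db = MockDb { pipelines: vec![], jobs: vec![] };
    let id = create_pipeline(&mut db, "clicks", "SELECT 1").unwrap();
    println!("created pipeline {id}, job state = {}", db.jobs[0].1);
}
```

Note how a compilation failure returns before any record is written, mirroring the "no partial state is persisted" guarantee.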
Usage
This function is called by the POST /v1/pipelines API endpoint handler and by internal systems that create pipelines programmatically (e.g., pipeline restart logic, pipeline update logic).
Code Reference
Source Location
- Repository: `ArroyoSystems/arroyo` (GitHub)
- File: `crates/arroyo-api/src/pipelines.rs`
- Lines: L294–L457
Signature
pub(crate) async fn create_pipeline_int(
name: String,
query: String,
udfs: Vec<Udf>,
parallelism: u64,
checkpoint_interval: Duration,
preview: bool,
enable_sinks: bool,
auth_data: AuthData,
db: &DatabaseSource,
) -> Result<String, ErrorResp>
Import
// Crate-internal function; used within arroyo-api
use crate::pipelines::create_pipeline_int;
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| `name` | `String` | Yes | Human-readable pipeline name, unique within the user's namespace |
| `query` | `String` | Yes | SQL query text defining the streaming pipeline logic |
| `udfs` | `Vec<Udf>` | Yes | User-defined function definitions to compile and register (may be empty) |
| `parallelism` | `u64` | Yes | Default parallelism level for operators in the dataflow graph |
| `checkpoint_interval` | `Duration` | Yes | Time interval between consistent distributed snapshots |
| `preview` | `bool` | Yes | Whether to run in preview mode (output to console instead of sinks) |
| `enable_sinks` | `bool` | Yes | Whether sink operators should be active (false disables writes to external systems) |
| `auth_data` | `AuthData` | Yes | Authentication context identifying the user and organization |
| `db` | `&DatabaseSource` | Yes | Database connection for persisting pipeline and job records |
Outputs
| Condition | Type | Description |
|---|---|---|
| Success | `String` | The public pipeline ID (e.g., `"pl_abc123def456"`) |
| Compilation failure | `ErrorResp` | HTTP 400 with SQL compilation error details |
| Database error | `ErrorResp` | HTTP 500 with internal error details |
| Auth failure | `ErrorResp` | HTTP 401/403 with authentication or authorization error |
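A caller typically branches on the returned `Result` by status code. The sketch below assumes a hypothetical `ErrorResp { status, message }` shape for illustration; the real type lives in arroyo-api and may carry different fields.

```rust
// Hypothetical ErrorResp shape, for illustration only.
struct ErrorResp {
    status: u16,
    message: String,
}

// Branch on the outcome of a create_pipeline_int-style call,
// following the status codes listed in the Outputs table.
fn handle(result: Result<String, ErrorResp>) -> String {
    match result {
        Ok(pipeline_id) => format!("created {pipeline_id}"),
        Err(e) if e.status == 400 => format!("SQL error: {}", e.message),
        Err(e) if e.status == 401 || e.status == 403 => {
            format!("auth error: {}", e.message)
        }
        Err(e) => format!("internal error ({}): {}", e.status, e.message),
    }
}

fn main() {
    println!("{}", handle(Ok("pl_abc123def456".to_string())));
}
```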
Usage Examples
Creating a Pipeline via the API Layer
use std::time::Duration;
use crate::pipelines::create_pipeline_int; // crate-internal within arroyo-api
let pipeline_id = create_pipeline_int(
"click_aggregation".to_string(),
"SELECT user_id, count(*) as click_count
FROM clicks
GROUP BY user_id, TUMBLE(event_time, INTERVAL '5' MINUTE)".to_string(),
vec![], // no UDFs
4, // parallelism
Duration::from_secs(60), // checkpoint every 60 seconds
false, // not preview mode
true, // sinks enabled
auth_data,
&db,
).await?;
// pipeline_id can now be used to start, stop, or query the pipeline
println!("Created pipeline: {}", pipeline_id);
Creating a Preview Pipeline
let preview_id = create_pipeline_int(
"preview_session".to_string(),
"SELECT * FROM pageviews WHERE url LIKE '%/checkout%'".to_string(),
vec![],
1, // single parallelism for preview
Duration::from_secs(30),
true, // preview mode enabled
false, // sinks disabled
auth_data,
&db,
).await?;