Implementation:ArroyoSystems Arroyo Create Pipeline Int

From Leeroopedia


Field Value
Sources ArroyoSystems/arroyo
Domains Stream_Processing, Pipeline_Management
Last Updated 2026-02-08

Overview

create_pipeline_int is the internal function responsible for creating a new streaming pipeline in the Arroyo engine. It orchestrates the full creation workflow: compiling the SQL query, encoding the resulting program, persisting the pipeline record with associated metadata, and initializing a job record in the Created state, ready for scheduling.

Description

The create_pipeline_int function is the core pipeline creation workhorse called by the public API endpoint handlers. It encapsulates the entire pipeline creation transaction, ensuring that compilation, persistence, and job initialization are performed atomically.

The function proceeds through the following steps:

  1. SQL compilation: Invokes compile_sql to parse, plan, optimize, and generate a LogicalProgram from the query and UDF definitions.
  2. Program encoding: Serializes the LogicalProgram into Protocol Buffers binary format for compact, schema-versioned storage.
  3. Pipeline ID generation: Generates a unique public pipeline ID (a human-friendly string identifier) for the new pipeline.
  4. Database transaction: Within a single database transaction:
    • Inserts a record into the pipelines table with the pipeline name, owner (from auth_data), and public ID.
    • Inserts a pipeline version record containing the serialized program, SQL query text, parallelism, and checkpoint interval.
    • Records the connection IDs and UDF IDs referenced by the compiled program for dependency tracking.
  5. Job initialization: Creates a job record associated with the pipeline, initialized in the Created state. The job's configuration includes the serialized program, parallelism, checkpoint interval, and flags for preview mode and sink enablement.
  6. Return: Returns the public pipeline ID on success, which can be used to query pipeline status, start the job, or update the pipeline.

If any step fails (compilation error, database constraint violation, etc.), the function returns an ErrorResp with appropriate HTTP status and error details, and no partial state is persisted.
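The control flow above can be sketched as follows. This is a simplified stand-in, not the actual Arroyo implementation: `StubProgram`, `compile_sql`, `new_pipeline_id`, and `create_pipeline_sketch` are hypothetical types and helpers invented for illustration, and the database transaction is elided entirely.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Stand-in for the compiled LogicalProgram (the real type lives in Arroyo's planner).
struct StubProgram {
    encoded: Vec<u8>,
}

// Hypothetical compile step: real compilation parses, plans, and optimizes the
// SQL; here we only validate and "encode" the raw query bytes.
fn compile_sql(query: &str) -> Result<StubProgram, String> {
    if query.trim().is_empty() {
        return Err("empty query".to_string());
    }
    Ok(StubProgram { encoded: query.as_bytes().to_vec() })
}

// Step 3: generate a human-friendly public ID with a "pl_" prefix. The real
// implementation uses a proper ID generator; this sketch derives hex digits
// from the current time.
fn new_pipeline_id() -> String {
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_nanos();
    format!("pl_{:012x}", nanos & 0xffff_ffff_ffff)
}

// Steps 1-6 in order; any failure short-circuits via `?`, so nothing partial
// is "persisted".
fn create_pipeline_sketch(name: &str, query: &str) -> Result<String, String> {
    let program = compile_sql(query)?;   // 1. compile SQL to a program
    let bytes = &program.encoded;        // 2. encode for storage (already bytes here)
    let public_id = new_pipeline_id();   // 3. generate the public pipeline ID
    // 4-5. in the real function, the pipeline, version, and job records are
    //      inserted inside one database transaction; elided in this sketch.
    let _ = (name, bytes);
    Ok(public_id)                        // 6. return the public ID
}

fn main() {
    let id = create_pipeline_sketch("click_aggregation", "SELECT 1").unwrap();
    println!("created {}", id);
    assert!(create_pipeline_sketch("bad", "").is_err());
}
```

The single-transaction step (4) is what gives the function its all-or-nothing behavior: if the version insert or dependency recording fails, the pipeline row is rolled back with it.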

Usage

This function is called by the POST /v1/pipelines API endpoint handler and by internal systems that create pipelines programmatically (e.g., pipeline restart logic, pipeline update logic).

Code Reference

Source Location

Repository
ArroyoSystems/arroyo (GitHub)
File
crates/arroyo-api/src/pipelines.rs
Lines
L294--L457

Signature

pub(crate) async fn create_pipeline_int(
    name: String,
    query: String,
    udfs: Vec<Udf>,
    parallelism: u64,
    checkpoint_interval: Duration,
    preview: bool,
    enable_sinks: bool,
    auth_data: AuthData,
    db: &DatabaseSource,
) -> Result<String, ErrorResp>

Import

// Crate-internal function; used within arroyo-api
use crate::pipelines::create_pipeline_int;

I/O Contract

Inputs

Name Type Required Description
name String Yes Human-readable pipeline name, unique within the user's namespace
query String Yes SQL query text defining the streaming pipeline logic
udfs Vec<Udf> Yes User-defined function definitions to compile and register (may be empty)
parallelism u64 Yes Default parallelism level for operators in the dataflow graph
checkpoint_interval Duration Yes Time interval between consistent distributed snapshots
preview bool Yes Whether to run in preview mode (output to console instead of sinks)
enable_sinks bool Yes Whether sink operators should be active (false disables writes to external systems)
auth_data AuthData Yes Authentication context identifying the user and organization
db &DatabaseSource Yes Database connection for persisting pipeline and job records

Outputs

Condition Type Description
Success String The public pipeline ID (e.g., "pl_abc123def456"), returned as the Ok variant of Result<String, ErrorResp>
Compilation failure ErrorResp HTTP 400 with SQL compilation error details
Database error ErrorResp HTTP 500 with internal error details
Auth failure ErrorResp HTTP 401/403 with authentication or authorization error
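A caller can translate this contract into an HTTP response with a simple match. The `ErrorResp` below is a minimal stand-in with only a status and message; the real type in arroyo-api carries more detail.

```rust
// Minimal stand-in for the real ErrorResp error type.
#[derive(Debug)]
struct ErrorResp {
    status: u16,
    message: String,
}

// Map the function's Result onto an (HTTP status, body) pair.
fn respond(result: Result<String, ErrorResp>) -> (u16, String) {
    match result {
        // Success: return the public pipeline ID to the client.
        Ok(pipeline_id) => (200, pipeline_id),
        // Failure: propagate the embedded status (400/401/403/500) as-is.
        Err(e) => (e.status, e.message),
    }
}

fn main() {
    let ok = respond(Ok("pl_abc123def456".to_string()));
    assert_eq!(ok, (200, "pl_abc123def456".to_string()));
    let err = respond(Err(ErrorResp { status: 400, message: "parse error".into() }));
    assert_eq!(err.0, 400);
}
```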

Usage Examples

Creating a Pipeline via the API Layer

use std::time::Duration;
use arroyo_api::pipelines::create_pipeline_int;

let pipeline_id = create_pipeline_int(
    "click_aggregation".to_string(),
    "SELECT user_id, count(*) as click_count
     FROM clicks
     GROUP BY user_id, TUMBLE(event_time, INTERVAL '5' MINUTE)".to_string(),
    vec![],                         // no UDFs
    4,                              // parallelism
    Duration::from_secs(60),        // checkpoint every 60 seconds
    false,                          // not preview mode
    true,                           // sinks enabled
    auth_data,
    &db,
).await?;

// pipeline_id can now be used to start, stop, or query the pipeline
println!("Created pipeline: {}", pipeline_id);

Creating a Preview Pipeline

let preview_id = create_pipeline_int(
    "preview_session".to_string(),
    "SELECT * FROM pageviews WHERE url LIKE '%/checkout%'".to_string(),
    vec![],
    1,                              // single parallelism for preview
    Duration::from_secs(30),
    true,                           // preview mode enabled
    false,                          // sinks disabled
    auth_data,
    &db,
).await?;
