

Principle:ArroyoSystems Arroyo Pipeline Submission

From Leeroopedia



Summary

Pipeline Submission is the principle of submitting a SQL pipeline for execution through the Arroyo API. This process encompasses query validation, checking for existing pipelines (to support resume semantics), creating new pipelines when necessary, and waiting for the pipeline to reach the Running state. The submission pattern is designed to be idempotent and resume-friendly, enabling seamless restarts after interruption.

Theoretical Basis

Pipeline submission follows a create-or-resume pattern. Rather than always creating a fresh pipeline, the submission logic first checks whether a pipeline with the same SQL query already exists in the system. If one is found, it is resumed; otherwise, a new pipeline is created. This design supports two critical use cases:

  • Idempotent restarts: Restarting arroyo run with the same state directory and query picks up from the last checkpoint rather than starting from scratch.
  • Fresh deployments: When no matching pipeline exists, a new one is created with the specified parallelism and name.
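The lookup at the heart of create-or-resume can be sketched as below. This is a minimal in-memory model, not Arroyo's implementation: the `Pipeline` record and both function names are hypothetical, and the match is assumed to be plain string equality on the query text. It covers only the match/no-match decision; the case where other pipelines exist but none match is described under State Mismatch Detection.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Pipeline:
    # Hypothetical stand-in for a pipeline record returned by the API.
    id: str
    query: str


def find_resumable(pipelines: list[Pipeline], query: str) -> Optional[Pipeline]:
    """Return the first pipeline whose stored query equals `query` exactly."""
    for pipeline in pipelines:
        if pipeline.query == query:
            return pipeline
    return None


def create_or_resume(pipelines: list[Pipeline], query: str) -> tuple[str, Optional[str]]:
    """Decide between resuming an existing pipeline and creating a new one."""
    match = find_resumable(pipelines, query)
    if match is not None:
        return ("resume", match.id)
    return ("create", None)
```

Under the string-equality assumption, any edit to the query text, even a whitespace change, yields no match and therefore no resume.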

Asynchronous Submission

The submission itself is asynchronous. The API call to create or resume a pipeline returns immediately, and the caller must poll for state transitions to determine when the pipeline has reached the Running state. This decouples the submission request from the potentially time-consuming process of scheduling workers, restoring checkpoints, and establishing dataflow connections.
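A rough sketch of the polling side of this pattern follows. The `get_state` callable is a hypothetical stand-in for whatever call returns the job state, and the interval and timeout values are illustrative assumptions rather than values taken from Arroyo.

```python
import time
from typing import Callable


def wait_for_running(get_state: Callable[[], str],
                     interval_s: float = 0.1,
                     timeout_s: float = 30.0) -> None:
    """Poll `get_state` until it returns "Running" or the timeout elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        state = get_state()
        if state == "Running":
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f"pipeline still in state {state!r}")
        time.sleep(interval_s)
```

Because the submission call returns before the pipeline is scheduled, the caller owns the wait and can choose its own interval and timeout.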

Validation Before Submission

Before any pipeline is created or resumed, the SQL query is validated through the API's validation endpoint. This catches syntax errors, unknown tables, type mismatches, and other issues before any resources are allocated, providing a fast feedback loop for the developer.
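The validate-first gate can be illustrated with a small sketch. The `validate` callable is a hypothetical stand-in for the validation endpoint and is assumed to return a list of error messages, empty when the query is valid. The exit code of 1 is also an assumption.

```python
import sys
from typing import Callable


def validate_or_exit(query: str, validate: Callable[[str], list[str]]) -> None:
    """Print every validation error and exit before any pipeline is touched."""
    errors = validate(query)
    if errors:
        for error in errors:
            print(f"validation error: {error}", file=sys.stderr)
        raise SystemExit(1)
```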

Submission Flow

The pipeline submission process follows these steps:

  1. Connection Wait -- Poll the API server's /ping endpoint until it becomes available, with a maximum of 50 attempts at 10ms intervals. This handles the startup race where the pipeline submission task starts before the API server is fully ready.
  2. Query Validation -- Send the SQL query to the validate_query API endpoint. If validation errors are returned, print them and exit immediately.
  3. Existing Pipeline Check -- Retrieve all existing pipelines via paginated get_pipelines API calls. Search for a pipeline whose query matches the submitted SQL exactly.
  4. Create or Resume -- Based on the check result:
    • Match found: Send a patch_pipeline request with StopType::None to resume the existing pipeline. This effectively un-stops a previously checkpointed pipeline.
    • No match, empty database: Create a new pipeline via create_pipeline with the specified name and parallelism.
    • No match, pipelines exist: The state directory belongs to a different pipeline. Submission either aborts with an error (the default) or prints a warning and continues (if --force is specified).
  5. Handler Registration -- Store the pipeline ID in the shutdown handler so that signals trigger a graceful checkpoint-and-stop for the correct pipeline.
  6. Wait for Running -- Poll the pipeline's job state until it transitions to "Running", at which point the submission is complete and the pipeline is actively processing data.
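Steps 1 and 3 above can be sketched as two small helpers. The 50 attempts and 10 ms interval come from the text. The `ping` and `fetch_page` callables are hypothetical, and the cursor-based paging scheme is an assumption, since the text says only that get_pipelines is paginated.

```python
import time
from typing import Callable, Iterator, Optional


def wait_for_api(ping: Callable[[], bool],
                 attempts: int = 50,
                 interval_s: float = 0.010) -> bool:
    """Call `ping` up to `attempts` times, sleeping `interval_s` between tries."""
    for attempt in range(attempts):
        if ping():
            return True
        if attempt + 1 < attempts:
            time.sleep(interval_s)
    return False


def all_pipelines(
    fetch_page: Callable[[Optional[str]], tuple[list[dict], Optional[str]]],
) -> Iterator[dict]:
    """Yield every pipeline, following the cursor until no next page remains."""
    cursor = None
    while True:
        items, cursor = fetch_page(cursor)
        yield from items
        if cursor is None:
            return
```

With these defaults the connection wait gives up after roughly half a second, which is enough to cover a startup race without hiding a server that never comes up.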

State Mismatch Detection

A notable safety feature of the submission process is its detection of state directory mismatches. When the state directory already contains pipelines but none match the submitted query, this indicates either:

  • The wrong state directory was specified, or
  • The query was modified since the last run.

In either case, proceeding could corrupt existing state. The default behavior is to abort with an error. The --force flag overrides this check for cases where the user intentionally wants to reuse the state directory with a different query.
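The mismatch check reduces to a decision over three inputs: the queries already stored in the state directory, the submitted query, and the force flag. The sketch below models that decision only. The `Action` enum and the `decide` function are hypothetical names, not part of Arroyo.

```python
from enum import Enum


class Action(Enum):
    RESUME = "resume"
    CREATE = "create"
    ABORT = "abort"
    WARN_AND_CONTINUE = "warn_and_continue"


def decide(existing_queries: list[str], query: str, force: bool = False) -> Action:
    """Map the state-directory contents and the force flag to an outcome."""
    if query in existing_queries:
        return Action.RESUME       # exact match: resume from checkpoint
    if not existing_queries:
        return Action.CREATE       # empty database: fresh deployment
    # Pipelines exist but none match: the state belongs to a different query.
    return Action.WARN_AND_CONTINUE if force else Action.ABORT
```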

Relationship to Pipeline Lifecycle

Pipeline submission is the entry point of the pipeline lifecycle in local cluster mode:

Phase      | Action                               | State Transition
-----------|--------------------------------------|--------------------------------
Submission | create_pipeline or patch_pipeline    | Created/Stopped -> Scheduling
Scheduling | Controller assigns workers           | Scheduling -> Running
Execution  | Pipeline processes data              | Running (steady state)
Shutdown   | Signal triggers checkpoint-and-stop  | Running -> Stopping -> Stopped
Resume     | patch_pipeline(StopType::None)       | Stopped -> Scheduling -> Running
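The lifecycle above can be read as a small state machine. The following sketch encodes only the transitions listed in the table. The `TRANSITIONS` mapping and `is_valid_path` helper are illustrative names, and any states or transitions the real system has beyond these are not modeled.

```python
# Allowed next states, taken from the lifecycle table.
TRANSITIONS = {
    "Created": {"Scheduling"},
    "Stopped": {"Scheduling"},
    "Scheduling": {"Running"},
    "Running": {"Stopping"},
    "Stopping": {"Stopped"},
}


def is_valid_path(states: list[str]) -> bool:
    """Check that each consecutive pair of states is an allowed transition."""
    return all(
        nxt in TRANSITIONS.get(cur, set())
        for cur, nxt in zip(states, states[1:])
    )
```

A full run, shutdown, and resume corresponds to the path Created, Scheduling, Running, Stopping, Stopped, Scheduling, Running, which this check accepts.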
