Principle:ArroyoSystems Arroyo Pipeline Submission
Summary
Pipeline Submission is the principle of submitting a SQL pipeline for execution through the Arroyo API. This process encompasses query validation, checking for existing pipelines (to support resume semantics), creating new pipelines when necessary, and waiting for the pipeline to reach the Running state. The submission pattern is designed to be idempotent and resume-friendly, enabling seamless restarts after interruption.
Theoretical Basis
Pipeline submission follows a create-or-resume pattern. Rather than always creating a fresh pipeline, the submission logic first checks whether a pipeline with the same SQL query already exists in the system. If one is found, it is resumed; otherwise, a new pipeline is created. This design supports two critical use cases:
- Idempotent restarts: Restarting `arroyo run` with the same state directory and query picks up from the last checkpoint rather than starting from scratch.
- Fresh deployments: When no matching pipeline exists, a new one is created with the specified parallelism and name.
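The create-or-resume pattern can be illustrated with a minimal in-memory sketch in Rust. It is not Arroyo's implementation: `Store`, `Pipeline`, `Outcome`, and the `pl_N` ID format are hypothetical stand-ins for the API server's records. The sketch deliberately leaves out the state-mismatch guard so that it shows only the idempotency property: submitting the same query twice yields one pipeline, created on the first call and resumed on the second.

```rust
use std::collections::HashMap;

/// Hypothetical record for a pipeline known to the API server.
#[derive(Debug, Clone, PartialEq)]
pub struct Pipeline {
    pub id: String,
    pub query: String,
    pub stopped: bool,
}

/// Whether a submission created a new pipeline or resumed an existing one.
#[derive(Debug, PartialEq)]
pub enum Outcome {
    Created(String),
    Resumed(String),
}

/// In-memory stand-in for the API server; a real client would make HTTP calls.
#[derive(Default)]
pub struct Store {
    pipelines: HashMap<String, Pipeline>,
    next_id: u32,
}

impl Store {
    /// Create-or-resume: reuse a pipeline whose query matches exactly,
    /// otherwise create a fresh one.
    pub fn submit(&mut self, query: &str) -> Outcome {
        if let Some(p) = self.pipelines.values_mut().find(|p| p.query == query) {
            p.stopped = false; // un-stop the existing pipeline
            return Outcome::Resumed(p.id.clone());
        }
        self.next_id += 1;
        let id = format!("pl_{}", self.next_id);
        let pipeline = Pipeline { id: id.clone(), query: query.to_string(), stopped: false };
        self.pipelines.insert(id.clone(), pipeline);
        Outcome::Created(id)
    }

    /// Mark a pipeline as stopped, as a checkpoint-and-stop would.
    pub fn stop(&mut self, id: &str) {
        if let Some(p) = self.pipelines.get_mut(id) {
            p.stopped = true;
        }
    }

    /// Number of pipelines the store knows about.
    pub fn count(&self) -> usize {
        self.pipelines.len()
    }
}
```

Because matching here is plain string equality, any edit to the SQL text produces a different pipeline rather than a resume.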
Asynchronous Submission
The submission itself is asynchronous. The API call to create or resume a pipeline returns immediately, and the caller must poll for state transitions to determine when the pipeline has reached the Running state. This decouples the submission request from the potentially time-consuming process of scheduling workers, restoring checkpoints, and establishing dataflow connections.
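A polling loop of this kind might look like the following sketch. The `JobState` enum, the `Failed` state, and the `max_attempts` cap are assumptions added for illustration, not details taken from Arroyo. The state fetch is passed in as a closure so that the loop can be exercised without a live API server.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical job states, named after the states used on this page.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum JobState {
    Created,
    Scheduling,
    Running,
    Failed,
}

/// Poll `fetch_state` until it reports `Running`, giving up after
/// `max_attempts` polls or on the (assumed) terminal `Failed` state.
/// On success, returns the number of polls it took.
pub fn wait_for_running<F>(
    mut fetch_state: F,
    interval: Duration,
    max_attempts: u32,
) -> Result<u32, String>
where
    F: FnMut() -> JobState,
{
    for attempt in 1..=max_attempts {
        match fetch_state() {
            JobState::Running => return Ok(attempt),
            JobState::Failed => return Err("pipeline failed before reaching Running".into()),
            // Still Created or Scheduling: wait and poll again.
            _ => sleep(interval),
        }
    }
    Err(format!("pipeline not Running after {max_attempts} polls"))
}
```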
Validation Before Submission
Before any pipeline is created or resumed, the SQL query is validated through the API's validation endpoint. This catches syntax errors, unknown tables, type mismatches, and other issues before any resources are allocated, providing a fast feedback loop for the developer.
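The ordering guarantee (validate first, allocate second) can be sketched with a small trait and a test double. `PipelineApi`, `submit_validated`, and `FakeApi` are hypothetical names, and the toy rule in `FakeApi::validate_query` only stands in for real SQL planning.

```rust
/// Hypothetical client surface; only the two calls this sketch needs.
pub trait PipelineApi {
    /// Returns validation errors (empty when the query is valid).
    fn validate_query(&mut self, query: &str) -> Vec<String>;
    /// Allocates resources for a new pipeline and returns its ID.
    fn create_pipeline(&mut self, query: &str) -> String;
}

/// Validate first; call `create_pipeline` only when validation is clean.
pub fn submit_validated<A: PipelineApi>(api: &mut A, query: &str) -> Result<String, Vec<String>> {
    let errors = api.validate_query(query);
    if !errors.is_empty() {
        return Err(errors); // fail fast: nothing has been allocated yet
    }
    Ok(api.create_pipeline(query))
}

/// Test double that counts how many pipelines were created.
pub struct FakeApi {
    pub created: u32,
}

impl PipelineApi for FakeApi {
    fn validate_query(&mut self, query: &str) -> Vec<String> {
        // Toy rule standing in for real planning: require a FROM clause.
        if query.to_uppercase().contains(" FROM ") {
            Vec::new()
        } else {
            vec![format!("query has no FROM clause: {query}")]
        }
    }

    fn create_pipeline(&mut self, _query: &str) -> String {
        self.created += 1;
        format!("pl_{}", self.created)
    }
}
```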
Submission Flow
The pipeline submission process follows these steps:
- Connection Wait: Poll the API server's `/ping` endpoint until it becomes available, making at most 50 attempts at 10 ms intervals. This handles the startup race in which the pipeline submission task starts before the API server is fully ready.
- Query Validation: Send the SQL query to the `validate_query` API endpoint. If validation errors are returned, print them and exit immediately.
- Existing Pipeline Check: Retrieve all existing pipelines via paginated `get_pipelines` API calls, and search for a pipeline whose query exactly matches the submitted SQL.
- Create or Resume: Based on the check result:
  - Match found: Send a `patch_pipeline` request with `StopType::None` to resume the existing pipeline. This effectively un-stops a previously checkpointed pipeline.
  - No match, empty database: Create a new pipeline via `create_pipeline` with the specified name and parallelism.
  - No match, pipelines exist: The state directory belongs to a different pipeline. Either exit with an error (the default) or warn and continue (if `--force` is specified).
- Handler Registration: Store the pipeline ID in the shutdown handler so that signals trigger a graceful checkpoint-and-stop for the correct pipeline.
- Wait for Running: Poll the pipeline's job state until it transitions to "Running", at which point the submission is complete and the pipeline is actively processing data.
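Two of the steps above, the connection wait and the paginated pipeline listing, are generic enough to sketch on their own. The function names and the cursor-based `Page` type are hypothetical; Arroyo's actual pagination parameters may differ. Both helpers take closures, so any HTTP client can supply the real calls.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Retry `ping` up to `max_attempts` times, sleeping `interval` between
/// failures. The flow above uses 50 attempts at 10 ms intervals.
pub fn wait_for_api<F: FnMut() -> bool>(mut ping: F, max_attempts: u32, interval: Duration) -> bool {
    for attempt in 1..=max_attempts {
        if ping() {
            return true;
        }
        if attempt < max_attempts {
            sleep(interval);
        }
    }
    false
}

/// One page of results; `next` is a hypothetical cursor for the following page.
pub struct Page<T> {
    pub items: Vec<T>,
    pub next: Option<String>,
}

/// Drain a paginated listing by following cursors until none is returned.
pub fn fetch_all<T, F>(mut get_page: F) -> Vec<T>
where
    F: FnMut(Option<&str>) -> Page<T>,
{
    let mut all = Vec::new();
    let mut cursor: Option<String> = None;
    loop {
        let page = get_page(cursor.as_deref());
        all.extend(page.items);
        match page.next {
            Some(next) => cursor = Some(next),
            None => return all,
        }
    }
}
```

Usage with 50 attempts at 10 ms would be `wait_for_api(ping, 50, Duration::from_millis(10))`, giving the server roughly half a second to come up.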
State Mismatch Detection
A notable safety feature of the submission process is its detection of state directory mismatches. When the state directory already contains pipelines but none match the submitted query, this indicates either:
- The wrong state directory was specified, or
- The query was modified since the last run.
In either case, proceeding could corrupt existing state. The default behavior is to abort with an error. The --force flag overrides this check for cases where the user intentionally wants to reuse the state directory with a different query.
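The three-way decision, including the `--force` override, fits in a single function. This is a sketch under stated assumptions: `Plan`, `Existing`, and `plan_submission` are hypothetical names, and it assumes that "warn and continue" means creating a new pipeline alongside the existing ones.

```rust
/// What the submitter should do next.
#[derive(Debug, PartialEq)]
pub enum Plan {
    Resume(String),
    Create,
}

/// Hypothetical summary of a pipeline already recorded in the state directory.
pub struct Existing {
    pub id: String,
    pub query: String,
}

/// Decide between resume, create, and abort. `force` mirrors the `--force` flag.
pub fn plan_submission(existing: &[Existing], query: &str, force: bool) -> Result<Plan, String> {
    // Exact match: resume the checkpointed pipeline.
    if let Some(p) = existing.iter().find(|p| p.query == query) {
        return Ok(Plan::Resume(p.id.clone()));
    }
    // Empty state directory: a fresh deployment.
    if existing.is_empty() {
        return Ok(Plan::Create);
    }
    // Pipelines exist but none match: likely the wrong directory or an edited query.
    if force {
        eprintln!(
            "warning: state directory holds {} other pipeline(s); continuing because of --force",
            existing.len()
        );
        Ok(Plan::Create)
    } else {
        Err(format!(
            "state directory contains {} pipeline(s), none matching this query; \
             check the state directory or pass --force",
            existing.len()
        ))
    }
}
```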
Relationship to Pipeline Lifecycle
Pipeline submission is the entry point of the pipeline lifecycle in local cluster mode:
| Phase | Action | State Transition |
|---|---|---|
| Submission | `create_pipeline` or `patch_pipeline` | Created/Stopped -> Scheduling |
| Scheduling | Controller assigns workers | Scheduling -> Running |
| Execution | Pipeline processes data | Running (steady state) |
| Shutdown | Signal triggers checkpoint-and-stop | Running -> Stopping -> Stopped |
| Resume | `patch_pipeline(StopType::None)` | Stopped -> Scheduling -> Running |
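The lifecycle table can be encoded as a small state machine, which makes invalid transitions explicit. The `State` and `Event` enums are hypothetical, and `CheckpointDone` is an assumed event for the Stopping -> Stopped step, which the table attributes only to the shutdown signal.

```rust
/// Pipeline states named in the lifecycle table.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum State {
    Created,
    Scheduling,
    Running,
    Stopping,
    Stopped,
}

/// Events corresponding to the table's "Action" column.
#[derive(Debug, Clone, Copy)]
pub enum Event {
    Submit,          // create_pipeline, or patch_pipeline(StopType::None) on resume
    WorkersAssigned, // controller finishes scheduling
    StopSignal,      // signal triggers checkpoint-and-stop
    CheckpointDone,  // assumed: final checkpoint has been written
}

/// Return the next state, or None if the event is not valid in `from`.
pub fn transition(from: State, event: Event) -> Option<State> {
    match (from, event) {
        (State::Created | State::Stopped, Event::Submit) => Some(State::Scheduling),
        (State::Scheduling, Event::WorkersAssigned) => Some(State::Running),
        (State::Running, Event::StopSignal) => Some(State::Stopping),
        (State::Stopping, Event::CheckpointDone) => Some(State::Stopped),
        _ => None,
    }
}
```

Resume reuses the `Submit` event from `Stopped`, reflecting that the table shows `patch_pipeline(StopType::None)` entering the same Scheduling phase as a fresh submission.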