Implementation:ArroyoSystems Arroyo Jobs Api
| Field | Value |
|---|---|
| Sources | ArroyoSystems/arroyo |
| Domains | Stream_Processing, Job_Management |
| Last Updated | 2026-02-08 |
Overview
Jobs_Api implements the REST API endpoints for job lifecycle management in the Arroyo streaming engine, including job creation, listing, error retrieval, checkpoint inspection, checkpoint detail queries, and real-time output streaming via server-sent events (SSE).
Description
The crates/arroyo-api/src/jobs.rs module provides the complete set of job management handlers that power the Arroyo web dashboard's job detail views. It consists of two internal helper functions and five public REST endpoint handlers.
Job Creation
The create_job function is a crate-internal helper that creates a new job record associated with a pipeline. It performs the following steps:
- Checkpoint interval validation: Enforces that the checkpoint interval is between 1 second and 1 day. For preview jobs, the checkpoint interval is overridden to 24 hours.
- Running job limit check: Queries the database for all active (non-failed, non-finished, non-stopped) jobs in the organization and compares the count against org_metadata.max_running_jobs.
- Job record creation: Generates a unique job ID using generate_id(IdTypes::JobConfig), inserts the job record with pipeline association and checkpoint interval, and creates an initial job status record.
- Preview TTL: If the job is a preview, sets a time-to-live of 60 seconds (PREVIEW_TTL).
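The interval and TTL rules in the steps above can be sketched in isolation. This is a minimal illustration, not the code in jobs.rs: the helper name resolve_job_timing, the constant names other than PREVIEW_TTL, the inclusive bounds, and the order of validation and preview override are all assumptions.

```rust
use std::time::Duration;

// Bounds from the description above; treating them as inclusive is an assumption.
const MIN_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(1);
const MAX_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(24 * 60 * 60);
// 60-second time-to-live applied to preview jobs.
const PREVIEW_TTL: Duration = Duration::from_secs(60);

/// Hypothetical helper: validates the requested interval and returns the
/// effective checkpoint interval plus an optional TTL for the job.
fn resolve_job_timing(
    requested: Duration,
    preview: bool,
) -> Result<(Duration, Option<Duration>), String> {
    if requested < MIN_CHECKPOINT_INTERVAL || requested > MAX_CHECKPOINT_INTERVAL {
        return Err(format!(
            "checkpoint interval must be between 1 second and 1 day, got {:?}",
            requested
        ));
    }
    if preview {
        // Preview jobs get a 24-hour interval and a short TTL.
        Ok((MAX_CHECKPOINT_INTERVAL, Some(PREVIEW_TTL)))
    } else {
        Ok((requested, None))
    }
}
```

Under these assumptions, a non-preview job keeps its requested interval and has no TTL, while a preview job always ends up with the 24-hour interval and the 60-second TTL.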
State Action Resolution
The get_action function determines what UI action is available for a job based on its current state and whether it should be running. It returns a tuple of (action label, optional stop type, in-progress flag). The function handles all 14 job states: Created, Compiling, Scheduling, Running, Rescaling, CheckpointStopping, Recovering, Restarting, Stopping, Stopped, Finishing, Finished, Failed, and Failing.
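To make the 14 state names concrete, the sketch below models them as an enum that can be parsed from the string form. The JobState type, the ALL_STATE_NAMES constant, and the is_terminal helper are hypothetical; get_action itself takes the state as a &str, and the terminal set is only inferred from the "non-failed, non-finished, non-stopped" rule used by the running-job limit check.

```rust
use std::str::FromStr;

/// Hypothetical enum covering the 14 state names listed above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum JobState {
    Created, Compiling, Scheduling, Running, Rescaling, CheckpointStopping,
    Recovering, Restarting, Stopping, Stopped, Finishing, Finished, Failed, Failing,
}

/// The state names as they appear in string form.
const ALL_STATE_NAMES: [&str; 14] = [
    "Created", "Compiling", "Scheduling", "Running", "Rescaling", "CheckpointStopping",
    "Recovering", "Restarting", "Stopping", "Stopped", "Finishing", "Finished",
    "Failed", "Failing",
];

impl FromStr for JobState {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s {
            "Created" => JobState::Created,
            "Compiling" => JobState::Compiling,
            "Scheduling" => JobState::Scheduling,
            "Running" => JobState::Running,
            "Rescaling" => JobState::Rescaling,
            "CheckpointStopping" => JobState::CheckpointStopping,
            "Recovering" => JobState::Recovering,
            "Restarting" => JobState::Restarting,
            "Stopping" => JobState::Stopping,
            "Stopped" => JobState::Stopped,
            "Finishing" => JobState::Finishing,
            "Finished" => JobState::Finished,
            "Failed" => JobState::Failed,
            "Failing" => JobState::Failing,
            other => return Err(format!("unknown job state: {other}")),
        })
    }
}

impl JobState {
    /// Assumption: Failed, Finished, and Stopped are the non-active states.
    fn is_terminal(self) -> bool {
        matches!(self, JobState::Failed | JobState::Finished | JobState::Stopped)
    }
}
```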
REST Endpoints
- GET /v1/jobs (get_jobs) -- Lists all jobs for the authenticated organization.
- GET /v1/pipelines/{pipeline_id}/jobs/{job_id}/errors (get_job_errors) -- Retrieves paginated error log messages for a specific job, converting database log level enums to API types.
- GET /v1/pipelines/{pipeline_id}/jobs/{job_id}/checkpoints (get_job_checkpoints) -- Fetches the checkpoint history for a job, including epoch numbers, timing, and backend information.
- GET /v1/pipelines/{pipeline_id}/jobs/{job_id}/checkpoints/{epoch}/operator_checkpoint_groups (get_checkpoint_details) -- Returns per-operator checkpoint details, including subtask-level byte counts and event timing spans.
- GET /v1/pipelines/{pipeline_id}/jobs/{job_id}/output (get_job_output) -- Establishes an SSE stream by subscribing to the controller's gRPC output stream. Validates that the job has a preview sink before connecting, then spawns a background task to forward gRPC messages as SSE events.
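All job-scoped routes share the same pipeline/job prefix. A client could build them with small helpers such as the following sketch; the function names are hypothetical and the paths are relative to wherever the API is mounted (the curl examples in this document use an /api prefix).

```rust
/// Hypothetical helper: common prefix for a job nested under its pipeline.
fn job_path(pipeline_id: &str, job_id: &str) -> String {
    format!("/v1/pipelines/{pipeline_id}/jobs/{job_id}")
}

/// Path of the paginated error log endpoint.
fn errors_path(pipeline_id: &str, job_id: &str) -> String {
    format!("{}/errors", job_path(pipeline_id, job_id))
}

/// Path of the checkpoint history endpoint.
fn checkpoints_path(pipeline_id: &str, job_id: &str) -> String {
    format!("{}/checkpoints", job_path(pipeline_id, job_id))
}

/// Path of the per-operator checkpoint detail endpoint for one epoch.
fn checkpoint_details_path(pipeline_id: &str, job_id: &str, epoch: u32) -> String {
    format!(
        "{}/checkpoints/{epoch}/operator_checkpoint_groups",
        job_path(pipeline_id, job_id)
    )
}

/// Path of the SSE output endpoint.
fn output_path(pipeline_id: &str, job_id: &str) -> String {
    format!("{}/output", job_path(pipeline_id, job_id))
}
```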
Type Conversions
The module includes From<DbLogMessage> for JobLogMessage to convert database log rows into API response types, and TryFrom<DbCheckpoint> for Checkpoint to convert checkpoint database records (including JSON-encoded event spans) into structured API responses.
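The difference between the two conversions is that From cannot fail while TryFrom can. The sketch below shows the pattern with stand-in types: every struct field, the log level variants, and the signed database epoch are assumptions made for illustration, and the JSON decoding of event spans performed by the real TryFrom is left out.

```rust
use std::convert::TryFrom;

// Stand-in types; the real rows and API structs have more fields.
#[derive(Debug, Clone, Copy, PartialEq)]
enum DbLogLevel { Info, Warn, Error }

#[derive(Debug, Clone, Copy, PartialEq)]
enum ApiLogLevel { Info, Warn, Error }

struct DbLogMessage { level: DbLogLevel, message: String }
struct JobLogMessage { level: ApiLogLevel, message: String }

/// Infallible: every database level has an API counterpart.
impl From<DbLogMessage> for JobLogMessage {
    fn from(row: DbLogMessage) -> Self {
        let level = match row.level {
            DbLogLevel::Info => ApiLogLevel::Info,
            DbLogLevel::Warn => ApiLogLevel::Warn,
            DbLogLevel::Error => ApiLogLevel::Error,
        };
        JobLogMessage { level, message: row.message }
    }
}

struct DbCheckpoint { epoch: i32 } // assumption: stored as a signed integer
struct Checkpoint { epoch: u32 }

/// Fallible: a malformed row becomes an error instead of a panic.
impl TryFrom<DbCheckpoint> for Checkpoint {
    type Error = String;

    fn try_from(row: DbCheckpoint) -> Result<Self, Self::Error> {
        let epoch = u32::try_from(row.epoch)
            .map_err(|_| format!("invalid epoch {}", row.epoch))?;
        Ok(Checkpoint { epoch })
    }
}
```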
Usage
These endpoints are called by the Arroyo web UI and programmatic clients to monitor job status, inspect checkpoints, view errors, and stream real-time output during pipeline execution or preview.
Code Reference
Source Location
- Repository: ArroyoSystems/arroyo (GitHub)
- File: crates/arroyo-api/src/jobs.rs
- Lines: L1--L484
Signature
/// Create a new job for a pipeline (crate-internal)
pub(crate) async fn create_job(
    pipeline_name: &str,
    pipeline_id: i64,
    checkpoint_interval: Duration,
    preview: bool,
    auth: &AuthData,
    db: &DatabaseSource,
) -> Result<String, ErrorResp>

/// Determine available action for a job given state and desired running flag
pub(crate) fn get_action(
    state: &str,
    running_desired: &bool,
) -> (String, Option<StopType>, bool)

/// GET /v1/jobs -- List all jobs
pub async fn get_jobs(
    State(state): State<AppState>,
    bearer_auth: BearerAuth,
) -> Result<Json<JobCollection>, ErrorResp>

/// GET /v1/pipelines/{pipeline_id}/jobs/{job_id}/errors
pub async fn get_job_errors(
    State(state): State<AppState>,
    bearer_auth: BearerAuth,
    Path((pipeline_pub_id, job_pub_id)): Path<(String, String)>,
    query_params: Query<PaginationQueryParams>,
) -> Result<Json<JobLogMessageCollection>, ErrorResp>

/// GET /v1/pipelines/{pipeline_id}/jobs/{job_id}/checkpoints
pub async fn get_job_checkpoints(
    State(state): State<AppState>,
    bearer_auth: BearerAuth,
    Path((pipeline_pub_id, job_pub_id)): Path<(String, String)>,
) -> Result<Json<CheckpointCollection>, ErrorResp>

/// GET /v1/pipelines/{pipeline_id}/jobs/{job_id}/checkpoints/{epoch}/operator_checkpoint_groups
pub async fn get_checkpoint_details(
    State(state): State<AppState>,
    bearer_auth: BearerAuth,
    Path((pipeline_pub_id, job_pub_id, epoch)): Path<(String, String, u32)>,
) -> Result<Json<OperatorCheckpointGroupCollection>, ErrorResp>

/// GET /v1/pipelines/{pipeline_id}/jobs/{job_id}/output -- SSE stream
pub async fn get_job_output(
    State(state): State<AppState>,
    bearer_auth: BearerAuth,
    Path((pipeline_pub_id, job_pub_id)): Path<(String, String)>,
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, ErrorResp>
Import
// Crate-internal: used within arroyo-api
use crate::jobs::{
    create_job, get_action, get_checkpoint_details, get_job_checkpoints,
    get_job_errors, get_job_output, get_jobs,
};
I/O Contract
Inputs
| Endpoint | Parameters | Type | Description |
|---|---|---|---|
| GET /v1/jobs | Bearer token | Header | Authentication credential |
| GET .../errors | pipeline_id | Path (String) | Pipeline public ID |
| GET .../errors | job_id | Path (String) | Job public ID |
| GET .../errors | starting_after | Query (Option<String>) | Pagination cursor |
| GET .../errors | limit | Query (Option<u32>) | Page size limit |
| GET .../checkpoints | pipeline_id, job_id | Path (String, String) | Pipeline and job identifiers |
| GET .../operator_checkpoint_groups | epoch | Path (u32) | Checkpoint epoch number |
| GET .../output | pipeline_id, job_id | Path (String, String) | Pipeline and job identifiers |
Outputs
| Endpoint | Type | Description |
|---|---|---|
| GET /v1/jobs | Json<JobCollection> | List of all jobs for the organization |
| GET .../errors | Json<JobLogMessageCollection> | Paginated error log messages with has_more flag |
| GET .../checkpoints | Json<CheckpointCollection> | List of checkpoints with epoch, timing, and backend info |
| GET .../operator_checkpoint_groups | Json<OperatorCheckpointGroupCollection> | Per-operator checkpoint detail with subtask byte counts and event spans |
| GET .../output | Sse<Stream> | Server-sent event stream of OutputData JSON payloads |
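The starting_after and limit inputs combine with the has_more output in the usual cursor-pagination pattern. The sketch below shows one common way to derive has_more, by reading one row more than the page size. It runs over an in-memory slice of IDs rather than a database, and the Page type, the paginate function, the default limit of 10, and the handling of an unknown cursor are assumptions rather than the behavior of get_job_errors.

```rust
/// Hypothetical page type: one page of rows plus a has_more flag.
#[derive(Debug, PartialEq)]
struct Page {
    data: Vec<String>,
    has_more: bool,
}

/// Cursor pagination over an ordered list of row IDs.
fn paginate(ids: &[&str], starting_after: Option<&str>, limit: Option<u32>) -> Page {
    const DEFAULT_LIMIT: usize = 10; // assumption, not taken from the source
    let limit = limit.map(|l| l as usize).unwrap_or(DEFAULT_LIMIT);

    // Start just past the cursor row; an unknown cursor restarts from the top.
    let start = match starting_after {
        Some(cursor) => ids
            .iter()
            .position(|id| *id == cursor)
            .map(|i| i + 1)
            .unwrap_or(0),
        None => 0,
    };

    // Read one extra row to learn whether another page exists.
    let mut data: Vec<String> = ids[start..]
        .iter()
        .take(limit + 1)
        .map(|id| id.to_string())
        .collect();
    let has_more = data.len() > limit;
    data.truncate(limit);

    Page { data, has_more }
}
```

A client would pass the last ID of each page as the next starting_after value and stop once has_more is false.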
Usage Examples
Listing All Jobs
curl -H "Authorization: Bearer $TOKEN" \
http://localhost:8000/api/v1/jobs
Retrieving Job Errors with Pagination
curl -H "Authorization: Bearer $TOKEN" \
"http://localhost:8000/api/v1/pipelines/pl_abc123/jobs/jc_def456/errors?limit=50"
Subscribing to Job Output (SSE)
curl -N -H "Authorization: Bearer $TOKEN" \
http://localhost:8000/api/v1/pipelines/pl_abc123/jobs/jc_def456/output
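A programmatic client reading this stream needs to split it into events. The sketch below follows the standard SSE text framing (data: lines, events terminated by a blank line, lines starting with a colon treated as comments) and returns the data payload of each complete event. The function name parse_sse_data is hypothetical, other fields such as event: and id: are ignored, and the JSON payloads are left as strings because the layout of OutputData is not described here.

```rust
/// Collects the `data` payload of each complete event in raw SSE text.
fn parse_sse_data(raw: &str) -> Vec<String> {
    let mut events = Vec::new();
    let mut current: Vec<&str> = Vec::new();

    for line in raw.lines() {
        if line.is_empty() {
            // A blank line terminates the current event.
            if !current.is_empty() {
                events.push(current.join("\n"));
                current.clear();
            }
        } else if let Some(rest) = line.strip_prefix("data:") {
            // A single space after the colon is not part of the payload.
            current.push(rest.strip_prefix(' ').unwrap_or(rest));
        }
        // Comment lines (leading ':') and other fields are skipped.
    }

    // An event without a terminating blank line is incomplete and dropped.
    events
}
```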
Using get_action Programmatically
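use crate::jobs::get_action;

// A running job that should keep running can be stopped with a checkpoint.
let (action_label, stop_type, in_progress) = get_action("Running", &true);
// action_label = "Stop", stop_type = Some(StopType::Checkpoint), in_progress = false

// A failed job can be started again.
let (action_label, stop_type, in_progress) = get_action("Failed", &false);
// action_label = "Start", stop_type = Some(StopType::None), in_progress = false
// (None here is a StopType variant wrapped in Some, not Option::None)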