Implementation:ArroyoSystems Arroyo Jobs Api

From Leeroopedia


Field | Value
Sources | ArroyoSystems/arroyo
Domains | Stream_Processing, Job_Management
Last Updated | 2026-02-08

Overview

Jobs_Api implements the job-management portion of the Arroyo streaming engine's REST API: a crate-internal job creation helper plus endpoints for listing jobs, retrieving errors, inspecting checkpoints and per-operator checkpoint details, and streaming real-time output via server-sent events (SSE).

Description

The crates/arroyo-api/src/jobs.rs module provides the complete set of job management handlers that power the Arroyo web dashboard's job detail views. It consists of two internal helper functions and five public REST endpoint handlers.

Job Creation

The create_job function is a crate-internal helper that creates a new job record associated with a pipeline. It performs the following steps:

  1. Checkpoint interval validation: Enforces that the checkpoint interval is between 1 second and 1 day. For preview jobs, the checkpoint interval is overridden to 24 hours.
  2. Running job limit check: Queries the database for all active (non-failed, non-finished, non-stopped) jobs in the organization and compares against org_metadata.max_running_jobs.
  3. Job record creation: Generates a unique job ID using generate_id(IdTypes::JobConfig), inserts the job record with pipeline association and checkpoint interval, and creates an initial job status record.
  4. Preview TTL: If the job is a preview, sets a time-to-live of 60 seconds (PREVIEW_TTL).
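The four steps above can be sketched with the standard library alone. This is a minimal illustration, not the code in jobs.rs: plan_job, JobPlan, CreateJobError, and the MIN/MAX constant names are hypothetical, the database query is replaced by a plain running_jobs count, and the order of validation versus the preview override (and the use of >= for the limit) is an assumption.

```rust
use std::time::Duration;

// Bounds and TTL come from the description above; every name except
// PREVIEW_TTL is hypothetical.
const MIN_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(1);
const MAX_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(24 * 60 * 60);
const PREVIEW_TTL: Duration = Duration::from_secs(60);

/// Hypothetical error type standing in for the API's ErrorResp.
#[derive(Debug, PartialEq)]
enum CreateJobError {
    InvalidCheckpointInterval,
    TooManyRunningJobs { limit: usize },
}

/// Illustrative summary of what the new job record would carry.
#[derive(Debug, PartialEq)]
struct JobPlan {
    checkpoint_interval: Duration,
    ttl: Option<Duration>,
}

fn plan_job(
    checkpoint_interval: Duration,
    preview: bool,
    running_jobs: usize,
    max_running_jobs: usize,
) -> Result<JobPlan, CreateJobError> {
    // Step 1: reject intervals outside [1 second, 1 day], then override the
    // interval for preview jobs (ordering is an assumption).
    if checkpoint_interval < MIN_CHECKPOINT_INTERVAL
        || checkpoint_interval > MAX_CHECKPOINT_INTERVAL
    {
        return Err(CreateJobError::InvalidCheckpointInterval);
    }
    let checkpoint_interval = if preview {
        MAX_CHECKPOINT_INTERVAL
    } else {
        checkpoint_interval
    };

    // Step 2: refuse to create a job once the organization is at its limit.
    if running_jobs >= max_running_jobs {
        return Err(CreateJobError::TooManyRunningJobs {
            limit: max_running_jobs,
        });
    }

    // Steps 3 and 4: the record would be inserted here; only previews get a TTL.
    Ok(JobPlan {
        checkpoint_interval,
        ttl: preview.then_some(PREVIEW_TTL),
    })
}
```

Keeping the checks in a pure function like this makes the limits easy to unit-test without a database, which is the only reason the sketch separates them from record insertion.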

State Action Resolution

The get_action function determines what UI action is available for a job based on its current state and whether it should be running. It returns a tuple of (action label, optional stop type, in-progress flag). The function handles all 14 job states: Created, Compiling, Scheduling, Running, Rescaling, CheckpointStopping, Recovering, Restarting, Stopping, Stopped, Finishing, Finished, Failed, and Failing.
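The state names listed above also underlie the running-job limit described under Job Creation, where a job counts as active unless it has failed, finished, or stopped. A minimal sketch of that classification follows; JOB_STATES, is_active, and count_active are hypothetical helpers, and the real check is a database query rather than an in-memory filter.

```rust
/// The 14 state names listed in the description.
const JOB_STATES: [&str; 14] = [
    "Created",
    "Compiling",
    "Scheduling",
    "Running",
    "Rescaling",
    "CheckpointStopping",
    "Recovering",
    "Restarting",
    "Stopping",
    "Stopped",
    "Finishing",
    "Finished",
    "Failed",
    "Failing",
];

/// A job is treated as active unless it is in one of the three terminal
/// states named in the running-job limit check.
fn is_active(state: &str) -> bool {
    !matches!(state, "Failed" | "Finished" | "Stopped")
}

/// Count how many of the given states would count against the limit.
fn count_active<'a>(states: impl IntoIterator<Item = &'a str>) -> usize {
    states.into_iter().filter(|s| is_active(s)).count()
}
```

Under this reading, transitional states such as Stopping or Failing still count against the limit until the job reaches its terminal state.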

REST Endpoints

  • GET /v1/jobs (get_jobs) -- Lists all jobs for the authenticated organization.
  • GET /v1/pipelines/{pipeline_id}/jobs/{job_id}/errors (get_job_errors) -- Retrieves paginated error log messages for a specific job, converting database log level enums to API types.
  • GET /v1/pipelines/{pipeline_id}/jobs/{job_id}/checkpoints (get_job_checkpoints) -- Fetches the checkpoint history for a job, including epoch numbers, timing, and backend information.
  • GET /v1/pipelines/{pipeline_id}/jobs/{job_id}/checkpoints/{epoch}/operator_checkpoint_groups (get_checkpoint_details) -- Returns per-operator checkpoint details including subtask-level byte counts and event timing spans.
  • GET /v1/pipelines/{pipeline_id}/jobs/{job_id}/output (get_job_output) -- Establishes an SSE stream by subscribing to the controller's gRPC output stream. Validates that the job has a preview sink before connecting. Spawns a background task to forward gRPC messages as SSE events.
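The forwarding pattern used by get_job_output (a background task that relays upstream messages as SSE events) can be illustrated without async machinery. The sketch below substitutes a std::sync::mpsc channel and a thread for the gRPC stream and the spawned task; sse_frame and forward_as_sse are hypothetical names, and the real handler builds axum Event values rather than raw strings.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Format a payload as a server-sent event frame: one `data:` field per
/// line, terminated by a blank line.
fn sse_frame(payload: &str) -> String {
    let mut frame = String::new();
    for line in payload.lines() {
        frame.push_str("data: ");
        frame.push_str(line);
        frame.push('\n');
    }
    frame.push('\n');
    frame
}

/// Spawn a background thread that forwards each upstream message as an SSE
/// frame. The upstream receiver stands in for the controller's output stream.
fn forward_as_sse(upstream: Receiver<String>) -> Receiver<String> {
    let (tx, rx) = channel();
    thread::spawn(move || {
        for message in upstream {
            // A send error means the client side was dropped, so stop relaying.
            if tx.send(sse_frame(&message)).is_err() {
                break;
            }
        }
    });
    rx
}
```

The relay ends on either side closing: the loop finishes when the upstream sender is dropped, and it breaks early when the consumer goes away.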

Type Conversions

The module includes From<DbLogMessage> for JobLogMessage to convert database log rows into API response types, and TryFrom<DbCheckpoint> for Checkpoint to convert checkpoint database records (including JSON-encoded event spans) into structured API responses.
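The split between From and TryFrom reflects whether a conversion can fail. The sketch below shows the shape of both impls under loud assumptions: all field names and the log level variants are hypothetical, and because the real checkpoint conversion fails on JSON decoding of event spans (which would need a JSON crate), this std-only version substitutes an integer range check on the epoch as the fallible step.

```rust
use std::convert::TryFrom;

// Hypothetical database-side and API-side types; the real structs differ.
enum DbLogLevel {
    Info,
    Warn,
    Error,
}

#[derive(Debug, PartialEq)]
enum JobLogLevel {
    Info,
    Warn,
    Error,
}

struct DbLogMessage {
    level: DbLogLevel,
    message: String,
}

#[derive(Debug, PartialEq)]
struct JobLogMessage {
    level: JobLogLevel,
    message: String,
}

// Infallible: every database log level has an API counterpart.
impl From<DbLogMessage> for JobLogMessage {
    fn from(row: DbLogMessage) -> Self {
        let level = match row.level {
            DbLogLevel::Info => JobLogLevel::Info,
            DbLogLevel::Warn => JobLogLevel::Warn,
            DbLogLevel::Error => JobLogLevel::Error,
        };
        JobLogMessage {
            level,
            message: row.message,
        }
    }
}

struct DbCheckpoint {
    epoch: i32,
    backend: String,
}

#[derive(Debug, PartialEq)]
struct Checkpoint {
    epoch: u32,
    backend: String,
}

// Fallible: a stored value may not fit the API type. Here the failure point
// is a negative epoch; in the real module it is JSON decoding.
impl TryFrom<DbCheckpoint> for Checkpoint {
    type Error = String;

    fn try_from(row: DbCheckpoint) -> Result<Self, Self::Error> {
        let epoch = u32::try_from(row.epoch)
            .map_err(|_| format!("invalid epoch {}", row.epoch))?;
        Ok(Checkpoint {
            epoch,
            backend: row.backend,
        })
    }
}
```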

Usage

These endpoints are called by the Arroyo web UI and programmatic clients to monitor job status, inspect checkpoints, view errors, and stream real-time output during pipeline execution or preview.

Code Reference

Source Location

Repository
ArroyoSystems/arroyo (GitHub)
File
crates/arroyo-api/src/jobs.rs
Lines
L1--L484

Signature

/// Create a new job for a pipeline (crate-internal)
pub(crate) async fn create_job(
    pipeline_name: &str,
    pipeline_id: i64,
    checkpoint_interval: Duration,
    preview: bool,
    auth: &AuthData,
    db: &DatabaseSource,
) -> Result<String, ErrorResp>

/// Determine available action for a job given state and desired running flag
pub(crate) fn get_action(
    state: &str,
    running_desired: &bool,
) -> (String, Option<StopType>, bool)

/// GET /v1/jobs -- List all jobs
pub async fn get_jobs(
    State(state): State<AppState>,
    bearer_auth: BearerAuth,
) -> Result<Json<JobCollection>, ErrorResp>

/// GET /v1/pipelines/{pipeline_id}/jobs/{job_id}/errors
pub async fn get_job_errors(
    State(state): State<AppState>,
    bearer_auth: BearerAuth,
    Path((pipeline_pub_id, job_pub_id)): Path<(String, String)>,
    query_params: Query<PaginationQueryParams>,
) -> Result<Json<JobLogMessageCollection>, ErrorResp>

/// GET /v1/pipelines/{pipeline_id}/jobs/{job_id}/checkpoints
pub async fn get_job_checkpoints(
    State(state): State<AppState>,
    bearer_auth: BearerAuth,
    Path((pipeline_pub_id, job_pub_id)): Path<(String, String)>,
) -> Result<Json<CheckpointCollection>, ErrorResp>

/// GET /v1/pipelines/{pipeline_id}/jobs/{job_id}/checkpoints/{epoch}/operator_checkpoint_groups
pub async fn get_checkpoint_details(
    State(state): State<AppState>,
    bearer_auth: BearerAuth,
    Path((pipeline_pub_id, job_pub_id, epoch)): Path<(String, String, u32)>,
) -> Result<Json<OperatorCheckpointGroupCollection>, ErrorResp>

/// GET /v1/pipelines/{pipeline_id}/jobs/{job_id}/output -- SSE stream
pub async fn get_job_output(
    State(state): State<AppState>,
    bearer_auth: BearerAuth,
    Path((pipeline_pub_id, job_pub_id)): Path<(String, String)>,
) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, ErrorResp>

Import

// Crate-internal: used within arroyo-api
use crate::jobs::{create_job, get_action, get_jobs, get_job_errors,
    get_job_checkpoints, get_checkpoint_details, get_job_output};

I/O Contract

Inputs

Endpoint | Parameter | Type | Description
GET /v1/jobs | Bearer token | Header | Authentication credential
GET .../errors | pipeline_id | Path (String) | Pipeline public ID
GET .../errors | job_id | Path (String) | Job public ID
GET .../errors | starting_after | Query (Option<String>) | Pagination cursor
GET .../errors | limit | Query (Option<u32>) | Page size limit
GET .../checkpoints | pipeline_id, job_id | Path (String, String) | Pipeline and job identifiers
GET .../operator_checkpoint_groups | epoch | Path (u32) | Checkpoint epoch number
GET .../output | pipeline_id, job_id | Path (String, String) | Pipeline and job identifiers

Outputs

Endpoint | Type | Description
GET /v1/jobs | Json<JobCollection> | List of all jobs for the organization
GET .../errors | Json<JobLogMessageCollection> | Paginated error log messages with has_more flag
GET .../checkpoints | Json<CheckpointCollection> | List of checkpoints with epoch, timing, and backend info
GET .../operator_checkpoint_groups | Json<OperatorCheckpointGroupCollection> | Per-operator checkpoint detail with subtask byte counts and event spans
GET .../output | Sse<Stream> | Server-sent event stream of OutputData JSON payloads
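The starting_after and limit inputs together with the has_more output describe cursor-based pagination. One way such a contract can be satisfied is sketched below over an in-memory slice. Page, paginate, and DEFAULT_LIMIT are hypothetical, the default page size of 10 is an assumption, and treating an unknown cursor as "start from the beginning" is a choice made for this sketch, not documented behaviour.

```rust
/// One page of results plus a flag saying whether more rows follow.
#[derive(Debug, PartialEq)]
struct Page<T> {
    data: Vec<T>,
    has_more: bool,
}

/// Return the rows that follow the `starting_after` cursor, capped at `limit`.
/// `rows` holds (id, item) pairs in the order the API would return them.
fn paginate<T: Clone>(
    rows: &[(String, T)],
    starting_after: Option<&str>,
    limit: Option<u32>,
) -> Page<T> {
    const DEFAULT_LIMIT: u32 = 10; // assumed default, not taken from the source
    let limit = limit.unwrap_or(DEFAULT_LIMIT) as usize;

    // Resume just past the cursor row; an unknown cursor restarts at zero.
    let start = match starting_after {
        Some(cursor) => rows
            .iter()
            .position(|(id, _)| id == cursor)
            .map_or(0, |i| i + 1),
        None => 0,
    };

    let remaining = &rows[start..];
    Page {
        data: remaining
            .iter()
            .take(limit)
            .map(|(_, item)| item.clone())
            .collect(),
        // More rows exist if the remainder does not fit in one page.
        has_more: remaining.len() > limit,
    }
}
```

A client would pass the ID of the last item it received as starting_after and stop requesting pages once has_more is false.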

Usage Examples

Listing All Jobs

curl -H "Authorization: Bearer $TOKEN" \
  http://localhost:8000/api/v1/jobs

Retrieving Job Errors with Pagination

curl -H "Authorization: Bearer $TOKEN" \
  "http://localhost:8000/api/v1/pipelines/pl_abc123/jobs/jc_def456/errors?limit=50"

Subscribing to Job Output (SSE)

curl -N -H "Authorization: Bearer $TOKEN" \
  http://localhost:8000/api/v1/pipelines/pl_abc123/jobs/jc_def456/output

Using get_action Programmatically

use crate::jobs::get_action;

let (action_label, stop_type, in_progress) = get_action("Running", &true);
// action_label = "Stop", stop_type = Some(StopType::Checkpoint), in_progress = false

let (action_label, stop_type, in_progress) = get_action("Failed", &false);
// action_label = "Start", stop_type = Some(StopType::None), in_progress = false
// (here None is the StopType variant wrapped in Some, not Option::None)
